Posted to reviews@spark.apache.org by GitBox <gi...@apache.org> on 2022/02/03 06:08:56 UTC

[GitHub] [spark] jackierwzhang opened a new pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

jackierwzhang opened a new pull request #35385:
URL: https://github.com/apache/spark/pull/35385


   ### What changes were proposed in this pull request?
   Field Id is a native field in the Parquet schema (https://github.com/apache/parquet-format/blob/master/src/main/thrift/parquet.thrift#L398)
   
   After this PR, when the requested schema has field IDs, Parquet readers will first use field IDs to determine which Parquet columns to read, falling back to matching by column name otherwise. 
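   
   For illustration, a minimal sketch (not code from this PR; names and values are made up) of a read schema carrying field ids in `StructField` metadata, under the `parquet.field.id` key this PR introduces:
   
   ```scala
   import org.apache.spark.sql.types.{IntegerType, MetadataBuilder, StringType, StructType}
   
   // Field ids live in StructField metadata under the key "parquet.field.id".
   def withId(id: Long) = new MetadataBuilder().putLong("parquet.field.id", id).build()
   
   val readSchema = new StructType()
     .add("a", StringType, nullable = true, withId(0))
     .add("b", IntegerType, nullable = true, withId(1))
   
   // spark.read.schema(readSchema).parquet(path) would then look up Parquet
   // columns by ids 0 and 1 first, falling back to name matching otherwise.
   ```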
   
   This PR supports:
   - OSS vectorized reader
   
   and does not support:
   - Parquet-mr reader, due to its lack of field id support (needs a follow-up ticket)
   
   ### Why are the changes needed?
   It enables matching columns by field id for supported table formats like Iceberg and Delta. Specifically, it enables easy conversion from Iceberg (which matches columns by field id) to Delta, and allows `id` mode for Delta [column mapping](https://docs.databricks.com/delta/delta-column-mapping.html)
   
   ### Does this PR introduce _any_ user-facing change?
   N/A
   
   ### How was this patch tested?
   Existing tests + new unit tests.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang edited a comment on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang edited a comment on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031112571


   @huaxingao 
   
   Got it. 
   
   As for duplicated field ids: in my approach, reading parquet files with duplicated ids across different groups is allowed; essentially, we just don't want ambiguity when matching fields that are on the same level of the schema.
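   
   To illustrate (a hand-written schema, not from this PR), parquet-mr's schema syntax writes the field id after `=`; here `outer` and `inner` share id 1 but sit on different levels, so the match stays unambiguous, while two siblings sharing an id (as in the "multiple id matches" test quoted later in this thread) would be rejected:
   
   ```
   message spark_schema {
     optional group outer = 1 {
       optional int32 inner = 1;
     }
     optional int32 other = 2;
   }
   ```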
   
   Btw just curious, since you have been working on field id resolution for parquet-mr, do you know whether it supports reading and writing field ids yet?




[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802280998



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Not really. Again, enabling this flag only tries to match field ids `if any exist`, while disabling it completely skips matching by field id.
   
   I wanted to enable this flag for all tests to detect any regressions in existing test cases, in case this flag is turned on by default in the future.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802283455



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"

Review comment:
       It is a Spark metadata key we are proposing.
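   
   For example (a sketch mirroring the test helpers in this PR), a column picks the id up from its `StructField` metadata:
   
   ```scala
   import org.apache.spark.sql.types.MetadataBuilder
   
   // Attach the proposed key to a column's metadata; the id value 42 here is
   // arbitrary, and the key resolves to "parquet.field.id".
   val meta = new MetadataBuilder()
     .putLong(ParquetUtils.FIELD_ID_METADATA_KEY, 42L)
     .build()
   ```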






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801102086



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
+      useFieldId: Boolean,

Review comment:
       I suppose this still needs to be updated...

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("Parquet reads infer fields using field ids correctly") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        // read with schema
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          .where("b < 50"), Seq.empty)
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          .where("a >= 'oh'"), Row("text", 100) :: Nil)
+        // schema inference should pull into the schema with ids
+        val reader = spark.read.parquet(dir.getCanonicalPath)
+        assert(reader.schema == writeSchema)
+        checkAnswer(reader.where("name >= 'oh'"), Row(100, "text") :: Nil)
+      }
+
+      // blocked for Parquet-mr reader
+      val e = intercept[SparkException] {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)
+        }
+      }
+      val cause = e.getCause
+      assert(cause.isInstanceOf[java.io.IOException] &&
+        cause.getMessage.contains("Parquet-mr reader does not support schema with field IDs."))
+    }
+  }
+
+  test("absence of field ids") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+          .add("b", StringType, true, withId(2))
+          .add("c", IntegerType, true, withId(3))
+
+      val writeSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(3))
+          .add("randomName", StringType, true)
+
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+          // 3 different cases for the 3 columns to read:
+          //   - a: ID 1 is not found, but there is column with name `a`, still return null
+          //   - b: ID 2 is not found, return null
+          //   - c: ID 3 is found, read it
+          Row(null, null, 100) :: Row(null, null, 200) :: Nil)
+      }
+    }
+  }
+
+  test("multiple id matches") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+          .add("rand1", StringType, true, withId(2))
+          .add("rand2", StringType, true, withId(1))
+
+      val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr"))
+
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        val cause = intercept[SparkException] {
+          spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect()
+        }.getCause
+        assert(cause.isInstanceOf[RuntimeException] &&
+          cause.getMessage.contains("Found duplicate field(s)"))
+      }
+    }
+  }
+
+  test("read parquet file without ids") {

Review comment:
       Can we add another test where half of the columns have field ids and the other half do not? Thank you.
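   
   Something like this, perhaps (a rough sketch against this suite's helpers; names, values, and the expected name-fallback behaviour for the id-less column are illustrative, and pinning that behaviour down is exactly what the test would do):
   
   ```scala
   test("mix of columns with and without field ids") {
     withTempDir { dir =>
       val writeSchema = new StructType()
         .add("withId", IntegerType, true, withId(1))
         .add("noId", StringType, true)                 // no field id here
       val readSchema = new StructType()
         .add("renamed", IntegerType, true, withId(1))  // should match by id
         .add("noId", StringType, true)                 // should match by name
       spark.createDataFrame(Seq(Row(1, "x")).asJava, writeSchema)
         .write.mode("overwrite").parquet(dir.getCanonicalPath)
       withAllSupportedReaders {
         checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
           Row(1, "x") :: Nil)
       }
     }
   }
   ```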

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
+
+  private val FAKE_COLUMN_NAME = "_fake_name_"
+  private val UUID_REGEX =
+    "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  private def testSchemaClipping(

Review comment:
       There is already `testSchemaClipping` in ParquetSchemaSuite. Would it be possible to refactor this? 






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803355927



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -277,27 +346,40 @@ object ParquetReadSupport {
       parquetMap: GroupType,
       keyType: DataType,
       valueType: DataType,
-      caseSensitive: Boolean): GroupType = {
+      caseSensitive: Boolean,
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
     val repeatedGroup = parquetMap.getType(0).asGroupType()
     val parquetKeyType = repeatedGroup.getType(0)
     val parquetValueType = repeatedGroup.getType(1)
 
-    val clippedRepeatedGroup =
-      Types
+    val clippedRepeatedGroup = {
+      val newRepeatedGroup = Types
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
-        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
-        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive, useFieldId))
+        .addField(clipParquetType(parquetValueType, valueType, caseSensitive, useFieldId))
         .named(repeatedGroup.getName)
+      if (useFieldId && repeatedGroup.getId != null) {
+        newRepeatedGroup.withId(repeatedGroup.getId.intValue())
+      } else {
+        newRepeatedGroup
+      }
+    }
 
-    Types
+    val newMap = Types
       .buildGroup(parquetMap.getRepetition)
       .as(parquetMap.getLogicalTypeAnnotation)
       .addField(clippedRepeatedGroup)
       .named(parquetMap.getName)
+
+    if (useFieldId && parquetMap.getId() != null) {

Review comment:
       Good idea! It should be `parquetType.getId != null` instead of `newParquetType.get != null`, by the way.
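   
   Against the diff above, the guarded propagation would then read roughly like this (a sketch):
   
   ```scala
   // Carry the field id from the original Parquet type over to the rebuilt
   // group, guarding on the original type's id rather than the rebuilt one's.
   if (useFieldId && parquetMap.getId != null) {
     newMap.withId(parquetMap.getId.intValue())
   } else {
     newMap
   }
   ```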






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801273754



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
+
+  private val FAKE_COLUMN_NAME = "_fake_name_"
+  private val UUID_REGEX =
+    "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  private def testSchemaClipping(

Review comment:
       I would say they have become distinct enough from each other (especially with the random UUID logic) that it is hard to merge them.






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801099106



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {

Review comment:
       Understood. Can we rename this method to `withVectorizedReader`? This would make it clear that we test only vectorised parquet reader. Thanks.






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801095848



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +355,13 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+
+        if (SQLConf.get.parquetFieldIdEnabled &&
+            ParquetUtils.hasFieldIds(requiredSchema)) {
+          throw new IOException("Parquet-mr reader does not support schema with field IDs." +
+            s" Please choose a different Parquet reader. Read schema: ${requiredSchema.json}")

Review comment:
       Yes, that could work. I would probably update your statement to something like this:
   > Parquet-mr reader does not currently support reading schema with field IDs. Please enable vectorised Parquet reader or set <config> to false.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799044927



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       You meant one flag for read and another flag for write, with both disabled by default?
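   
   For reference, a sketch of what such a split might look like (the key names here are illustrative guesses patterned on the existing `spark.sql.parquet.fieldId.enabled` key, though a later revision in this thread does reference a `PARQUET_FIELD_ID_READ_ENABLED` conf):
   
   ```scala
   val PARQUET_FIELD_ID_WRITE_ENABLED =
     buildConf("spark.sql.parquet.fieldId.write.enabled")
       .doc("When enabled, Parquet writers populate the field id metadata (if present)" +
         " in the Spark schema to the Parquet schema.")
       .booleanConf
       .createWithDefault(false)
   
   val PARQUET_FIELD_ID_READ_ENABLED =
     buildConf("spark.sql.parquet.fieldId.read.enabled")
       .doc("When enabled, Parquet readers use field IDs (if present) in the requested" +
         " Spark schema to look up Parquet fields instead of using column names.")
       .booleanConf
       .createWithDefault(false)
   ```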






[GitHub] [spark] AmplabJenkins commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
AmplabJenkins commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1028761271


   Can one of the admins verify this patch?




[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799047791



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    val parquetRequestedSchema = readContext.getRequestedSchema
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz,
+      datetimeRebaseSpec,
+      int96RebaseSpec)
+  }
+}
+
+object ParquetReadSupport extends Logging {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
+
+  def getRequestedSchema(
+      parquetFileSchema: MessageType,
+      catalystRequestedSchema: StructType,
+      conf: Configuration,
+      enableVectorizedReader: Boolean): MessageType = {
     val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
     val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
-    val parquetFileSchema = context.getFileSchema
+    val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_ENABLED.defaultValue.get)
+    val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
+      SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
+
+    if (!ignoreMissingIds &&
+        !containsFieldIds(parquetFileSchema) &&
+        ParquetUtils.hasFieldIds(catalystRequestedSchema)) {
+      throw new RuntimeException(
+        s"""
+           |Spark read schema expects field Ids, but Parquet file schema doesn't contain field Ids.

Review comment:
       how about "please remove the field ids from Spark schema or ignore the missing ids by setting `spark.sql.parquet.fieldId.ignoreMissing = true`"?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
+      useFieldId: Boolean,

Review comment:
       Sure






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799050749



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,11 +432,50 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
+          }
+        }.getOrElse {
+        // When there is no ID match, we use a fake name to avoid a name match by accident
+        // We need this name to be unique as well, otherwise there will be type conflicts

Review comment:
       This happens when we have a Spark column with a field id, but there is no Parquet column with a matching field id.
   By default, if we do nothing, then:
   1. the underlying parquet reader would try to match by name, which can cause confusion if the user expects the column to be matched by id.
   2. if two unmatched columns were given the same fake name but have different types, the underlying reader would throw errors indicating that conflicting types exist in the schema.
   
   So by generating a unique fake name, this column won't be matched by name, and hence the user sees consistent `null`s for that column's data.
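   
   Concretely, the helper introduced earlier in this PR builds one unique placeholder per unmatched column:
   
   ```scala
   // A shared fixed name would collide whenever two unmatched columns have
   // different types, so each placeholder gets a fresh UUID suffix.
   def generateFakeColumnName: String = s"_fake_name_${java.util.UUID.randomUUID()}"
   ```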






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802279394



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))

Review comment:
       Please see my comment here: https://github.com/apache/spark/pull/35385#discussion_r799050749.
   
   This fake name is not meant to be referenced; it is designed to fill `null`s for columns that don't have a matching field id.






[GitHub] [spark] huaxingao commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799240706



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")
+      .doc("When the Parquet file does't have any field IDs but the" +
+        " Spark read schema is using field IDs to read, we will return silently return nulls" +

Review comment:
       nit: extra `return`






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799046507



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +355,13 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+
+        if (SQLConf.get.parquetFieldIdEnabled &&
+            ParquetUtils.hasFieldIds(requiredSchema)) {
+          throw new IOException("Parquet-mr reader does not support schema with field IDs." +
+            s" Please choose a different Parquet reader. Read schema: ${requiredSchema.json}")

Review comment:
       sure, sounds good, how about "Please choose a different Parquet reader or disable reading field ids by setting `spark.sql.parquet.fieldId.enabled = false`"?







[GitHub] [spark] huaxingao commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799244801



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -4251,6 +4268,8 @@ class SQLConf extends Serializable with Logging {
 
   def inferDictAsStruct: Boolean = getConf(SQLConf.INFER_NESTED_DICT_AS_STRUCT)
 
+  def parquetFieldIdEnabled: Boolean = getConf(SQLConf.PARQUET_FIELD_ID_ENABLED)
+

Review comment:
       should we have `def ignoreMissingParquetFieldId` here?
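   
   Presumably a one-line accessor mirroring `parquetFieldIdEnabled` above, e.g.:
   
   ```scala
   def ignoreMissingParquetFieldId: Boolean = getConf(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID)
   ```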






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801093005



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       My understanding is that the code uses field ids if the flag is enabled; if the flag is disabled, it uses names instead. My main concern is ambiguity resolution in the schema.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802170617



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,42 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex =
+      catalystType.fields.zipWithIndex.map { case (f, idx) =>
+          (f.name, idx)
+        }.toMap
+
+    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+      nameToIndex
     } else {
-      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+      CaseInsensitiveMap(nameToIndex)
     }
+
+    // (SPARK-38094) parquet field ids, if exist, should be prioritized for matching
+    val catalystFieldIdxByFieldId =
+      if (SQLConf.get.parquetFieldIdReadEnabled && ParquetUtils.hasFieldIds(catalystType)) {
+        catalystType.fields
+          .zipWithIndex
+          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }

Review comment:
       The recursive check only detects whether **any** (not all) of the fields in the schema contain a field id.
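   
   A sketch of that shape of check, assuming (as in this PR) that ids live under `ParquetUtils.FIELD_ID_METADATA_KEY`; a full version would also descend into array and map types:
   
   ```scala
   def hasFieldIds(schema: StructType): Boolean =
     schema.fields.exists { f =>
       f.metadata.contains(ParquetUtils.FIELD_ID_METADATA_KEY) || (f.dataType match {
         case st: StructType => hasFieldIds(st)
         case _ => false // arrays/maps omitted in this sketch
       })
     }
   ```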




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802168938



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
-      caseSensitive: Boolean = true): MessageType = {
+      caseSensitive: Boolean,

Review comment:
       I did a code search and didn't find any breaking usages. I think the APIs here are private within Spark?
   If we are talking about maintaining API compatibility for all `def`s in this class, we'd have to overload a bunch of other methods too, because they all take this additional `useFieldId` parameter.
   
   @sadikovi what do you think? Should we just make `useFieldId` default to false? 
   






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802188024



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
-      caseSensitive: Boolean = true): MessageType = {
+      caseSensitive: Boolean,

Review comment:
       I have not reviewed the newer changes yet, but you can create an overloaded method without `useFieldId` to make sure we maintain binary compatibility. 
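   
   For example, keeping the old three-argument signature as a thin forwarder (a sketch; whether `useFieldId` should default to false is the open question above):
   
   ```scala
   // Binary-compatible overload: existing callers keep the old signature.
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
       caseSensitive: Boolean): MessageType =
     clipParquetSchema(parquetSchema, catalystSchema, caseSensitive, useFieldId = false)
   ```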








[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803139570



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"

Review comment:
       I just want to make sure this does not clash with any existing ones in Spark or Parquet-mr.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799048552



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -324,23 +401,29 @@ object ParquetReadSupport {
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
-    val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    if (caseSensitive) {
-      val caseSensitiveParquetFieldMap =
+      parquetRecord: GroupType,
+      structType: StructType,
+      useFieldId: Boolean,
+      caseSensitive: Boolean): Seq[Type] = {
+    val toParquet = new SparkToParquetSchemaConverter(
+      writeLegacyParquetFormat = false, useFieldId = useFieldId)
+    lazy val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-      structType.map { f =>
-        caseSensitiveParquetFieldMap
+    lazy val caseInsensitiveParquetFieldMap =
+        parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
+    lazy val idToParquetFieldMap =

Review comment:
       It is not allowed. If you look a couple of lines below, we throw an exception for that.






[GitHub] [spark] jackierwzhang edited a comment on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang edited a comment on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031236141


   @huaxingao Are you suggesting that the field ids set by code like https://github.com/apache/parquet-mr/blob/1adc22804a700d78f8480667d083e91d6147339f/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L279 aren't actually materialized to files?






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802280998



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Not really. Again, enabling this flag will only try to match field ids if they exist, but disabling this flag will completely ignore matching by field id.
   
   I wanted to enable this flag for all tests to detect any regressions in existing test cases, in case this flag is turned on by default in the future.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803320649



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Oh I see what you meant, let me just grid test this for the parquet suites only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802260182



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled," +
+        " Parquet writers will populate the field Id" +

Review comment:
       Can you keep spaces at the end of the string similar to all of the examples in the file?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {
+      structType.map { f =>
+        if (ParquetUtils.hasFieldId(f)) {

Review comment:
       Does case sensitivity make sense in the case of id matching? Would it cause API change when it used to work for case insensitive fields but now it would not because we have written field ids? Or would it in this case ask users to disable `useFieldId` flag?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"

Review comment:
       Did this config value come from Parquet-mr, or is it something that you are proposing in this PR?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, that has the parquet
+   * field ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {
+    require(hasFieldId(field),
+      s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
+    field.metadata.getLong(FIELD_ID_METADATA_KEY).toInt

Review comment:
       What happens if the field id is not an integer, e.g. set to a Long, an out-of-range number, a negative value, or a string "abc"? Is there a test to check the various behaviours?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
+
+  private val FAKE_COLUMN_NAME = "_fake_name_"
+  private val UUID_REGEX =
+    "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  private def testSchemaClipping(

Review comment:
       The random id generation could be optional based on useFieldId flag. It seems your version covers the one in ParquetSchemaSuite. I think it would be good to refactor either here or in a follow-up. 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))

Review comment:
       Can you elaborate why we need to generate a fake name? How can we reference this field later?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    val parquetRequestedSchema = readContext.getRequestedSchema
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz,
+      datetimeRebaseSpec,
+      int96RebaseSpec)
+  }
+}
+
+object ParquetReadSupport extends Logging {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
+
+  def getRequestedSchema(
+      parquetFileSchema: MessageType,
+      catalystRequestedSchema: StructType,
+      conf: Configuration,
+      enableVectorizedReader: Boolean): MessageType = {
     val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
     val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
-    val parquetFileSchema = context.getFileSchema
+    val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_ENABLED.defaultValue.get)
+    val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
+      SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
+
+    if (!ignoreMissingIds &&
+        !containsFieldIds(parquetFileSchema) &&
+        ParquetUtils.hasFieldIds(catalystRequestedSchema)) {
+      throw new RuntimeException(
+        s"""
+           |Spark read schema expects field Ids, but Parquet file schema doesn't contain field Ids.

Review comment:
       Yes, that would be possible.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Does this mean that we will no longer test matching by name and will always test by field id?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/parquet/ParquetWrite.scala
##########
@@ -81,6 +81,9 @@ case class ParquetWrite(
 
     conf.set(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key, sqlConf.parquetOutputTimestampType.toString)
 
+    conf

Review comment:
       nit: can you refactor it to be 
   ```
   conf.set(
     SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key, 
     sqlConf.parquetFieldIdWriteEnabled.toString)
   ```
   
   if the line is too long?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
-      caseSensitive: Boolean = true): MessageType = {
+      caseSensitive: Boolean,

Review comment:
       I think we should create an overloaded method with the original parameters to maintain binary compatibility here. For other methods, it is fine to keep as is since it is private API.
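   
   Something like this, for the record (just a sketch of the suggested overload; `useFieldId = false` as the compatible default is my assumption):
   
   ```scala
   // Keeps the original binary signature and delegates to the new one
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
       caseSensitive: Boolean = true): MessageType =
     clipParquetSchema(parquetSchema, catalystSchema, caseSensitive, useFieldId = false)
   ```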






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803320649



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Yeah, but it requires the original schema to contain `parquet.field.id` metadata, which is not present in any of the existing suites, so it should behave exactly like name matching.






[GitHub] [spark] huaxingao commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031743446


   @jackierwzhang 
   No, those are set correctly. 
   What I meant is that the field ids are not really used. It seems only the `ColumnPath` is used in column index, column resolution, etc. I am thinking of adding a field id to `ColumnDescriptor` and keeping a map between id and `ColumnDescriptor`, or a map between id and `ColumnPath`.
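   
   Very roughly, something along these lines (just a sketch against parquet-mr's schema classes, not a proposal for the exact shape; nested fields would need a recursive walk):
   
   ```scala
   import scala.collection.JavaConverters._
   
   import org.apache.parquet.hadoop.metadata.ColumnPath
   import org.apache.parquet.schema.MessageType
   
   // Build a field-id -> column-path map for top-level fields that carry an id
   def idToPathMap(schema: MessageType): Map[Int, ColumnPath] =
     schema.getFields.asScala.flatMap { field =>
       // Type#getId returns null when no id was written for the field
       Option(field.getId).map(id => id.intValue() -> ColumnPath.get(field.getName))
     }.toMap
   ```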




[GitHub] [spark] cloud-fan closed pull request #35385: [SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
cloud-fan closed pull request #35385:
URL: https://github.com/apache/spark/pull/35385


   




[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799053096



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {

Review comment:
       Yeah, there's a method called `withAllParquetReaders` which covers both the parquet-mr and vectorized readers, but I only want to use the vectorized reader here.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799050749



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,11 +432,50 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
+          }
+        }.getOrElse {
+        // When there is no ID match, we use a fake name to avoid a name match by accident
+        // We need this name to be unique as well, otherwise there will be type conflicts

Review comment:
       This happens when we have a spark column with a field id, but there is no parquet column with a matching field id.
   By default, if we do nothing, the underlying parquet reader would try to match by name, which can cause confusion if the user expects the column to be matched by id. So by generating a unique fake name, this column won't be matched by name, and hence the user would see consistent `null`s for that column's data.
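   
   In terms of observable behavior, the idea is roughly this (sketch; assumes `dir` holds a parquet file whose ids don't include 5):
   
   ```scala
   import org.apache.spark.sql.types.{MetadataBuilder, StringType, StructType}
   
   val readSchema = new StructType()
     .add("name", StringType, nullable = true,
       new MetadataBuilder().putLong("parquet.field.id", 5).build())
   
   // id 5 matches nothing in the file, and the fake internal name prevents an
   // accidental match on "name", so the column comes back as nulls
   spark.read.schema(readSchema).parquet(dir.getCanonicalPath).show()
   ```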






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799063775



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("general test") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)

Review comment:
       Good question. I think schema inference would kick in and sample some parquet files for the schema (and field ids, if they exist), then it's going to use the spark schema w/ field ids to load the parquet files again, so ids would be used whenever possible. 
   I will add another test to confirm it.
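   
   A quick way to eyeball this (sketch; assumes `dir` was written with field ids, as in the test above):
   
   ```scala
   val inferred = spark.read.parquet(dir.getCanonicalPath).schema
   inferred.fields.foreach { f =>
     // "parquet.field.id" is the StructField metadata key used in this PR
     val id = if (f.metadata.contains("parquet.field.id")) {
       f.metadata.getLong("parquet.field.id").toString
     } else "no id"
     println(s"${f.name} -> $id")
   }
   ```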






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802280998



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Not really. Again, enabling this flag would only try to match field ids **if they exist**, but disabling this flag will completely ignore matching using field id.
   
   I wanted to enable this flag for all tests to detect any regressions in existing test cases, in case when this flag is turned on by default in the future.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801259155



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       Ah, I meant that even when this flag is enabled, my statement above still applies: the matching is done on a best-effort basis.
   
   Disabling this flag will completely avoid reading and writing field ids.
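   
   In config terms, the intent is (sketch; using the read/write key names as they ended up in this PR):
   
   ```scala
   // Match by field id on read whenever the requested schema carries ids
   spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
   
   // Ignore ids entirely: no id matching on read, no ids written to files
   spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "false")
   spark.conf.set("spark.sql.parquet.fieldId.write.enabled", "false")
   ```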






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801096500



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    val parquetRequestedSchema = readContext.getRequestedSchema
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz,
+      datetimeRebaseSpec,
+      int96RebaseSpec)
+  }
+}
+
+object ParquetReadSupport extends Logging {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
+
+  def getRequestedSchema(
+      parquetFileSchema: MessageType,
+      catalystRequestedSchema: StructType,
+      conf: Configuration,
+      enableVectorizedReader: Boolean): MessageType = {
     val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
     val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
-    val parquetFileSchema = context.getFileSchema
+    val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_ENABLED.defaultValue.get)
+    val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
+      SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
+
+    if (!ignoreMissingIds &&
+        !containsFieldIds(parquetFileSchema) &&
+        ParquetUtils.hasFieldIds(catalystRequestedSchema)) {
+      throw new RuntimeException(
+        s"""
+           |Spark read schema expects field Ids, but Parquet file schema doesn't contain field Ids.

Review comment:
       How can one remove field ids? Maybe we can just recommend disabling the feature flag.
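   
   For reference, stripping them on the Spark side could look something like this (sketch only, hypothetical helper; nested fields would need a recursive version):
   
   ```scala
   import org.apache.spark.sql.types.{MetadataBuilder, StructType}
   
   // Returns a copy of the schema with the `parquet.field.id` metadata
   // removed from every top-level field
   def stripFieldIds(schema: StructType): StructType =
     StructType(schema.map { f =>
       val cleaned = new MetadataBuilder()
         .withMetadata(f.metadata)
         .remove("parquet.field.id")
         .build()
       f.copy(metadata = cleaned)
     })
   ```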






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803141001



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,214 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+

Review comment:
       nit: empty line.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
+
+  private val FAKE_COLUMN_NAME = "_fake_name_"
+  private val UUID_REGEX =
+    "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  private def testSchemaClipping(

Review comment:
       IMHO, it might be quicker to do it in this PR but it is up to you how you would like to approach this.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {

Review comment:
       +1. Can you check that all javadoc comments have been updated? Thanks.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       But they would exist once we start writing field ids for all of the fields, wouldn't they?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, has the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {
+    require(hasFieldId(field),
+      s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
+    field.metadata.getLong(FIELD_ID_METADATA_KEY).toInt

Review comment:
       Can we throw a nice error message telling the user that their field id was incorrect and it should be a valid signed integer?
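   
   Something along these lines maybe (sketch only):
   
   ```scala
   def getFieldId(field: StructField): Int = {
     require(hasFieldId(field),
       s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
     try {
       // Fails on values that overflow Int or aren't stored as a Long
       Math.toIntExact(field.metadata.getLong(FIELD_ID_METADATA_KEY))
     } catch {
       case _: ArithmeticException | _: ClassCastException =>
         throw new IllegalArgumentException(
           s"The key `$FIELD_ID_METADATA_KEY` must be a 32-bit integer, but was: " + field)
     }
   }
   ```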

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, that has the parquet
+   * field ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {

Review comment:
       IMHO, this is fine. I see that hasFieldId() is used separately, and the assertion would still have to be implemented somewhere anyway. As long as there is a test for this, it should be good.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("Parquet reads infer fields using field ids correctly") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        // read with schema
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          .where("b < 50"), Seq.empty)
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          .where("a >= 'oh'"), Row("text", 100) :: Nil)
+        // schema inference should pull into the schema with ids
+        val reader = spark.read.parquet(dir.getCanonicalPath)
+        assert(reader.schema == writeSchema)
+        checkAnswer(reader.where("name >= 'oh'"), Row(100, "text") :: Nil)
+      }
+
+      // blocked for Parquet-mr reader
+      val e = intercept[SparkException] {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)
+        }
+      }
+      val cause = e.getCause
+      assert(cause.isInstanceOf[java.io.IOException] &&
+        cause.getMessage.contains("Parquet-mr reader does not support schema with field IDs."))
+    }
+  }
+
+  test("absence of field ids") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+          .add("b", StringType, true, withId(2))
+          .add("c", IntegerType, true, withId(3))
+
+      val writeSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(3))
+          .add("randomName", StringType, true)
+
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+          // 3 different cases for the 3 columns to read:
+          //   - a: ID 1 is not found, but there is column with name `a`, still return null
+          //   - b: ID 2 is not found, return null
+          //   - c: ID 3 is found, read it
+          Row(null, null, 100) :: Row(null, null, 200) :: Nil)
+      }
+    }
+  }
+
+  test("multiple id matches") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+          .add("rand1", StringType, true, withId(2))
+          .add("rand2", StringType, true, withId(1))
+
+      val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr"))
+
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        val cause = intercept[SparkException] {
+          spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect()
+        }.getCause
+        assert(cause.isInstanceOf[RuntimeException] &&
+          cause.getMessage.contains("Found duplicate field(s)"))
+      }
+    }
+  }
+
+  test("read parquet file without ids") {

Review comment:
       This was marked as resolved but what test was added?






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803318479



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {
+      structType.map { f =>
+        if (ParquetUtils.hasFieldId(f)) {

Review comment:
       Yeah, I think users should turn off the `spark.sql.parquet.fieldId.read.enabled` flag if they see anything surprising.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803314684



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, that has the parquet
+   * field ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {
+    require(hasFieldId(field),
+      s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
+    field.metadata.getLong(FIELD_ID_METADATA_KEY).toInt

Review comment:
       Sure






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802185648



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
-    outputTimestampType = conf.parquetOutputTimestampType)
+    outputTimestampType = conf.parquetOutputTimestampType,
+    useFieldId = conf.parquetFieldIdWriteEnabled)
 
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
-      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)))
+      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
+    useFieldId = SQLConf.get.parquetFieldIdWriteEnabled)

Review comment:
       This configuration was actually set manually upstream by Spark as well, from SQLConf, so functionally it is the same.
   
   But for consistency's sake, I updated the PR to set this conf upstream too.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803138470



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))

Review comment:
       I guess my question was whether or not this will have any side effect when calling `convertField`.  If that makes a full copy of the field, then it should be all good.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] huaxingao commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031170753


   > I think in my approach, reading parquet files with duplicated id across different groups are allowed, essentially we just don't want confusion when matching fields which are on the same level in the schema.
   
    Sounds reasonable. I hope I can do the same too, but it seems to me that I need to resolve the column by id only, which requires the id to be unique in the entire schema. This is going to be a breaking change. Not sure if I am allowed to do it or not.
   
   It doesn't seem to me that parquet-mr supports reading and writing field ids yet. The field ids are not in `ColumnDescriptor`. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803308096



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in the vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("Parquet reads infer fields using field ids correctly") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        // read with schema
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          .where("b < 50"), Seq.empty)
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath)
+          .where("a >= 'oh'"), Row("text", 100) :: Nil)
+        // schema inference should pull in the schema with ids
+        val reader = spark.read.parquet(dir.getCanonicalPath)
+        assert(reader.schema == writeSchema)
+        checkAnswer(reader.where("name >= 'oh'"), Row(100, "text") :: Nil)
+      }
+
+      // blocked for Parquet-mr reader
+      val e = intercept[SparkException] {
+        withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "false") {
+          checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)
+        }
+      }
+      val cause = e.getCause
+      assert(cause.isInstanceOf[java.io.IOException] &&
+        cause.getMessage.contains("Parquet-mr reader does not support schema with field IDs."))
+    }
+  }
+
+  test("absence of field ids") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+          .add("b", StringType, true, withId(2))
+          .add("c", IntegerType, true, withId(3))
+
+      val writeSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(3))
+          .add("randomName", StringType, true)
+
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath),
+          // 3 different cases for the 3 columns to read:
+          //   - a: ID 1 is not found, but there is column with name `a`, still return null
+          //   - b: ID 2 is not found, return null
+          //   - c: ID 3 is found, read it
+          Row(null, null, 100) :: Row(null, null, 200) :: Nil)
+      }
+    }
+  }
+
+  test("multiple id matches") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("a", IntegerType, true, withId(1))
+          .add("rand1", StringType, true, withId(2))
+          .add("rand2", StringType, true, withId(1))
+
+      val writeData = Seq(Row(100, "text", "txt"), Row(200, "more", "mr"))
+
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        val cause = intercept[SparkException] {
+          spark.read.schema(readSchema).parquet(dir.getCanonicalPath).collect()
+        }.getCause
+        assert(cause.isInstanceOf[RuntimeException] &&
+          cause.getMessage.contains("Found duplicate field(s)"))
+      }
+    }
+  }
+
+  test("read parquet file without ids") {

Review comment:
       The additional tests were embedded here https://github.com/apache/spark/blob/723dff7f9d00b4bdc9e0b6d0ea7ecac2820ffdbe/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala#L46




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803309969



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -277,27 +346,40 @@ object ParquetReadSupport {
       parquetMap: GroupType,
       keyType: DataType,
       valueType: DataType,
-      caseSensitive: Boolean): GroupType = {
+      caseSensitive: Boolean,
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
     val repeatedGroup = parquetMap.getType(0).asGroupType()
     val parquetKeyType = repeatedGroup.getType(0)
     val parquetValueType = repeatedGroup.getType(1)
 
-    val clippedRepeatedGroup =
-      Types
+    val clippedRepeatedGroup = {
+      val newRepeatedGroup = Types
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
-        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
-        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive, useFieldId))
+        .addField(clipParquetType(parquetValueType, valueType, caseSensitive, useFieldId))
         .named(repeatedGroup.getName)
+      if (useFieldId && repeatedGroup.getId != null) {
+        newRepeatedGroup.withId(repeatedGroup.getId.intValue())
+      } else {
+        newRepeatedGroup
+      }
+    }
 
-    Types
+    val newMap = Types
       .buildGroup(parquetMap.getRepetition)
       .as(parquetMap.getLogicalTypeAnnotation)
       .addField(clippedRepeatedGroup)
       .named(parquetMap.getName)
+
+    if (useFieldId && parquetMap.getId() != null) {

Review comment:
       Nice nit.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803134815



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, " +
+        "Parquet writers will populate the field Id " +
+        "metadata (if present) in the Spark schema to the Parquet schema.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
+        "will use field IDs (if present) in the requested Spark schema to look up Parquet " +
+        "fields instead of using column names")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")

Review comment:
       I agree, it is better to be specific that it applies to reads only.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803310340



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))

Review comment:
       We are making a copy so it should be safe.
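
    For reference, a minimal sketch of what the fake-name generation could look like. The exact body is an assumption here, inferred from the `_fake_name_` prefix and the UUID regex that `ParquetFieldIdSchemaSuite` checks against:
    ```scala
    import java.util.UUID

    // Assumed sketch: each unmatched field gets a unique placeholder name, so it
    // can neither collide with a real Parquet column nor with another unmatched
    // field (which would otherwise cause type conflicts).
    private def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
    ```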




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803327511



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +446,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {

Review comment:
       Nice nit!




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799043344



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       It would try to match by id if the id exists; otherwise, it would fall back to matching by name.
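
    A tiny illustration of that mixed behavior (hypothetical column names; `withId` builds the `parquet.field.id` metadata, assuming the helpers and imports from the test suites):
    ```scala
    val readSchema = new StructType()
      .add("a", IntegerType, true, withId(1)) // has an id: resolved against Parquet field id 1
      .add("b", StringType)                   // no id: falls back to matching by the name "b"
    ```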




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802301441



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
+
+  private val FAKE_COLUMN_NAME = "_fake_name_"
+  private val UUID_REGEX =
+    "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  private def testSchemaClipping(

Review comment:
       Will do in a follow up I think.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sunchao commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sunchao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802937583



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +358,7 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+

Review comment:
       nit: unrelated change 

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -242,29 +297,43 @@ object ParquetReadSupport {
       // "_tuple" appended then the repeated type is the element type and elements are required.
       // Build a new LIST-annotated group with clipped `repeatedGroup` as element type and the
       // only field.
-      if (
+      val newParquetList = if (
         repeatedGroup.getFieldCount > 1 ||
         repeatedGroup.getName == "array" ||
         repeatedGroup.getName == parquetList.getName + "_tuple"
       ) {
         Types
           .buildGroup(parquetList.getRepetition)
           .as(LogicalTypeAnnotation.listType())
-          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive))
+          .addField(clipParquetType(repeatedGroup, elementType, caseSensitive, useFieldId))
           .named(parquetList.getName)
       } else {
+        val newRepeatedGroup = Types
+          .repeatedGroup()
+          .addField(
+            clipParquetType(
+              repeatedGroup.getType(0), elementType, caseSensitive, useFieldId))
+          .named(repeatedGroup.getName)
+
+        val newElementType = if (useFieldId && repeatedGroup.getId() != null) {

Review comment:
       nit: I think we can use `getId`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +446,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {

Review comment:
       I think this can be simplified:
   ```scala
       val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
       structType.map { f =>
         if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
           matchIdField(f)
         } else if (caseSensitive) {
           matchCaseSensitiveField(f)
         } else {
           matchCaseInsensitiveField(f)
         }
       }
   ```

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether any field in the schema, inner or leaf, has the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {

Review comment:
       maybe we can consider to combine `getFieldId` and `hasFieldId` into a single method:
   ```
   def getFieldId(field: StructField): Option[Int]
   ```
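
    Fleshing that out, a sketch of what the combined accessor could look like (an assumption, not necessarily what the PR ends up with):
    ```scala
    def getFieldId(field: StructField): Option[Int] =
      if (field.metadata.contains(FIELD_ID_METADATA_KEY)) {
        Some(field.metadata.getLong(FIELD_ID_METADATA_KEY).toInt)
      } else {
        None
      }
    ```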

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,39 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex: Map[String, Int] = catalystType.fieldNames.zipWithIndex.toMap
+
+    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+      nameToIndex
     } else {
-      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+      CaseInsensitiveMap(nameToIndex)
     }
+
+    // (SPARK-38094) parquet field ids, if exist, should be prioritized for matching
+    val catalystFieldIdxByFieldId =
+      if (SQLConf.get.parquetFieldIdReadEnabled && ParquetUtils.hasFieldIds(catalystType)) {
+        catalystType.fields
+          .zipWithIndex
+          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }
+          .map { case (f, idx) => (ParquetUtils.getFieldId(f), idx) }
+          .toMap
+      } else {
+        Map.empty[Int, Int]
+      }
+
     parquetType.getFields.asScala.map { parquetField =>
-      val fieldIndex = catalystFieldNameToIndex(parquetField.getName)
-      val catalystField = catalystType(fieldIndex)
+      val catalystFieldIndex = Option(parquetField.getId).map { fieldId =>

Review comment:
       nit: I think we can use `flatMap` here:
   ```scala
   Option(parquetField.getId).flatMap { fieldId =>
           // field has id, try to match by id first before falling back to match by name
           catalystFieldIdxByFieldId.get(fieldId.intValue())
         }.getOrElse {
           // field doesn't have id, just match by name
           catalystFieldIdxByName(parquetField.getName)
         }
   ```
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {

Review comment:
       can we add some doc above describing how this parameter is used?
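
    Something along these lines would probably do (a sketch, with wording assumed from the config docs in this PR):
    ```scala
    /**
     * @param useFieldId whether to populate the field ids stored in the
     *                   `parquet.field.id` metadata of the Spark schema
     *                   into the generated Parquet schema.
     */
    ```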

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, " +
+        "Parquet writers will populate the field Id " +
+        "metadata (if present) in the Spark schema to the Parquet schema.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
+        "will use field IDs (if present) in the requested Spark schema to look up Parquet " +
+        "fields instead of using column names")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")

Review comment:
       should this be `spark.sql.parquet.fieldId.read.ignoreMissing`? I wonder if it will be used on the write path.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -277,27 +346,40 @@ object ParquetReadSupport {
       parquetMap: GroupType,
       keyType: DataType,
       valueType: DataType,
-      caseSensitive: Boolean): GroupType = {
+      caseSensitive: Boolean,
+      useFieldId: Boolean): GroupType = {
     // Precondition of this method, only handles maps with nested key types or value types.
     assert(!isPrimitiveCatalystType(keyType) || !isPrimitiveCatalystType(valueType))
 
     val repeatedGroup = parquetMap.getType(0).asGroupType()
     val parquetKeyType = repeatedGroup.getType(0)
     val parquetValueType = repeatedGroup.getType(1)
 
-    val clippedRepeatedGroup =
-      Types
+    val clippedRepeatedGroup = {
+      val newRepeatedGroup = Types
         .repeatedGroup()
         .as(repeatedGroup.getLogicalTypeAnnotation)
-        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive))
-        .addField(clipParquetType(parquetValueType, valueType, caseSensitive))
+        .addField(clipParquetType(parquetKeyType, keyType, caseSensitive, useFieldId))
+        .addField(clipParquetType(parquetValueType, valueType, caseSensitive, useFieldId))
         .named(repeatedGroup.getName)
+      if (useFieldId && repeatedGroup.getId != null) {
+        newRepeatedGroup.withId(repeatedGroup.getId.intValue())
+      } else {
+        newRepeatedGroup
+      }
+    }
 
-    Types
+    val newMap = Types
       .buildGroup(parquetMap.getRepetition)
       .as(parquetMap.getLogicalTypeAnnotation)
       .addField(clippedRepeatedGroup)
       .named(parquetMap.getName)
+
+    if (useFieldId && parquetMap.getId() != null) {

Review comment:
       can we move this into `clipParquetType`? 
   ```scala
     private def clipParquetType(
         parquetType: Type,
         catalystType: DataType,
         caseSensitive: Boolean,
         useFieldId: Boolean): Type = {
       val newParquetType = catalystType match {
         ...
       }
       if (useFieldId && newParquetType.getId != null) {
         newParquetType.withId(parquetType.getId.intValue())
       } else {
         newParquetType
       }
     }
   ```
   

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -109,6 +169,7 @@ class ParquetReadSupport(
       // in parquetRequestedSchema which are not present in the file.
       parquetClippedSchema
     }
+

Review comment:
       ditto




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802280998



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Not really. Again, enabling this flag would only try to match field ids **if they exist**, while disabling it will completely ignore field id matching. So if I read with a Spark schema that has no ids at all and turn on this flag, it would be exactly the same as name matching.
   
    I wanted to enable this flag for all tests to detect any regressions in existing test cases, in case this flag is turned on by default in the future.
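
    Concretely, a schema only opts in through the metadata key, e.g. (a sketch):
    ```scala
    import org.apache.spark.sql.types._

    // Only fields carrying the `parquet.field.id` key participate in id matching;
    // without it, the read flag is a no-op and name matching applies as before.
    val meta = new MetadataBuilder().putLong("parquet.field.id", 1L).build()
    val schema = new StructType().add("a", IntegerType, true, meta)
    ```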




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031112571


   > @jackierwzhang FYI: I am working with @shangxinli on column id resolution in parquet-mr [link](https://issues.apache.org/jira/browse/PARQUET-2006), with pretty much the same motivation as yours. The work will probably overlap with yours. One thing that I just realized is that the field id can be NOT unique in schema. For example:
   > 
   > ```
   > message ParquetSchema {
   >   required group reqMap (MAP) = 1 {
   >     repeated group key_value (MAP_KEY_VALUE) {
   >       required binary key (STRING);
   >       optional group value (MAP) {
   >         repeated group key_value (MAP_KEY_VALUE) {
   >           required binary key (STRING);
   >           optional group value {
   >             required binary name (STRING) = 1;
   >             optional binary age (STRING) = 2;
   >             optional binary gender (STRING) = 3;
   >             optional group addedStruct = 4 {
   >               required binary name (STRING) = 1;
   >               optional binary age (STRING) = 2;
   >               optional binary gender (STRING) = 3;
   >             }
   >           }
   >         }
   >       }
   >     }
   >   }
   > }
   > ```
   > 
   > I probably need to change the format specification to make the field id unique.
   
    Got it. I think in my approach, reading parquet files with duplicated ids across different groups is allowed; essentially we just don't want confusion when matching fields that are on the same level in the schema.
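
    To make that concrete, roughly this is what is allowed vs. rejected here (hypothetical fields; `withId` builds the `parquet.field.id` metadata as in the test suites):
    ```scala
    // Allowed: id 1 reused at a different nesting level.
    val inner = new StructType().add("name", StringType, true, withId(1))
    val ok = new StructType()
      .add("s", inner, true, withId(1))
      .add("t", IntegerType, true, withId(2))

    // Rejected at read time: two siblings on the same level share id 1, which
    // makes the lookup ambiguous and triggers the duplicate-field error.
    val ambiguous = new StructType()
      .add("a", IntegerType, true, withId(1))
      .add("b", StringType, true, withId(1))
    ```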


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801094062



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       I was thinking about having 2 flags:
   - writing field ids, true by default
   - reading field ids, false by default initially to avoid any regressions.
   
   Alternatively, we can always write field ids, but reads need to be configurable.
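
    With the flag names this PR ended up adding, that split looks like this from a user's point of view (a sketch):
    ```scala
    // Writing ids stays on by default; reading by id is an explicit opt-in.
    spark.conf.set("spark.sql.parquet.fieldId.write.enabled", "true")
    spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
    ```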




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031236141


   @huaxingao Are you suggesting that code that sets field id like https://github.com/apache/parquet-mr/blob/1adc22804a700d78f8480667d083e91d6147339f/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L279 isn't really materialized to files?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] huaxingao commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1029741756


   > does not support:
   > - Parquet-mr reader due to lack of field id support (needs a follow up ticket)
   
   Just for my own knowledge: what needs to be done to make parquet-mr support field id?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799044927



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       You meant one flag for read and another flag for write?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799063775



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in the OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+    withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("general test") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)

Review comment:
       Good question. I think schema inference would kick in and sample some parquet files with schema (and field ids if they exist), then it is going to use the Spark schema with field ids to load the parquet files again, so ids would be used whenever possible. Thus, I think it'll be ["random", "name"], or exactly the write schema itself.
   I will add another test to confirm it.
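
    Roughly, the new assertion would be (a sketch, reusing `dir` and `writeSchema` from the test above):
    ```scala
    val inferred = spark.read.parquet(dir.getCanonicalPath).schema
    assert(inferred == writeSchema) // i.e. ["random", "name"], ids included
    ```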




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803320649



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Yeah, but it requires the original schema to contain `parquet.field.id` metadata, which is not present in any of the existing suites, so it should behave exactly like name matching.
   
   Turning this on actually ensures that we didn't introduce any regression for existing code under this mixed matching mode, and detects if this metadata field has been used anywhere. 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801273754



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdSchemaSuite.scala
##########
@@ -0,0 +1,501 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.parquet.schema.{MessageType, MessageTypeParser}
+
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
+
+class ParquetFieldIdSchemaSuite extends ParquetSchemaTest {
+
+  private val FAKE_COLUMN_NAME = "_fake_name_"
+  private val UUID_REGEX =
+    "[a-fA-F0-9]{8}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{4}-[a-fA-F0-9]{12}".r
+
+  private def withId(id: Int) =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  private def testSchemaClipping(

Review comment:
       hmm I would say they have become distinct enough from each other (especially with the random UUID logic) that it is hard to merge them.






[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803137819



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {
+      structType.map { f =>
+        if (ParquetUtils.hasFieldId(f)) {

Review comment:
       This does not answer the question above. I understand that name matching would apply for fields that don't have ids, but what about the opposite?






[GitHub] [spark] jackierwzhang commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1032199353


   > @jackierwzhang No, those are set correctly. What I meant is that the field ids are not really used. Seems only the `ColumnPath` is used in column index, column resolution, etc. I am thinking of adding field id in `ColumnDescriptor` and keeping a map between id and `ColumnDescriptor`, or a map between id and `ColumnPath`.
   
   Got it. I was asking because I tested locally and found that parquet-mr **can** actually save and read field ids via Spark, so I don't have to patch anything in the parquet-mr repo.
   
   Though there are a couple of small problems remaining for id matching on the parquet-mr side, I believe it's possible to extend this PR (or open another) to enable Spark to match by id in that code path; I'm going to do that soon.
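
   For the record, a rough sketch of the local check described above, run in a `spark-shell` session (the path and the `withId` helper are illustrative; `withId` mirrors the helper used in the test suites of this PR):
   ```
   import org.apache.spark.sql.Row
   import org.apache.spark.sql.types._

   def withId(id: Int): Metadata =
     new MetadataBuilder().putLong("parquet.field.id", id).build()

   val schema = new StructType()
     .add("name", StringType, true, withId(0))
     .add("age", IntegerType, true, withId(1))

   // Write field ids through Spark...
   spark.createDataFrame(java.util.Arrays.asList(Row("text", 100)), schema)
     .write.mode("overwrite").parquet("/tmp/field_id_demo")

   // ...then read the file back on the parquet-mr path (vectorized reader disabled).
   spark.conf.set("spark.sql.parquet.enableVectorizedReader", "false")
   spark.read.schema(schema).parquet("/tmp/field_id_demo").show()
   ```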




[GitHub] [spark] jackierwzhang edited a comment on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang edited a comment on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031236141


   @huaxingao Are you suggesting the code that:
   
   1. sets field id like https://github.com/apache/parquet-mr/blob/1adc22804a700d78f8480667d083e91d6147339f/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L279, and
   2. reads field id like https://github.com/apache/parquet-mr/blob/5608695f5777de1eb0899d9075ec9411cfdf31d3/parquet-thrift/src/main/java/org/apache/parquet/thrift/ThriftSchemaConvertVisitor.java#L271
   
   isn't really doing anything?




[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801265948



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    val parquetRequestedSchema = readContext.getRequestedSchema
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz,
+      datetimeRebaseSpec,
+      int96RebaseSpec)
+  }
+}
+
+object ParquetReadSupport extends Logging {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
+
+  def getRequestedSchema(
+      parquetFileSchema: MessageType,
+      catalystRequestedSchema: StructType,
+      conf: Configuration,
+      enableVectorizedReader: Boolean): MessageType = {
     val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
     val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
-    val parquetFileSchema = context.getFileSchema
+    val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_ENABLED.defaultValue.get)
+    val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
+      SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
+
+    if (!ignoreMissingIds &&
+        !containsFieldIds(parquetFileSchema) &&
+        ParquetUtils.hasFieldIds(catalystRequestedSchema)) {
+      throw new RuntimeException(
+        s"""
+           |Spark read schema expects field Ids, but Parquet file schema doesn't contain field Ids.

Review comment:
       I think Spark has this [API](https://github.com/apache/spark/blob/58d3f1516ed812b692709991e551829aa0090578/sql/catalyst/src/main/scala/org/apache/spark/sql/types/StructType.scala#L187) to modify the metadata of a StructField, so it'd be possible to remove the field id from the schema, right?
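
   A minimal sketch of that approach, assuming `MetadataBuilder.remove` behaves as expected (`dropFieldIds` is a hypothetical helper; it only handles top-level fields for brevity, nested structs would need recursion):
   ```
   import org.apache.spark.sql.types._

   // Hypothetical helper: strip the field-id key from every top-level field.
   def dropFieldIds(schema: StructType): StructType =
     StructType(schema.map { f =>
       val cleaned = new MetadataBuilder()
         .withMetadata(f.metadata)
         .remove("parquet.field.id")
         .build()
       f.copy(metadata = cleaned)
     })
   ```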






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801281141



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +355,13 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+
+        if (SQLConf.get.parquetFieldIdEnabled &&
+            ParquetUtils.hasFieldIds(requiredSchema)) {
+          throw new IOException("Parquet-mr reader does not support schema with field IDs." +
+            s" Please choose a different Parquet reader. Read schema: ${requiredSchema.json}")

Review comment:
       Deprecated, as parquet-mr is supported now.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802282702



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,10 +433,49 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, caseSensitive, useFieldId)
+          }
+        }.getOrElse {
+          // When there is no ID match, we use a fake name to avoid a name match by accident
+          // We need this name to be unique as well, otherwise there will be type conflicts
+          toParquet.convertField(f.copy(name = generateFakeColumnName))
+        }
+    }
+
+    if (useFieldId && ParquetUtils.hasFieldIds(structType)) {
+      structType.map { f =>
+        if (ParquetUtils.hasFieldId(f)) {

Review comment:
       It still makes sense when we are reading with a Spark schema whose field ids are only partially filled. In that case, we would fall back to name matching for the columns without field ids, and case sensitivity still matters there.
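
   A minimal sketch of the partially filled case, using the same `withId` helper as the test suites in this PR:
   ```
   import org.apache.spark.sql.types._

   def withId(id: Int): Metadata =
     new MetadataBuilder().putLong("parquet.field.id", id).build()

   val readSchema = new StructType()
     .add("a", StringType, true, withId(0)) // resolved by field id 0
     .add("B", IntegerType)                 // no id: falls back to name matching,
                                            // where case sensitivity still applies
   ```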






[GitHub] [spark] cloud-fan commented on pull request #35385: [SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
cloud-fan commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1044679239


   thanks, merging to master!




[GitHub] [spark] sadikovi commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
sadikovi commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799006839



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFileFormat.scala
##########
@@ -354,6 +355,13 @@ class ParquetFileFormat
         }
       } else {
         logDebug(s"Falling back to parquet-mr")
+
+        if (SQLConf.get.parquetFieldIdEnabled &&
+            ParquetUtils.hasFieldIds(requiredSchema)) {
+          throw new IOException("Parquet-mr reader does not support schema with field IDs." +
+            s" Please choose a different Parquet reader. Read schema: ${requiredSchema.json}")

Review comment:
       Maybe we should recommend disabling the flag, either instead of or in addition to choosing a different reader?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       I am wondering whether there should be two flags, or a single read-only flag that defaults to false, to enable easier migration.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {

Review comment:
       I believe there is already a method for this in ParquetTest.scala; otherwise it is fine to keep as is.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("general test") {

Review comment:
       Can we change it to "Parquet reads infer fields using field ids correctly" or something like that? We need to make sure it is clear that we write random field names and infer fields using ids instead. Maybe you can add a small comment to clarify.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether there exists a field in the schema, whether inner or leaf, has the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {
+    require(hasFieldId(field),
+      "The key `parquet.field.id` doesn't exist in the metadata of " + field)

Review comment:
       Can we use FIELD_ID_METADATA_KEY here?

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.

Review comment:
       We should just use vectorised reader here, there is only one.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,11 +432,50 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
+          }
+        }.getOrElse {
+        // When there is no ID match, we use a fake name to avoid a name match by accident
+        // We need this name to be unique as well, otherwise there will be type conflicts

Review comment:
       I am not quite sure I follow this statement. Can you elaborate on what you mean by assigning a fake name here? What kind of fields will fall into this category? Do fake ids need to be deterministic?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
+      useFieldId: Boolean,

Review comment:
       Is it possible to not change the order of the fields and add `useFieldId` at the end of the list of arguments instead?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)
+  }
+
+  /**
+   * Called on executor side after [[init()]], before instantiating actual Parquet record readers.
+   * Responsible for instantiating [[RecordMaterializer]], which is used for converting Parquet
+   * records to Catalyst [[InternalRow]]s.
+   */
+  override def prepareForRead(
+      conf: Configuration,
+      keyValueMetaData: JMap[String, String],
+      fileSchema: MessageType,
+      readContext: ReadContext): RecordMaterializer[InternalRow] = {
+    val parquetRequestedSchema = readContext.getRequestedSchema
+    new ParquetRecordMaterializer(
+      parquetRequestedSchema,
+      ParquetReadSupport.expandUDT(catalystRequestedSchema),
+      new ParquetToSparkSchemaConverter(conf),
+      convertTz,
+      datetimeRebaseSpec,
+      int96RebaseSpec)
+  }
+}
+
+object ParquetReadSupport extends Logging {
+  val SPARK_ROW_REQUESTED_SCHEMA = "org.apache.spark.sql.parquet.row.requested_schema"
+
+  val SPARK_METADATA_KEY = "org.apache.spark.sql.parquet.row.metadata"
+
+  def generateFakeColumnName: String = s"_fake_name_${UUID.randomUUID()}"
+
+  def getRequestedSchema(
+      parquetFileSchema: MessageType,
+      catalystRequestedSchema: StructType,
+      conf: Configuration,
+      enableVectorizedReader: Boolean): MessageType = {
     val caseSensitive = conf.getBoolean(SQLConf.CASE_SENSITIVE.key,
       SQLConf.CASE_SENSITIVE.defaultValue.get)
     val schemaPruningEnabled = conf.getBoolean(SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.key,
       SQLConf.NESTED_SCHEMA_PRUNING_ENABLED.defaultValue.get)
-    val parquetFileSchema = context.getFileSchema
+    val useFieldId = conf.getBoolean(SQLConf.PARQUET_FIELD_ID_ENABLED.key,
+      SQLConf.PARQUET_FIELD_ID_ENABLED.defaultValue.get)
+    val ignoreMissingIds = conf.getBoolean(SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.key,
+      SQLConf.IGNORE_MISSING_PARQUET_FIELD_ID.defaultValue.get)
+
+    if (!ignoreMissingIds &&
+        !containsFieldIds(parquetFileSchema) &&
+        ParquetUtils.hasFieldIds(catalystRequestedSchema)) {
+      throw new RuntimeException(
+        s"""
+           |Spark read schema expects field Ids, but Parquet file schema doesn't contain field Ids.

Review comment:
       Can we extend the error message to hint at how to fix the error?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)

Review comment:
       I am inclined to keep this config as false by default.

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       How does it work when there is a mixture of columns that have field id set and ones that don't?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -324,23 +401,29 @@ object ParquetReadSupport {
    * @return A list of clipped [[GroupType]] fields, which can be empty.
    */
   private def clipParquetGroupFields(
-      parquetRecord: GroupType, structType: StructType, caseSensitive: Boolean): Seq[Type] = {
-    val toParquet = new SparkToParquetSchemaConverter(writeLegacyParquetFormat = false)
-    if (caseSensitive) {
-      val caseSensitiveParquetFieldMap =
+      parquetRecord: GroupType,
+      structType: StructType,
+      useFieldId: Boolean,
+      caseSensitive: Boolean): Seq[Type] = {
+    val toParquet = new SparkToParquetSchemaConverter(
+      writeLegacyParquetFormat = false, useFieldId = useFieldId)
+    lazy val caseSensitiveParquetFieldMap =
         parquetRecord.getFields.asScala.map(f => f.getName -> f).toMap
-      structType.map { f =>
-        caseSensitiveParquetFieldMap
+    lazy val caseInsensitiveParquetFieldMap =
+        parquetRecord.getFields.asScala.groupBy(_.getName.toLowerCase(Locale.ROOT))
+    lazy val idToParquetFieldMap =

Review comment:
       Is it possible for fields to have the same field id?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -349,11 +432,50 @@ object ParquetReadSupport {
               throw QueryExecutionErrors.foundDuplicateFieldInCaseInsensitiveModeError(
                 f.name, parquetTypesString)
             } else {
-              clipParquetType(parquetTypes.head, f.dataType, caseSensitive)
+              clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
             }
           }.getOrElse(toParquet.convertField(f))
+    }
+
+    def matchIdField(f: StructField): Type = {
+      val fieldId = ParquetUtils.getFieldId(f)
+      idToParquetFieldMap
+        .get(fieldId)
+        .map { parquetTypes =>
+          if (parquetTypes.size > 1) {
+            // Need to fail if there is ambiguity, i.e. more than one field is matched
+            val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
+            throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
+              fieldId, parquetTypesString)
+          } else {
+            clipParquetType(parquetTypes.head, f.dataType, useFieldId, caseSensitive)
+          }
+        }.getOrElse {
+        // When there is no ID match, we use a fake name to avoid a name match by accident

Review comment:
       nit: indentation needs to be fixed here.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)

Review comment:
       nit: indentation is wrong.

##########
File path: sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetFieldIdIOSuite.scala
##########
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet
+
+import scala.collection.JavaConverters._
+
+import org.apache.spark.SparkException
+import org.apache.spark.sql.{QueryTest, Row}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSparkSession
+import org.apache.spark.sql.types.{IntegerType, Metadata, MetadataBuilder, StringType, StructType}
+
+class ParquetFieldIdIOSuite extends QueryTest with ParquetTest with SharedSparkSession  {
+
+  private def withId(id: Int): Metadata =
+    new MetadataBuilder().putLong(ParquetUtils.FIELD_ID_METADATA_KEY, id).build()
+
+  /**
+   * Field id is supported in OSS vectorized reader at the moment.
+   * parquet-mr support is coming soon.
+   */
+  private def withAllSupportedReaders(code: => Unit): Unit = {
+   withSQLConf(SQLConf.PARQUET_VECTORIZED_READER_ENABLED.key -> "true")(code)
+  }
+
+  test("general test") {
+    withTempDir { dir =>
+      val readSchema =
+        new StructType().add(
+          "a", StringType, true, withId(0))
+          .add("b", IntegerType, true, withId(1))
+
+      val writeSchema =
+        new StructType()
+          .add("random", IntegerType, true, withId(1))
+          .add("name", StringType, true, withId(0))
+
+      val readData = Seq(Row("text", 100), Row("more", 200))
+      val writeData = Seq(Row(100, "text"), Row(200, "more"))
+      spark.createDataFrame(writeData.asJava, writeSchema)
+        .write.mode("overwrite").parquet(dir.getCanonicalPath)
+
+      withAllSupportedReaders {
+        checkAnswer(spark.read.schema(readSchema).parquet(dir.getCanonicalPath), readData)

Review comment:
       What happens if you read data without providing a schema? Will the schema be ["random", "name"] or ["name", "random"]?






[GitHub] [spark] huaxingao commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799240174



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names; Parquet writers will also populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")
+      .doc("When the Parquet file does't have any field IDs but the" +

Review comment:
       nit: typo `does't`






[GitHub] [spark] huaxingao commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
huaxingao commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031068435


   @jackierwzhang 
   FYI: I am working with @shangxinli on column id resolution in parquet-mr [link](https://issues.apache.org/jira/browse/PARQUET-2006), with pretty much the same motivation as yours. The work will probably overlap with yours. 
   One thing that I just realized is that the field id can be NOT unique in a schema. For example:
   ```
   message ParquetSchema {
     required group reqMap (MAP) = 1 {
       repeated group key_value (MAP_KEY_VALUE) {
         required binary key (STRING);
         optional group value (MAP) {
           repeated group key_value (MAP_KEY_VALUE) {
             required binary key (STRING);
             optional group value {
               required binary name (STRING) = 1;
               optional binary age (STRING) = 2;
               optional binary gender (STRING) = 3;
               optional group addedStruct = 4 {
                 required binary name (STRING) = 1;
                 optional binary age (STRING) = 2;
                 optional binary gender (STRING) = 3;
               }
             }
           }
         }
       }
     }
   }
   ```
   I probably need to change the format specification to make the field id unique.




[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r799043344



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       It would try to match by id if the id exists; otherwise, it would fall back to matching by name.
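
   A toy sketch of that per-field resolution order (simplified types, not the actual reader code):
   ```
   case class ParquetField(name: String, id: Option[Int])

   // Match by id when the Spark field carries one; otherwise match by name.
   def resolve(
       sparkName: String,
       sparkId: Option[Int],
       fileFields: Seq[ParquetField]): Option[ParquetField] =
     sparkId match {
       case Some(id) => fileFields.find(_.id.contains(id))
       case None     => fileFields.find(_.name == sparkName)
     }
   ```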






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801259155



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,23 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +

Review comment:
       Ah, I meant even when this flag is enabled, my statement above still applies.






[GitHub] [spark] jackierwzhang commented on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1030483065


   > > does not support:
   > > 
   > > * Parquet-mr reader due to lack of field id support (needs a follow up ticket)
   > 
   > Just for my own knowledge: what needs to be done to make parquet-mr support field id?
   
   I am still investigating; previously I thought it required support from parquet-mr, but now it looks like that's not necessary. 
   
   I am working on a fix locally, which might be pushed out as part of this PR or another.




[GitHub] [spark] jackierwzhang edited a comment on pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang edited a comment on pull request #35385:
URL: https://github.com/apache/spark/pull/35385#issuecomment-1031236141


   @huaxingao Are you suggesting the code that sets field id like https://github.com/apache/parquet-mr/blob/1adc22804a700d78f8480667d083e91d6147339f/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java#L279 isn't really doing anything?




[GitHub] [spark] martin-g commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
martin-g commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r801428949



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -156,9 +189,10 @@ object ParquetReadSupport {
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
-      caseSensitive: Boolean = true): MessageType = {
+      caseSensitive: Boolean,

Review comment:
       Is the API break allowed here?
   Maybe it would be better to add an overloaded method instead?
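
   For illustration, such an overload could look like this (a sketch delegating to the new four-argument version; `useFieldId = false` would preserve the old behavior):
   ```
   def clipParquetSchema(
       parquetSchema: MessageType,
       catalystSchema: StructType,
       caseSensitive: Boolean = true): MessageType =
     clipParquetSchema(parquetSchema, catalystSchema, caseSensitive, useFieldId = false)
   ```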

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadSupport.scala
##########
@@ -85,13 +85,70 @@ class ParquetReadSupport(
       StructType.fromString(schemaString)
     }
 
+    val parquetRequestedSchema = ParquetReadSupport.getRequestedSchema(
+      context.getFileSchema, catalystRequestedSchema, conf, enableVectorizedReader)
+    new ReadContext(parquetRequestedSchema, Map.empty[String, String].asJava)

Review comment:
       Why not just use `new HashMap<String, String>()`?

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,42 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex =
+      catalystType.fields.zipWithIndex.map { case (f, idx) =>
+          (f.name, idx)
+        }.toMap
+
+    val catalystFieldIdxByName = if (SQLConf.get.caseSensitiveAnalysis) {
+      nameToIndex
     } else {
-      CaseInsensitiveMap(catalystType.fieldNames.zipWithIndex.toMap)
+      CaseInsensitiveMap(nameToIndex)
     }
+
+    // (SPARK-38094) parquet field ids, if exist, should be prioritized for matching
+    val catalystFieldIdxByFieldId =
+      if (SQLConf.get.parquetFieldIdReadEnabled && ParquetUtils.hasFieldIds(catalystType)) {
+        catalystType.fields
+          .zipWithIndex
+          .filter { case (f, _) => ParquetUtils.hasFieldId(f) }

Review comment:
       Is this filter needed?
   `ParquetUtils.hasFieldIds(catalystType)` at line 219 already checks recursively.

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetSchemaConverter.scala
##########
@@ -438,16 +438,19 @@ class ParquetToSparkSchemaConverter(
 class SparkToParquetSchemaConverter(
     writeLegacyParquetFormat: Boolean = SQLConf.PARQUET_WRITE_LEGACY_FORMAT.defaultValue.get,
     outputTimestampType: SQLConf.ParquetOutputTimestampType.Value =
-      SQLConf.ParquetOutputTimestampType.INT96) {
+      SQLConf.ParquetOutputTimestampType.INT96,
+    useFieldId: Boolean = SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get) {
 
   def this(conf: SQLConf) = this(
     writeLegacyParquetFormat = conf.writeLegacyParquetFormat,
-    outputTimestampType = conf.parquetOutputTimestampType)
+    outputTimestampType = conf.parquetOutputTimestampType,
+    useFieldId = conf.parquetFieldIdWriteEnabled)
 
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
-      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)))
+      conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
+    useFieldId = SQLConf.get.parquetFieldIdWriteEnabled)

Review comment:
       This is inconsistent with the other settings above.
   I am not sure why, but the others use `conf.get(SQLConf.X_Y_Z.key).toAbc`.
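
   For illustration, a sketch of the consistent style (mirroring the settings above; the two-argument `conf.get` supplies a default in case the key is unset in the Hadoop configuration):
   ```
   def this(conf: Configuration) = this(
     writeLegacyParquetFormat = conf.get(SQLConf.PARQUET_WRITE_LEGACY_FORMAT.key).toBoolean,
     outputTimestampType = SQLConf.ParquetOutputTimestampType.withName(
       conf.get(SQLConf.PARQUET_OUTPUT_TIMESTAMP_TYPE.key)),
     useFieldId = conf.get(SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.key,
       SQLConf.PARQUET_FIELD_ID_WRITE_ENABLED.defaultValue.get.toString).toBoolean)
   ```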

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,30 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled," +
+        " Parquet writers will populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers" +
+        " will use field IDs (if present) in the requested Spark schema to look up Parquet" +
+        " fields instead of using column names")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")
+      .doc("When the Parquet file doesn't have any field IDs but the" +
+        " Spark read schema is using field IDs to read, we will silently return nulls" +
+        "when this flag is enabled, or error otherwise.")

Review comment:
       missing leading space before `when`

##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala
##########
@@ -203,16 +203,42 @@ private[parquet] class ParquetRowConverter(
   private[this] val fieldConverters: Array[Converter with HasParentContainerUpdater] = {
     // (SPARK-31116) Use case insensitive map if spark.sql.caseSensitive is false
     // to prevent throwing IllegalArgumentException when searching catalyst type's field index
-    val catalystFieldNameToIndex = if (SQLConf.get.caseSensitiveAnalysis) {
-      catalystType.fieldNames.zipWithIndex.toMap
+    def nameToIndex =
+      catalystType.fields.zipWithIndex.map { case (f, idx) =>
+          (f.name, idx)
+        }.toMap

Review comment:
       Isn't `catalystType.fieldNames.zipWithIndex.toMap` the same, but in fewer lines?

##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,30 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled," +
+        " Parquet writers will populate the field Id" +
+        " metadata (if present) in the Spark schema to the Parquet schema.")
+      .booleanConf

Review comment:
       Does it need `.version("3.3.0")`?






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803323608



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"

Review comment:
       Yep, we didn't find anything existing in current spark or parquet. will note this in user facing change section.






[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803320649



##########
File path: sql/core/src/test/scala/org/apache/spark/sql/test/TestSQLContext.scala
##########
@@ -61,7 +61,10 @@ private[sql] object TestSQLContext {
   val overrideConfs: Map[String, String] =
     Map(
       // Fewer shuffle partitions to speed up testing.
-      SQLConf.SHUFFLE_PARTITIONS.key -> "5")
+      SQLConf.SHUFFLE_PARTITIONS.key -> "5",
+      // Enable parquet read field id for tests to ensure correctness
+      SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true"

Review comment:
       Yeah, but it requires the original schema to contain `parquet.field.id` metadata, which is not present in any of the existing suites, so it should behave exactly like name matching.
   Turning this on actually tests that fallback behavior.
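
   As a hypothetical test sketch (assuming the usual `SQLTestUtils` helpers such as `withSQLConf` and `withTempPath`), the fallback can be exercised like this:

   ```scala
   // Field-id reads enabled, but the written schema carries no
   // "parquet.field.id" metadata, so the reader should match columns by name
   withSQLConf(SQLConf.PARQUET_FIELD_ID_READ_ENABLED.key -> "true") {
     withTempPath { dir =>
       spark.range(10).write.parquet(dir.getAbsolutePath)
       checkAnswer(
         spark.read.parquet(dir.getAbsolutePath),
         (0 until 10).map(i => Row(i.toLong)))
     }
   }
   ```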




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r802292496



##########
File path: sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetUtils.scala
##########
@@ -144,6 +144,42 @@ object ParquetUtils {
       file.getName == ParquetFileWriter.PARQUET_METADATA_FILE
   }
 
+  /**
+   * A StructField metadata key used to set the field id of a column in the Parquet schema.
+   */
+  val FIELD_ID_METADATA_KEY = "parquet.field.id"
+
+  /**
+   * Whether any field in the schema, whether inner or leaf, has the parquet field
+   * ID metadata.
+   */
+  def hasFieldIds(schema: StructType): Boolean = {
+    def recursiveCheck(schema: DataType): Boolean = {
+      schema match {
+        case st: StructType =>
+          st.exists(field => hasFieldId(field) || recursiveCheck(field.dataType))
+
+        case at: ArrayType => recursiveCheck(at.elementType)
+
+        case mt: MapType => recursiveCheck(mt.keyType) || recursiveCheck(mt.valueType)
+
+        case _ =>
+          // No need to really check primitive types, just to terminate the recursion
+          false
+      }
+    }
+    if (schema.isEmpty) false else recursiveCheck(schema)
+  }
+
+  def hasFieldId(field: StructField): Boolean =
+    field.metadata.contains(FIELD_ID_METADATA_KEY)
+
+  def getFieldId(field: StructField): Int = {
+    require(hasFieldId(field),
+      s"The key `$FIELD_ID_METADATA_KEY` doesn't exist in the metadata of " + field)
+    field.metadata.getLong(FIELD_ID_METADATA_KEY).toInt

Review comment:
       Good point, we don't.
   I think there are a couple of situations here:
   1. If it is negative -> it is allowed, because in the [thrift definition](https://github.com/apache/parquet-format/blob/54e53e5d7794d383529dd30746378f19a12afd58/src/main/thrift/parquet.thrift#L398), the field id can be signed.
   2. If it is a string or another non-integral value -> `getLong` would fail with a casting exception.
   3. If it is a long -> we may have a problem when a user specifies a long that overflows an integer; it would silently truncate to a wrong number. I am going to fix this here with a stricter cast and add a test (a sketch follows below).
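
   A sketch (an assumption about the shape of the fix, not the PR's final code) of such a stricter conversion:

   ```scala
   import org.apache.spark.sql.types.StructField

   // Reject ids outside the Int range instead of silently truncating them
   def getFieldId(field: StructField): Int = {
     val id = field.metadata.getLong("parquet.field.id")
     require(id.isValidInt,
       s"Field id $id of column ${field.name} does not fit in a 32-bit integer")
     id.toInt
   }
   ```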




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] jackierwzhang commented on a change in pull request #35385: [WIP][SPARK-38094] Enable matching schema column names by field ids

Posted by GitBox <gi...@apache.org>.
jackierwzhang commented on a change in pull request #35385:
URL: https://github.com/apache/spark/pull/35385#discussion_r803315011



##########
File path: sql/catalyst/src/main/scala/org/apache/spark/sql/internal/SQLConf.scala
##########
@@ -934,6 +934,33 @@ object SQLConf {
     .intConf
     .createWithDefault(4096)
 
+   val PARQUET_FIELD_ID_WRITE_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.write.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, " +
+        "Parquet writers will populate the field Id " +
+        "metadata (if present) in the Spark schema to the Parquet schema.")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(true)
+
+  val PARQUET_FIELD_ID_READ_ENABLED =
+    buildConf("spark.sql.parquet.fieldId.read.enabled")
+      .doc("Field ID is a native field of the Parquet schema spec. When enabled, Parquet readers " +
+        "will use field IDs (if present) in the requested Spark schema to look up Parquet " +
+        "fields instead of using column names")
+      .version("3.3.0")
+      .booleanConf
+      .createWithDefault(false)
+
+  val IGNORE_MISSING_PARQUET_FIELD_ID =
+    buildConf("spark.sql.parquet.fieldId.ignoreMissing")

Review comment:
       Sure.
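
   For illustration, the two stable flags from the diff above would be toggled per session like so (the ignore-missing conf is left out since its name is still under discussion here):

   ```scala
   // Enable field-id based matching on reads and field-id population on writes
   spark.conf.set("spark.sql.parquet.fieldId.read.enabled", "true")
   spark.conf.set("spark.sql.parquet.fieldId.write.enabled", "true")
   ```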




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org