You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "johanl-db (via GitHub)" <gi...@apache.org> on 2023/03/27 17:23:02 UTC

[GitHub] [spark] johanl-db commented on a diff in pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy

johanl-db commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1149566989


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -519,6 +519,13 @@ object FileSourceMetadataAttribute {
   def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
     attr.withMetadata(removeInternalMetadata(attr.metadata))
 
+  /**
+   * Cleanup the internal metadata information of a struct field, if it is

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the given file format.
+   * All fields the file format's _metadata struct defines.

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -322,18 +329,13 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
 
       // extra Project node: wrap flat metadata columns to a metadata struct
       val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
-        val structColumns = metadataColumns.map { col => col.name match {
-            case FileFormat.FILE_PATH | FileFormat.FILE_NAME | FileFormat.FILE_SIZE |
-                 FileFormat.FILE_BLOCK_START | FileFormat.FILE_BLOCK_LENGTH |
-                 FileFormat.FILE_MODIFICATION_TIME =>
-              col
-            case FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME =>
-              generatedMetadataColumns
-                .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
-                // Change the `_tmp_metadata_row_index` to `row_index`,
-                // and also change the nullability to not nullable,
-                // which is consistent with the nullability of `row_index` field
-                .get.withName(FileFormat.ROW_INDEX).withNullability(false)
+        val structColumns = metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
+          // Construct the metadata struct the query expects to see, using the columns we previously
+          // created. Be sure to restore the proper name and nullability for each metadata field.
+          metadataColumnsByName(field.name) match {

Review Comment:
   I simplified this code to always restore the name and nullability, so I don't think it warrants a separate method anymore.
   I also think the previous version was not actually correct: a generated column could be created with `nullable = true` on [L272](https://github.com/apache/spark/pull/40545/commits/2f5f9a26ec5cdf0c04893a18e2cfa386228e7aa2#diff-7e4f78e90e8699733afbe43e2b265b95e514896ac68f1fb9e60705d59a0b7ed9R272), and its nullability wouldn't be reset if its `name` and `internalName` were the same.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,37 +247,41 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed to readers,
       //  as the values will be null when trying to read the missing column from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection node below.
+      // Also remember the attribute for each column name, so we can easily map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataStructField(
-                  name = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  internalName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  dataType = LongType,
-                  nullable = true).toAttribute
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataStructField(field.name, field.dataType).toAttribute
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {

Review Comment:
   Added method `createMetadataColumn`



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the given file format.
+   * All fields the file format's _metadata struct defines.
    */
-  def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
-    val fields = if (fileFormat.isInstanceOf[ParquetFileFormat]) {
-      BASE_METADATA_FIELDS :+ StructField(FileFormat.ROW_INDEX, LongType, nullable = false)
+  def metadataSchemaFields(fileFormat: FileFormat): Seq[StructField] =
+    if (fileFormat.isInstanceOf[ParquetFileFormat]) {
+      BASE_METADATA_FIELDS :+ ROW_INDEX_FIELD
     } else {
       BASE_METADATA_FIELDS
     }
+
+  /**
+   * Create a file metadata struct column containing fields supported by the given file format.

Review Comment:
   Done



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -198,6 +198,10 @@ object FileFormat {
   // until they can be placed in the _metadata struct.
   val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
 
+  // The field readers can use to access the generated row index column.

Review Comment:
   Removed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org