Posted to reviews@spark.apache.org by "jaceklaskowski (via GitHub)" <gi...@apache.org> on 2023/03/25 14:45:46 UTC

[GitHub] [spark] jaceklaskowski commented on a diff in pull request #40545: [WIP][SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy

jaceklaskowski commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1148384721


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -198,6 +198,10 @@ object FileFormat {
   // until they can be placed in the _metadata struct.
   val ROW_INDEX_TEMPORARY_COLUMN_NAME = s"_tmp_metadata_$ROW_INDEX"
 
+  // The field readers can use to access the generated row index column.

Review Comment:
   I don't think the description adds anything beyond what's already implied by `ROW_INDEX_FIELD` being a public val (_constant_) that others (e.g., "readers") can use. Please remove the comment.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,37 +247,41 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed to readers,
       //  as the values will be null when trying to read the missing column from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection node below.
+      // Also remember the attribute for each column name, so we can easily map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataStructField(
-                  name = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  internalName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
-                  dataType = LongType,
-                  nullable = true).toAttribute
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataStructField(field.name, field.dataType).toAttribute
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {

Review Comment:
   Can we introduce a method (even an internal one) with a name that says what this `foreach` does? Or a helper function that the `foreach` uses for the check. It would make comprehension so much easier 🙏 
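   
   Something like this rough sketch could capture the intent (the helper name is hypothetical, not from the PR; the bodies just reuse the constructors already in this hunk):
   
   ```scala
   // Hypothetical helper: builds the attribute for one field of the _metadata struct.
   private def createMetadataColumn(field: StructField): Attribute = field.name match {
     case FileFormat.ROW_INDEX =>
       // Generated column: read under a temporary name, nullable while scanning.
       FileSourceGeneratedMetadataStructField(
         name = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
         internalName = FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME,
         dataType = LongType,
         nullable = true).toAttribute
     case _ =>
       // Constant column: the value is fixed per file.
       FileSourceConstantMetadataStructField(field.name, field.dataType).toAttribute
   }
   ```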



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -519,6 +519,13 @@ object FileSourceMetadataAttribute {
   def cleanupFileSourceMetadataInformation(attr: Attribute): Attribute =
     attr.withMetadata(removeInternalMetadata(attr.metadata))
 
+  /**
+   * Cleanup the internal metadata information of a struct field, if it is

Review Comment:
   nit: Just "Removes the internal field metadata" would be enough. Even the implementation itself doesn't make it clear why it matters that the `StructField` be one of the types mentioned.



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the given file format.
+   * All fields the file format's _metadata struct defines.

Review Comment:
   Replace with "Supported metadata fields of the given [[FileFormat]]"



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileFormat.scala:
##########
@@ -221,14 +225,23 @@ object FileFormat {
     FileSourceConstantMetadataStructField(FILE_MODIFICATION_TIME, TimestampType, nullable = false))
 
   /**
-   * Create a file metadata struct column containing fields supported by the given file format.
+   * All fields the file format's _metadata struct defines.
    */
-  def createFileMetadataCol(fileFormat: FileFormat): AttributeReference = {
-    val fields = if (fileFormat.isInstanceOf[ParquetFileFormat]) {
-      BASE_METADATA_FIELDS :+ StructField(FileFormat.ROW_INDEX, LongType, nullable = false)
+  def metadataSchemaFields(fileFormat: FileFormat): Seq[StructField] =
+    if (fileFormat.isInstanceOf[ParquetFileFormat]) {
+      BASE_METADATA_FIELDS :+ ROW_INDEX_FIELD
     } else {
       BASE_METADATA_FIELDS
     }
+
+  /**
+   * Create a file metadata struct column containing fields supported by the given file format.

Review Comment:
   nit: "Creates a file metadata struct column containing fields supported by the given [[FileFormat]]."



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -322,18 +329,13 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
 
       // extra Project node: wrap flat metadata columns to a metadata struct
       val withMetadataProjections = metadataStructOpt.map { metadataStruct =>
-        val structColumns = metadataColumns.map { col => col.name match {
-            case FileFormat.FILE_PATH | FileFormat.FILE_NAME | FileFormat.FILE_SIZE |
-                 FileFormat.FILE_BLOCK_START | FileFormat.FILE_BLOCK_LENGTH |
-                 FileFormat.FILE_MODIFICATION_TIME =>
-              col
-            case FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME =>
-              generatedMetadataColumns
-                .find(_.name == FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)
-                // Change the `_tmp_metadata_row_index` to `row_index`,
-                // and also change the nullability to not nullable,
-                // which is consistent with the nullability of `row_index` field
-                .get.withName(FileFormat.ROW_INDEX).withNullability(false)
+        val structColumns = metadataStruct.dataType.asInstanceOf[StructType].fields.map { field =>
+          // Construct the metadata struct the query expects to see, using the columns we previously
+          // created. Be sure to restore the proper name and nullability for each metadata field.
+          metadataColumnsByName(field.name) match {

Review Comment:
   Can we introduce a method `restoreProperNameAndNullability(field: StructField)` for this comment and the code that follows? We could then remove the comment altogether since the method name would say it all. 
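   
   For example, a rough sketch (assuming the `metadataColumnsByName` lookup introduced above; the exact body depends on the match arms not shown in this hunk):
   
   ```scala
   // Hypothetical helper: maps a _metadata struct field back to the column we
   // created earlier, restoring its user-facing name and declared nullability.
   private def restoreProperNameAndNullability(field: StructField): Attribute =
     metadataColumnsByName(field.name)
       .withName(field.name)
       .withNullability(field.nullable)
   ```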



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org

