Posted to reviews@spark.apache.org by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org> on 2023/03/28 13:52:54 UTC

[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40545: [SPARK-42918] Generalize handling of metadata attributes in FileSourceStrategy

ryan-johnson-databricks commented on code in PR #40545:
URL: https://github.com/apache/spark/pull/40545#discussion_r1150634235


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed to readers,
       //  as the values will be null when trying to read the missing column from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection node below.
+      // Also remember the attribute for each column name, so we can easily map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataAttribute(
-                  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = true)
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataAttribute(field.name, field.dataType)
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        def createMetadataColumn(field: StructField) = field match {

Review Comment:
   I'm not convinced that introducing this single-use helper improves readability or maintainability.
   ```scala
   def createMetadataColumn(field: StructField) = field match {
     case ...
     case ...
   }
   metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)
   ```
   vs. just inlining the logic:
   ```scala
   metadataStruct.dataType.asInstanceOf[StructType].fields.foreach {
     case ...
     case ...
   }
   ```
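   For what it's worth, a minimal self-contained sketch (toy types, not Spark's) showing that the inlined pattern-matching block compiles and behaves the same as the named helper:
   ```scala
   case class Field(name: String)
   val fields = Array(Field("a"), Field("_metadata"))

   // Helper form: the match lives in a named, single-use function.
   def createMetadataColumn(field: Field): Unit = field match {
     case Field("_metadata") => println("metadata column")
     case other              => println(s"data column ${other.name}")
   }
   fields.foreach(createMetadataColumn)

   // Inlined form: foreach accepts the pattern-matching block directly.
   fields.foreach {
     case Field("_metadata") => println("metadata column")
     case other              => println(s"data column ${other.name}")
   }
   ```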



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -236,33 +247,42 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       // For generated metadata columns, they are set as nullable when passed to readers,
       //  as the values will be null when trying to read the missing column from the file.
       //  They are then replaced by the actual values later in the process.
-      // All metadata columns will be non-null in the returned output.
-      // We then change the nullability to non-nullable in the metadata projection node below.
-      val constantMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
-      val generatedMetadataColumns: mutable.Buffer[Attribute] = mutable.Buffer.empty
+      // We then restore the specified nullability in the metadata projection node below.
+      // Also remember the attribute for each column name, so we can easily map back to it.
+      val constantMetadataColumns = mutable.Buffer.empty[Attribute]
+      val generatedMetadataColumns = mutable.Buffer.empty[Attribute]
+      val metadataColumnsByName = mutable.Map.empty[String, Attribute]
 
       metadataStructOpt.foreach { metadataStruct =>
-        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach { field =>
-          field.name match {
-            case FileFormat.ROW_INDEX =>
-              if ((readDataColumns ++ partitionColumns).map(_.name.toLowerCase(Locale.ROOT))
-                  .contains(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME)) {
-                throw new AnalysisException(FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME +
-                  " is a reserved column name that cannot be read in combination with " +
-                  s"${FileFormat.METADATA_NAME}.${FileFormat.ROW_INDEX} column.")
-              }
-              generatedMetadataColumns +=
-                FileSourceGeneratedMetadataAttribute(
-                  FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME, LongType, nullable = true)
-            case _ =>
-              constantMetadataColumns +=
-                FileSourceConstantMetadataAttribute(field.name, field.dataType)
-          }
+        val schemaColumns = (readDataColumns ++ partitionColumns)
+          .map(_.name.toLowerCase(Locale.ROOT))
+          .toSet
+
+        def createMetadataColumn(field: StructField) = field match {
+          case FileSourceGeneratedMetadataStructField(field, internalName) =>
+            if (schemaColumns.contains(internalName)) {
+              throw new AnalysisException(
+                s"${internalName} is a reserved column name that cannot be read in combination " +
+                s"with ${FileFormat.METADATA_NAME}.${field.name} column.")
+            }
+
+            // NOTE: Readers require the internal column to be nullable because it's not part of the
+            // file's public schema. The projection below will restore the correct nullability for
+            // the column while constructing the final metadata struct.
+            val attr = field.copy(internalName, nullable = true).toAttribute
+            metadataColumnsByName.put(field.name, attr)
+            generatedMetadataColumns += attr
+
+          case FileSourceConstantMetadataStructField(field) =>
+            val attr = field.toAttribute
+            metadataColumnsByName.put(field.name, attr)
+            constantMetadataColumns += attr
+
+          case field => throw new AnalysisException(s"Unrecognized file metadata field: $field")
         }
-      }
 
-      val metadataColumns: Seq[Attribute] =
-        constantMetadataColumns.toSeq ++ generatedMetadataColumns.toSeq
+        metadataStruct.dataType.asInstanceOf[StructType].fields.foreach(createMetadataColumn)

Review Comment:
   Rather than cast the struct here, would it make sense to directly match on it?
   ```scala
   metadataStructOpt.foreach { case AttributeReference(_, schema: StructType, _, _) =>
     ...
     schema.fields.foreach(...)
   }
   ```
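   As a self-contained illustration (toy stand-ins for `AttributeReference`/`StructType`), destructuring in the closure also turns a would-be `ClassCastException` into an explicit `MatchError`, which makes the struct-typed invariant visible at the match site:
   ```scala
   sealed trait DataType
   case class StructType(fields: Array[String]) extends DataType
   case class Attr(name: String, dataType: DataType)

   val metadataStructOpt: Option[Attr] =
     Some(Attr("_metadata", StructType(Array("file_path", "row_index"))))

   // Cast form: the struct assumption is buried in asInstanceOf.
   metadataStructOpt.foreach { attr =>
     attr.dataType.asInstanceOf[StructType].fields.foreach(println)
   }

   // Match form: the same assumption is stated by the pattern itself.
   metadataStructOpt.foreach { case Attr(_, StructType(fields)) =>
     fields.foreach(println)
   }
   ```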



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/namedExpressions.scala:
##########
@@ -501,80 +506,93 @@ object FileSourceMetadataAttribute {
 
   val FILE_SOURCE_METADATA_COL_ATTR_KEY = "__file_source_metadata_col"
 
+  val METADATA: Metadata = new MetadataBuilder()
+    .withMetadata(MetadataAttribute.METADATA)
+    .putBoolean(METADATA_COL_ATTR_KEY, value = true)

Review Comment:
   Isn't this already part of `MetadataAttribute.METADATA`?
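   If it is, the extra `putBoolean` should be a structural no-op. A small sketch of the `MetadataBuilder` merge semantics (the key string here is a stand-in for illustration):
   ```scala
   import org.apache.spark.sql.types.{Metadata, MetadataBuilder}

   // Stand-in for MetadataAttribute.METADATA, assuming it already carries the key:
   val base: Metadata = new MetadataBuilder()
     .putBoolean("__metadata_col", value = true)
     .build()

   // Merging base and then re-putting the same key/value changes nothing:
   val merged: Metadata = new MetadataBuilder()
     .withMetadata(base)
     .putBoolean("__metadata_col", value = true)
     .build()

   assert(merged == base) // Metadata compares structurally
   ```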



##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -282,29 +302,21 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
       // all references will be bound to output attributes which are either
       // [[FileSourceConstantMetadataAttribute]] or [[FileSourceGeneratedMetadataAttribute]] after
       // the flattening from the metadata struct.
-      def rebindFileSourceMetadataAttributesInFilters(
-          filters: Seq[Expression]): Seq[Expression] = {
-        // The row index field attribute got renamed.
-        def newFieldName(name: String) = name match {
-          case FileFormat.ROW_INDEX => FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
-          case other => other
-        }
-
+      def rebindFileSourceMetadataAttributesInFilters(filters: Seq[Expression]): Seq[Expression] =
         filters.map { filter =>
           filter.transform {
             // Replace references to the _metadata column. This will affect references to the column
             // itself but also where fields from the metadata struct are used.
             case MetadataStructColumn(AttributeReference(_, fields @ StructType(_), _, _)) =>

Review Comment:
   aside: `fields @ StructType(_)` is equivalent to `fields: StructType`, no?
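   A quick self-contained check of that equivalence (a one-field case class standing in for `StructType`):
   ```scala
   case class StructType(fields: Array[String])

   def viaExtractor(x: Any): Boolean = x match {
     case s @ StructType(_) => s.fields.nonEmpty // binder + case-class unapply
     case _                 => false
   }

   def viaTypeTest(x: Any): Boolean = x match {
     case s: StructType => s.fields.nonEmpty     // plain type test, no unapply
     case _             => false
   }

   val struct = StructType(Array("a"))
   assert(viaExtractor(struct) && viaTypeTest(struct))
   assert(!viaExtractor("not a struct") && !viaTypeTest("not a struct"))
   ```
   (For a case class the generated `unapply` always succeeds on an instance, so the two patterns accept exactly the same values; with a hand-written extractor that can fail, they wouldn't.)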



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org