You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "olaky (via GitHub)" <gi...@apache.org> on 2023/03/07 07:19:33 UTC

[GitHub] [spark] olaky commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

olaky commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1127449861


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -244,6 +245,89 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"))
   }
 
+  metadataColumnsTest("df metadataColumn - schema conflict",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    // get the real metadata column (whose name should have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "__metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age", "_METADATA", "_metadata")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - no schema conflict",
+    schema) { (df, f0, f1) =>
+    // get the real metadata column (whose name should _NOT_ have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "_metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, f0(METADATA_FILE_NAME)),
+        Row("lily", 31, f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - column not found", schema) { (df, f0, f1) =>
+    // Not a column at all
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("foo")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`foo`", "proposal" -> "`_metadata`"))
+
+    // Name exists, but does not reference a metadata column
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("name")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`name`", "proposal" -> "`_metadata`"))
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - one",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    checkAnswer(
+      df.select("name", "age", "_metadata", "__metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - several",
+    new StructType()
+      .add(schema("name").copy(name = "_metadata"))
+      .add(schema("age").copy(name = "__metadata"))
+      .add(schema("info").copy(name = "___metadata"))) { (df, f0, f1) =>
+    // the user data has the schema: _metadata, __metadata, ___metadata.id, ___metadata.university
+
+    checkAnswer(
+      df.select("_metadata", "__metadata", "___metadata", "____metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Question about a use case with aliasing: If the metadata column is aliased, we still have to use the original name in withMetadata (or the other functions). So
   ```
   df.select("my_metdata", _metadata).withMetadata("another_metadata_name", "my_metadata")
   ```
   Is not possible I believe? I think this is ok, it just reflects that withMetadata is immune to renaming, but could be good to document this clearly



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.

Review Comment:
   I had an instance in the debugger where this was not the case yesterday



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.
+    (metadataOutput ++ output).collectFirst {
+      case MetadataAttributeWithLogicalName(attr, logicalName)
+          if conf.resolver(name, logicalName) => attr
+    }.getOrElse {
+      val availableMetadataColumns = (metadataOutput ++ output).collect {
+        case MetadataAttributeWithLogicalName(_, logicalName) => logicalName

Review Comment:
   How about adding MetadataColumn here as well in case we have a metadata column without logical name? Even though this might not be possible right now, it would hurt debuggability if metadata columns are missing from this output



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org