Posted to reviews@spark.apache.org by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org> on 2023/03/06 16:05:31 UTC

[GitHub] [spark] ryan-johnson-databricks opened a new pull request, #40300: [SPARK-42683] Automatically rename conflicting metadata columns

ryan-johnson-databricks opened a new pull request, #40300:
URL: https://github.com/apache/spark/pull/40300

   ### What changes were proposed in this pull request?
   
   Today, if a data source already has an output column called `_metadata`, queries cannot access the file-source metadata column that normally carries that name. We can address this conflict with two changes to metadata column handling:
   
   1. Automatically rename any metadata column whose name conflicts with an output column.
   2. Add a way to reliably find metadata columns, even if they were renamed.
   
   In this PR, the name is made unique by prepending underscores to the original name until it no longer conflicts. This improves the debuggability of the resulting query plan, because a human can still quickly tell which column it is. It also gives users a potential way to access the column manually, by adding a predictable number of underscores to the column name in the query.
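   A minimal sketch of the renaming rule. It assumes a plain set of output column names and a case-insensitive resolver. Spark's actual resolver depends on `spark.sql.caseSensitive`, and the `Resolver` alias and `caseInsensitive` value below are illustrative stand-ins, not Spark APIs:

   ```scala
   // Hypothetical stand-in for Spark's name resolver: (existing, candidate) => do they match?
   type Resolver = (String, String) => Boolean

   // Assumption: case-insensitive matching, as with Spark's default settings.
   val caseInsensitive: Resolver = (a, b) => a.equalsIgnoreCase(b)

   // Prepend underscores until `name` no longer resolves to any output column name.
   @scala.annotation.tailrec
   def makeUnique(name: String, outputNames: Set[String], resolve: Resolver): String =
     if (outputNames.exists(existing => resolve(existing, name))) {
       makeUnique(s"_$name", outputNames, resolve)
     } else {
       name
     }

   // A table whose data columns include one named `_metadata`.
   val outputNames = Set("x", "y", "_metadata")
   println(makeUnique("_metadata", outputNames, caseInsensitive)) // prints __metadata
   ```

   Because the loop only ever adds underscores, the renamed column still visibly derives from `_metadata` in a query plan, which is the readability argument made above.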
   
   In addition, we define new Dataset methods `metadataColumn` and `withMetadataColumn`, which mirror the existing methods `col` and `withColumn` but only work for metadata columns.
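   To make the lookup semantics concrete, here is a hypothetical model in plain Scala (not Spark's classes). Each attribute keeps its current name and, if it is a metadata column, the logical name it had before any conflict renaming. The `Attr` class, its `logicalMetadataName` field, and the case-insensitive match are assumptions for illustration only:

   ```scala
   // Hypothetical attribute: its current (possibly renamed) name, plus the logical
   // name it carries if it is a metadata column. Not Spark's Attribute class.
   final case class Attr(name: String, logicalMetadataName: Option[String] = None)

   // Find a metadata column by its logical name, ignoring its current name entirely.
   def metadataColumn(attrs: Seq[Attr], name: String): Attr = {
     attrs.collectFirst {
       case a @ Attr(_, Some(logical)) if logical.equalsIgnoreCase(name) => a
     }.getOrElse {
       val available = attrs.flatMap(_.logicalMetadataName).mkString(", ")
       throw new IllegalArgumentException(s"No metadata column `$name`; available: [$available]")
     }
   }

   // `_metadata` is a data column; the real metadata column was renamed to `__metadata`.
   val attrs = Seq(
     Attr("x"), Attr("y"), Attr("_metadata"),
     Attr("__metadata", logicalMetadataName = Some("_metadata")))

   println(metadataColumn(attrs, "_metadata").name) // prints __metadata
   ```

   A plain data column such as `x` is never returned, even though its name resolves, because only attributes tagged with a logical metadata name are considered.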
   
   ### Why are the changes needed?
   
   Today, it's too easy to lose access to metadata columns if the user's table happens to have the wrong column name. This sharp edge limits the utility of metadata columns in general, because the feature doesn't work reliably for all table schemas.
   
   ### Does this PR introduce _any_ user-facing change?
   
   Suppose we have the following table definition:
   ```sql
   CREATE TABLE has_metadata_conflict(x INTEGER, y INTEGER, _metadata VARCHAR)
   ```
   Then this query returns a string, and the file-source metadata column is completely inaccessible:
   ```sql
   SELECT _metadata FROM has_metadata_conflict
   ```
   The metadata column is also unavailable through the DataFrame API; the example below returns the table's string column:
   ```scala
   df.select("_metadata")
   ```
   
   With this change, the original query still returns a string, but the file-source metadata column can now be found and accessed by invoking `Dataset.withMetadataColumn` or `Dataset.metadataColumn`:
   ```scala
   df.withMetadataColumn("_metadata")
   ```
   
   The renamed metadata column can also be selected manually (as `__metadata` in this case), if the user prefers to rewrite the query:
   ```sql
   SELECT __metadata FROM has_metadata_conflict
   ```
   
   ### How was this patch tested?
   
   New unit tests added.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1130583100


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   is it easy to fix?




[GitHub] [spark] cloud-fan closed pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan closed pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns
URL: https://github.com/apache/spark/pull/40300



[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1151946878


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java:
##########
@@ -48,11 +47,22 @@ public interface SupportsMetadataColumns extends Table {
    * The columns returned by this method may be passed as {@link StructField} in requested
    * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
    * <p>
-   * If a table column and a metadata column have the same name, the metadata column will never be
-   * requested and is ignored. It is recommended that Table implementations reject data column names
-   * that conflict with metadata column names.
+   * If a table column and a metadata column have the same name, the conflict is resolved by either
+   * renaming or suppressing the metadata column. See {@link canRenameConflictingMetadataColumns}.
    *
    * @return an array of {@link MetadataColumn}
    */
   MetadataColumn[] metadataColumns();
+
+  /**
+   * Determines how this data source handles name conflicts between metadata and data columns.
+   * <p>
+   * If true, spark will automatically rename the metadata column to resolve the conflict; metadata
+   * columns (renamed or not) can be reliably accessed by calling {@link Dataset.metadataColumn}.

Review Comment:
   Updated to reference `MetadataAttributeWithLogicalName`, which hides the implementation details.




[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1129169029


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.

Review Comment:
   Do we have a test to reproduce it? This should not happen after https://github.com/apache/spark/commit/5705436f70e6c6d5a127db7773d3627c8e3d695a




[GitHub] [spark] olaky commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "olaky (via GitHub)" <gi...@apache.org>.
olaky commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1134514723


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   Also, the DataFrame API replaces columns that have a conflicting name, so the idea of unique names already shows up in many places. I think choosing a unique name is better than trying to make everything use expression IDs.



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -244,6 +245,89 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"))
   }
 
+  metadataColumnsTest("df metadataColumn - schema conflict",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    // get the real metadata column (whose name should have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "__metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age", "_METADATA", "_metadata")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - no schema conflict",
+    schema) { (df, f0, f1) =>
+    // get the real metadata column (whose name should _NOT_ have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "_metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, f0(METADATA_FILE_NAME)),
+        Row("lily", 31, f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - column not found", schema) { (df, f0, f1) =>
+    // Not a column at all
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("foo")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`foo`", "proposal" -> "`_metadata`"))
+
+    // Name exists, but does not reference a metadata column
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("name")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`name`", "proposal" -> "`_metadata`"))
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - one",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    checkAnswer(
+      df.select("name", "age", "_metadata", "__metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - several",
+    new StructType()
+      .add(schema("name").copy(name = "_metadata"))
+      .add(schema("age").copy(name = "__metadata"))
+      .add(schema("info").copy(name = "___metadata"))) { (df, f0, f1) =>
+    // the user data has the schema: _metadata, __metadata, ___metadata.id, ___metadata.university
+
+    checkAnswer(
+      df.select("_metadata", "__metadata", "___metadata", "____metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Makes sense



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.
+    (metadataOutput ++ output).collectFirst {
+      case MetadataAttributeWithLogicalName(attr, logicalName)
+          if conf.resolver(name, logicalName) => attr
+    }.getOrElse {
+      val availableMetadataColumns = (metadataOutput ++ output).collect {
+        case MetadataAttributeWithLogicalName(_, logicalName) => logicalName

Review Comment:
   Comment is good, I just missed it




[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1127307264


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2714,6 +2726,17 @@ class Dataset[T] private[sql](
    */
   def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
 
+  /**
+   * Returns a new Dataset by selecting a metadata column with the given logical name.
+   *
+   * A metadata column can be accessed this way even if the underlying data source defines a data
+   * column with a conflicting name.
+   *
+   * @group untypedrel
+   * @since 4.0.0
+   */
+  def withMetadataColumn(colName: String): DataFrame = withColumn(colName, metadataColumn(colName))

Review Comment:
   people can just do `df.withColumn(name, df.metadataColumn(col))`, right?




[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1149997056


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   > ... simply won't gain the ability to rename metadata columns on conflict.
   
   Is that true? The rename happens on the Spark side, when Spark appends metadata columns to the normal output columns.




[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1149407329


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   As far as I can tell, v2 intentionally puts the burden of pruning (and of metadata handling) on the datasource. So every datasource _does_ have to implement the policy it wants to use. 
   
   But that's not necessarily a bad thing -- a datasource that chooses not to implement this change simply won't gain the ability to rename metadata columns on conflict.
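   The pruning change in this hunk (matching requested fields by their logical name rather than their current name) can be sketched as a hypothetical model. `Field` and its `metadataLogicalName` tag are assumptions; how a real `StructType` carries the logical name is not shown here:

   ```scala
   // Hypothetical requested field, optionally tagged with the logical name of the
   // metadata column it represents (set when the column was renamed on conflict).
   final case class Field(name: String, metadataLogicalName: Option[String] = None)

   def pruneColumns(
       required: Seq[Field],
       tableColumns: Set[String],
       metadataColumns: Set[String]): Seq[Field] =
     required.filter {
       // A metadata field is matched by its logical name, never by its current name.
       case Field(_, Some(logical)) => metadataColumns.contains(logical)
       // A data field is matched by its current name against the table schema.
       case Field(name, None) => tableColumns.contains(name)
     }

   val required = Seq(
     Field("_metadata"),                                             // the data column
     Field("__metadata", metadataLogicalName = Some("_metadata")), // renamed metadata column
     Field("bogus"))                                                 // unknown column
   val pruned = pruneColumns(
     required, tableColumns = Set("x", "_metadata"), metadataColumns = Set("_metadata"))
   println(pruned.map(_.name)) // keeps _metadata and __metadata, drops bogus
   ```

   Matching on `f.name` alone, as the removed lines did, would drop `__metadata` because that name appears in neither the table schema nor the metadata column list.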




[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1151163428


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   Update: I made the renaming opt-in for dsv2 -- by default they fully keep existing behavior, but they can override `canRenameConflictingMetadataColumns` if they support automatic renaming.
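   A hypothetical model of that opt-in, contrasting the default (suppress the conflicting metadata column) with renaming. The `MetadataSource` trait, the `exposedMetadata` helper, and the `_extra` column are illustrative assumptions; in the PR the hook is `canRenameConflictingMetadataColumns` on the Java interface `SupportsMetadataColumns`:

   ```scala
   // Hypothetical source model; the flag defaults to false, keeping the legacy behavior.
   trait MetadataSource {
     def dataColumns: Seq[String]
     def metadataColumns: Seq[String]
     def canRenameConflictingMetadataColumns: Boolean = false
   }

   // Returns (exposedName, logicalName) for each metadata column that stays visible.
   def exposedMetadata(src: MetadataSource): Seq[(String, String)] = {
     val taken = src.dataColumns.map(_.toLowerCase).toSet // assume case-insensitive names
     @scala.annotation.tailrec
     def makeUnique(name: String): String =
       if (taken.contains(name.toLowerCase)) makeUnique(s"_$name") else name
     src.metadataColumns.flatMap { logical =>
       if (!taken.contains(logical.toLowerCase)) Some(logical -> logical) // no conflict
       else if (src.canRenameConflictingMetadataColumns) Some(makeUnique(logical) -> logical)
       else None // legacy behavior: the conflicting metadata column is suppressed
     }
   }

   val legacy = new MetadataSource {
     val dataColumns = Seq("x", "_metadata")
     val metadataColumns = Seq("_metadata", "_extra")
   }
   val optedIn = new MetadataSource {
     val dataColumns = Seq("x", "_metadata")
     val metadataColumns = Seq("_metadata", "_extra")
     override val canRenameConflictingMetadataColumns = true
   }
   println(exposedMetadata(legacy))  // List((_extra,_extra))
   println(exposedMetadata(optedIn)) // List((__metadata,_metadata), (_extra,_extra))
   ```

   Keeping the logical name next to the exposed name is what lets a lookup like `metadataColumn` find the column regardless of how it was renamed.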




[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1129824136


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   The immediate issue is that node types implementing `ExposesMetadataColumns` all use its `metadataOutputWithOutConflicts` method to ensure that the names of metadata columns in `metadataOutput` do not overlap with the names of non-metadata columns in `output`. Today's code simply removes conflicting metadata attributes; this PR renames them instead.
   
   We could potentially explore removing that method entirely, but I don't know what side effects that might have.




[GitHub] [spark] ryan-johnson-databricks commented on pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on PR #40300:
URL: https://github.com/apache/spark/pull/40300#issuecomment-1463005688

   > It's a good idea to provide an API that allows people to unambiguously reference metadata columns, and I like the new `Dataset.metadataColumn` function. However, I think the prepending underscore approach is a bit hacky. It's too implicit and I'd prefer a more explicit syntax like `SELECT metadata(_metadata) FROM t`. We can discuss this more and invite more SQL experts. Shall we exclude it from this PR for now?
   
   @cloud-fan The prepended underscore is _NOT_ primarily intended as a user surface. Rather, it's a reliable way to get a unique column name that's still at least somewhat readable if you look at the query plan (unlike, e.g., a UUID). The new `Dataset.metadataColumn` method does not even _look_ at a renamed attribute's name, for example.
   
   At this point, the only references in the code to prepended underscores are the two unit tests ("metadata name conflict resolved with leading underscores") that validate that the renaming works as intended. If you don't think that test coverage is important, we could remove those as well.



[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1150601666


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   Today, if a dsv2 source has a metadata column name conflict, the metadata column is not selectable at all, and attempts to select it return the data column instead. The docs for the `SupportsMetadataColumns` interface say:
   >   * If a table column and a metadata column have the same name, the metadata column will never be
   >   * requested and is ignored. It is recommended that Table implementations reject data column names
   >   * that conflict with metadata column names.
   
   After the changes in this PR:
   * Implementations that follow the above advice won't need renaming in the first place
   * Implementations that do not follow the advice will continue to return the data column as before
   * Users who attempt to call `df.metadataColumn` on a dsv2 that allows conflicts but doesn't support renaming yet would hit an analysis error.




[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1151495057


##########
sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/FileSourceStrategy.scala:
##########
@@ -295,8 +297,9 @@ object FileSourceStrategy extends Strategy with PredicateHelper with Logging {
             // Replace references to the _metadata column. This will affect references to the column
             // itself but also where fields from the metadata struct are used.
             case MetadataStructColumn(AttributeReference(_, fields @ StructType(_), _, _)) =>
-              CreateStruct(fields.map(
-                field => metadataColumns.find(attr => attr.name == newFieldName(field.name)).get))
+              val reboundFields = fields.map(

Review Comment:
   unnecessary change?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] viirya commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "viirya (via GitHub)" <gi...@apache.org>.
viirya commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1130515459


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   It looks like `getProjection` matches attributes incorrectly. It tries to get the pruned data type from `schema` by looking up the field name, but `StructType` maintains a mapping of (field name -> StructField), not (expr id -> StructField). If there are duplicate columns, it possibly picks incorrect attribute.
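
   The hazard can be reproduced without Spark. Any lookup keyed by field name keeps one entry per name, while a lookup keyed by expression id keeps both entries. `Attr`, `byName`, and `byExprId` below are hypothetical stand-ins, not Catalyst classes:

   ```scala
   // Hypothetical stand-in for a Catalyst attribute: a name, a unique expression
   // id, and the names of its struct fields. Not a Spark class.
   final case class Attr(name: String, exprId: Long, fields: Seq[String])

   object NameVsIdLookup {
     // Keyed by name, like StructType's name-based lookups: when two attributes
     // share a name, `toMap` keeps only the last one.
     def byName(attrs: Seq[Attr]): Map[String, Attr] =
       attrs.map(a => a.name -> a).toMap

     // Keyed by expression id: same-named attributes stay distinct.
     def byExprId(attrs: Seq[Attr]): Map[Long, Attr] =
       attrs.map(a => a.exprId -> a).toMap
   }
   ```

   Suppose there is a data column `_metadata#20` and a metadata column `_metadata#24`. In that case `byName` holds only the latter, so a lookup meant for the data column silently gets the metadata column's fields.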



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1131780862


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -244,6 +245,89 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"))
   }
 
+  metadataColumnsTest("df metadataColumn - schema conflict",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    // get the real metadata column (whose name should have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "__metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age", "_METADATA", "_metadata")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - no schema conflict",
+    schema) { (df, f0, f1) =>
+    // get the real metadata column (whose name should _NOT_ have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "_metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, f0(METADATA_FILE_NAME)),
+        Row("lily", 31, f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - column not found", schema) { (df, f0, f1) =>
+    // Not a column at all
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("foo")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`foo`", "proposal" -> "`_metadata`"))
+
+    // Name exists, but does not reference a metadata column
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("name")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`name`", "proposal" -> "`_metadata`"))
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - one",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    checkAnswer(
+      df.select("name", "age", "_metadata", "__metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - several",
+    new StructType()
+      .add(schema("name").copy(name = "_metadata"))
+      .add(schema("age").copy(name = "__metadata"))
+      .add(schema("info").copy(name = "___metadata"))) { (df, f0, f1) =>
+    // the user data has the schema: _metadata, __metadata, ___metadata.id, ___metadata.university
+
+    checkAnswer(
+      df.select("_metadata", "__metadata", "___metadata", "____metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Went ahead and added a test case.
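
   The expected names in the two "leading underscores" tests follow from the `makeUnique` helper in the `LogicalPlan.scala` diff. Below is a standalone sketch of that rule. It assumes a case-insensitive resolver (Spark's default), and it takes the output names as a parameter instead of closing over `outputSet`. The `MakeUnique` wrapper object is hypothetical:

   ```scala
   // Hypothetical wrapper; mirrors the renaming rule from the diff.
   object MakeUnique {
     // Case-insensitive resolver, assumed to match Spark's default behavior.
     val resolve: (String, String) => Boolean = (a, b) => a.equalsIgnoreCase(b)

     // Prepend underscores until the name no longer matches any output column.
     @scala.annotation.tailrec
     def makeUnique(name: String, outputNames: Set[String]): String =
       if (outputNames.exists(resolve(_, name))) makeUnique(s"_$name", outputNames)
       else name
   }
   ```

   For a data schema of `_metadata`, `__metadata`, `___metadata`, the metadata column resolves to `____metadata`, matching the "several" test.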



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1149336751


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   Indeed the latter -- that's why this code needs to search for metadata columns by their logical names, and also why `createReaderFactory` needs to map them back to their logical names (because the internal code only knows about the logical names).
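
   Below is a simplified model of that matching. It assumes each required field records its logical name when it is a metadata column. `ReqField`, `prune`, and `readerColumns` are hypothetical; Spark's actual encoding and the `InMemoryBaseTable` code differ:

   ```scala
   // Hypothetical model of a required field: its current name, plus the logical
   // (original) name when it is a metadata column. Not Spark's real encoding.
   final case class ReqField(name: String, metadataLogicalName: Option[String] = None)

   object PruneByLogicalName {
     // Keep data columns the table defines. Keep metadata columns the source
     // supports, matching them by logical name so conflict renames are ignored.
     def prune(
         required: Seq[ReqField],
         tableColumns: Set[String],
         supportedMetadata: Set[String]): Seq[ReqField] =
       required.filter {
         case ReqField(_, Some(logical)) => supportedMetadata.contains(logical)
         case ReqField(name, None) => tableColumns.contains(name)
       }

     // What reader code that only knows logical names would consume:
     // Left(data column name) or Right(logical metadata column name).
     def readerColumns(pruned: Seq[ReqField]): Seq[Either[String, String]] =
       pruned.map {
         case ReqField(_, Some(logical)) => Right(logical)
         case ReqField(name, None) => Left(name)
       }
   }
   ```

   In this model, a data column `_meta` and the metadata column renamed to `__meta` both survive pruning. The reader side can still tell them apart even though they share a logical name.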



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1127830262


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.

Review Comment:
   I originally added it because I was also hitting cases where `metadataOutput` did not contain the metadata column.



##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2714,6 +2726,17 @@ class Dataset[T] private[sql](
    */
   def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
 
+  /**
+   * Returns a new Dataset by selecting a metadata column with the given logical name.
+   *
+   * A metadata column can be accessed this way even if the underlying data source defines a data
+   * column with a conflicting name.
+   *
+   * @group untypedrel
+   * @since 4.0.0
+   */
+  def withMetadataColumn(colName: String): DataFrame = withColumn(colName, metadataColumn(colName))

Review Comment:
   > people can just do `df.withColumn(name, df.metadataColumn(colName))`, right?
   
   Yes. Fair point. 
   
   It does lead to possible weirdness, in case the column obtained by `df.metadataColumn` is added to some other DF, e.g.
   ```scala
   df.select(...).as("view").withColumn("m", df.metadataColumn("_metadata"))
   ```
   ...but the same hazard exists for normal columns already, so it's probably fine?
   ```scala
   df.select(...).as("view").withColumn("m", df.col("other"))
   ```
   



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.

Review Comment:
   `SubqueryAlias` does not always propagate its child's `metadataOutput`, even if the child's own `output` includes metadata columns. That can result in a query plan whose `output` contains attributes which satisfy `MetadataAttribute` but are not in `metadataOutput`. 
   
   I raised https://github.com/apache/spark/pull/40321 as a possible fix for that issue.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.
+    (metadataOutput ++ output).collectFirst {
+      case MetadataAttributeWithLogicalName(attr, logicalName)
+          if conf.resolver(name, logicalName) => attr
+    }.getOrElse {
+      val availableMetadataColumns = (metadataOutput ++ output).collect {
+        case MetadataAttributeWithLogicalName(_, logicalName) => logicalName

Review Comment:
   [MetadataAttributeWithLogicalName.unapply](https://github.com/apache/spark/pull/40300/files/8b5c9bf7ef1c465d6b7bd17a24f3750a5ce727f8#diff-cf96171d13fd77e670764766ae22afafbc4a396316bd758a89b60a6fe70d5b0dR517) already checks all metadata attributes, because a metadata column's physical name is also its logical name unless/until it is renamed. Otherwise, every site that uses the matcher would also have to check for not-renamed columns, which is error-prone and redundant.
   
   The doc comment for the matcher tries to make that clear?
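
   Below is a sketch of the matcher contract described above, plus the `(metadataOutput ++ output)` search from the diff. It models attributes as plain case classes. `Attribute`, `isMetadata`, `originalName`, and `MetadataLookup` are hypothetical, and the lookup returns an `Either` instead of raising an `AnalysisException`:

   ```scala
   // Hypothetical model of an attribute: `isMetadata` stands in for the
   // MetadataAttribute check and `originalName` for wherever a rename records
   // the logical name. Neither is the real Spark representation.
   final case class Attribute(name: String, isMetadata: Boolean, originalName: Option[String] = None)

   // Matches every metadata attribute. A renamed one reports its original name;
   // one that was never renamed reports its current name, so callers need no
   // separate case for "not renamed".
   object MetadataAttributeWithLogicalName {
     def unapply(attr: Attribute): Option[(Attribute, String)] =
       if (attr.isMetadata) Some((attr, attr.originalName.getOrElse(attr.name))) else None
   }

   object MetadataLookup {
     // Assumed case-insensitive resolver.
     private val resolve: (String, String) => Boolean = _.equalsIgnoreCase(_)

     // Search metadataOutput first, then output (an already-referenced metadata
     // column may only appear in output), as in getMetadataAttributeByName.
     def getMetadataAttributeByName(
         name: String,
         metadataOutput: Seq[Attribute],
         output: Seq[Attribute]): Either[String, Attribute] = {
       val candidates = metadataOutput ++ output
       candidates.collectFirst {
         case MetadataAttributeWithLogicalName(attr, logical) if resolve(name, logical) => attr
       }.toRight {
         val available = candidates.collect {
           case MetadataAttributeWithLogicalName(_, logical) => logical
         }
         s"Cannot resolve metadata column `$name`; available: ${available.mkString(", ")}"
       }
     }
   }
   ```

   Because the extractor already covers not-renamed columns, both the lookup and the error message's list of available columns use a single pattern.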



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -244,6 +245,89 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"))
   }
 
+  metadataColumnsTest("df metadataColumn - schema conflict",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    // get the real metadata column (whose name should have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "__metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age", "_METADATA", "_metadata")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - no schema conflict",
+    schema) { (df, f0, f1) =>
+    // get the real metadata column (whose name should _NOT_ have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "_metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, f0(METADATA_FILE_NAME)),
+        Row("lily", 31, f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - column not found", schema) { (df, f0, f1) =>
+    // Not a column at all
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("foo")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`foo`", "proposal" -> "`_metadata`"))
+
+    // Name exists, but does not reference a metadata column
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("name")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`name`", "proposal" -> "`_metadata`"))
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - one",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    checkAnswer(
+      df.select("name", "age", "_metadata", "__metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - several",
+    new StructType()
+      .add(schema("name").copy(name = "_metadata"))
+      .add(schema("age").copy(name = "__metadata"))
+      .add(schema("info").copy(name = "___metadata"))) { (df, f0, f1) =>
+    // the user data has the schema: _metadata, __metadata, ___metadata.id, ___metadata.university
+
+    checkAnswer(
+      df.select("_metadata", "__metadata", "___metadata", "____metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Where/how should I document it? (easy enough to add a test case, for example?)



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1129916870


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   A quick local test of the change, with this query over a table with conflicting user schema:
   ```scala
   df.select("_metadata")
     .withColumn("file_name", df.metadataColumn("_metadata").getField("file_name"))
   ```
   
   ... produces the expected logical plan (names conflict but `exprId` differs):
   ```
   Project [_metadata#20, _metadata#24.file_name AS file_name#26]
   +- Project [_metadata#20, _metadata#24]
      +- Relation [name#18,age#19,_METADATA#20,_metadata#24] parquet
   ```
   
   ... but query optimization fails with: 
   ```
   [info]   java.lang.IllegalArgumentException: file_name does not exist. Available:
   [info]   at org.apache.spark.sql.types.StructType.$anonfun$fieldIndex$1(StructType.scala:313)
   [info]   at scala.collection.immutable.Map$EmptyMap$.getOrElse(Map.scala:110)
   [info]   at org.apache.spark.sql.types.StructType.fieldIndex(StructType.scala:312)
     ...
   [info]   at org.apache.spark.sql.catalyst.expressions.ProjectionOverSchema.getProjection(ProjectionOverSchema.scala:71)
     ...
   [info]   at org.apache.spark.sql.execution.datasources.SchemaPruning$.$anonfun$buildNewProjection$3(SchemaPruning.scala:154)
     ...
   [info]   at org.apache.spark.sql.execution.datasources.SchemaPruning$.buildNewProjection(SchemaPruning.scala:154)
   [info]   at org.apache.spark.sql.execution.datasources.SchemaPruning$.org$apache$spark$sql$execution$datasources$SchemaPruning$$prunePhysicalColumns(SchemaPruning.scala:99)
     ...
   [info]   at org.apache.spark.sql.execution.datasources.SchemaPruning$.apply(SchemaPruning.scala:41)
     ...
   [info]   at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
     ...
   [info]   at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:139)
     ...
   [info]   at org.apache.spark.sql.QueryTest.checkAnswer(QueryTest.scala:148)
   [info]   at org.apache.spark.sql.execution.datasources.FileMetadataStructSuite.$anonfun$new$13(FileMetadataStructSuite.scala:255)
   ```
   So it appears that at least one optimizer rule fails to honor expression ids, and assumes the column names are unique?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1131748163


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   I don't think there's anything in `StructType` that could track expr id (and IMO it shouldn't be trying to do so -- different levels of abstraction). 
   
   It really looks like `StructType` doesn't allow duplicate names. The class is full of code that assumes uniqueness, often leveraging these private members: 
   ```scala
     private lazy val fieldNamesSet: Set[String] = fieldNames.toSet
     private lazy val nameToField: Map[String, StructField] = fields.map(f => f.name -> f).toMap
     private lazy val nameToIndex: Map[String, Int] = Utils.toMapWithIndex(fieldNames)
   ```
   The `fieldNamesSet` goes all the way back to 2015...



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1149347950


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   Shall we move this handling to `V2ScanRelationPushDown`? The in-memory table is just a test v2 source, and we can't expect all existing v2 sources to make the same change as here.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1131780602


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2714,6 +2726,17 @@ class Dataset[T] private[sql](
    */
   def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
 
+  /**
+   * Returns a new Dataset by selecting a metadata column with the given logical name.
+   *
+   * A metadata column can be accessed this way even if the underlying data source defines a data
+   * column with a conflicting name.
+   *
+   * @group untypedrel
+   * @since 4.0.0
+   */
+  def withMetadataColumn(colName: String): DataFrame = withColumn(colName, metadataColumn(colName))

Review Comment:
   Removed `withMetadataColumn` method for now. Nothing stops us from adding it in the future if it's needed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1130417773


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   @viirya this seems like a bug in nested column pruning



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] sigmod commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "sigmod (via GitHub)" <gi...@apache.org>.
sigmod commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1130602165


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   > If there are duplicate columns, it possibly picks incorrect attribute.
   
   Is it possible to keep both in the requested schema?  
   
   cc @cashmand 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] ryan-johnson-databricks commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1149407329


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   As far as I can tell, v2 intentionally puts the burden of pruning (and of metadata handling) on the datasource. So by design, every datasource _does_ have to implement the policy it wants to use. 
   
   But that's not necessarily a bad thing -- a datasource that chooses not to implement this change simply won't gain the ability to rename metadata columns on conflict.
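This is a minimal model of the pruning policy that a v2 data source could choose to implement. Renamed metadata columns are matched by their logical (original) name, and data columns are matched by their physical name. `PruneSketch`, `Field` and `logicalName` are hypothetical stand-ins. In particular, `Field` replaces `StructField`, and `logicalName` replaces whatever marker the source actually reads from the field's metadata. This is not the `InMemoryBaseTable` implementation.

```scala
// Hypothetical model of a data source's pruning policy; not Spark's API.
object PruneSketch {
  // Stand-in for StructField. `logicalName` is Some(original name) when the field is a
  // (possibly conflict-renamed) metadata column, None for an ordinary data column.
  final case class Field(name: String, logicalName: Option[String] = None)

  def pruneColumns(
      required: Seq[Field],
      tableColumns: Set[String],
      metadataColumns: Set[String]): Seq[Field] =
    required.filter {
      // Metadata columns: match on the logical name, because the current name may
      // have gained leading underscores.
      case Field(_, Some(logical)) => metadataColumns.contains(logical)
      // Data columns: match on the physical name.
      case Field(name, None) => tableColumns.contains(name)
    }
}
```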





[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1129167837


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -340,13 +358,26 @@ trait ExposesMetadataColumns extends LogicalPlan {
       val resolve = conf.resolver
       val outputNames = outputSet.map(_.name)
 
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
+      // Generate a unique name by prepending underscores.
+      @scala.annotation.tailrec
+      def makeUnique(name: String): String = name match {
+        case name if outputNames.exists(resolve(_, name)) => makeUnique(s"_$name")
+        case name => name
+      }
+
+      // Rename metadata struct columns whose names conflict with output columns.

Review Comment:
   Name is not the id of a column, e.g. people can do `SELECT 1 a, 2 a`. I don't quite understand why we need to rename here. What we need is to resolve `_metadata` to the actual metadata `AttributeReference`. At the end Spark will bind `AttributeReference` correctly by looking at `exprId`.
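The toy model below illustrates the point about names versus IDs. It assumes a simplified `Attr` that carries only a name and an `exprId`; Catalyst's real `AttributeReference` carries more. `ExprIdSketch`, `lookupByName` and `bindOrdinal` are hypothetical helpers, not Catalyst methods.

```scala
// Hypothetical model of why binding by expression ID is unambiguous while names are not.
object ExprIdSketch {
  // Simplified attribute; only the name and the unique exprId are modeled.
  final case class Attr(name: String, exprId: Long)

  // Name lookup can be ambiguous: `SELECT 1 a, 2 a` produces two attributes named "a".
  def lookupByName(output: Seq[Attr], name: String): Seq[Attr] =
    output.filter(_.name.equalsIgnoreCase(name))

  // Binding by exprId selects exactly one ordinal even when names collide.
  def bindOrdinal(output: Seq[Attr], ref: Attr): Int =
    output.indexWhere(_.exprId == ref.exprId)
}
```

Binding by `exprId` is unambiguous once an attribute has been chosen. The renaming in this PR addresses an earlier step: how a user-written name such as `_metadata` selects the metadata attribute in the first place.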





[GitHub] [spark] cloud-fan commented on pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40300:
URL: https://github.com/apache/spark/pull/40300#issuecomment-1485216976

   about https://github.com/apache/spark/pull/40300/files#r1129818813 , I think if `SubqueryAlias` can't propagate metadata columns, then `df.metadataColumn` should not be able to get the column, what do you think? @ryan-johnson-databricks 




[GitHub] [spark] ryan-johnson-databricks commented on pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "ryan-johnson-databricks (via GitHub)" <gi...@apache.org>.
ryan-johnson-databricks commented on PR #40300:
URL: https://github.com/apache/spark/pull/40300#issuecomment-1487610774

   > about https://github.com/apache/spark/pull/40300/files#r1129818813 , I think if `SubqueryAlias` can't propagate metadata columns, then `df.metadataColumn` should not be able to get the column, what do you think? @ryan-johnson-databricks
   
   IMO changing the behavior of `SubqueryAlias` (and other specific plan node types) is out of scope for this PR -- this PR does not change that existing situation.




[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1151490309


##########
sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java:
##########
@@ -48,11 +47,22 @@ public interface SupportsMetadataColumns extends Table {
    * The columns returned by this method may be passed as {@link StructField} in requested
    * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
    * <p>
-   * If a table column and a metadata column have the same name, the metadata column will never be
-   * requested and is ignored. It is recommended that Table implementations reject data column names
-   * that conflict with metadata column names.
+   * If a table column and a metadata column have the same name, the conflict is resolved by either
+   * renaming or suppressing the metadata column. See {@link canRenameConflictingMetadataColumns}.
    *
    * @return an array of {@link MetadataColumn}
    */
   MetadataColumn[] metadataColumns();
+
+  /**
+   * Determines how this data source handles name conflicts between metadata and data columns.
+   * <p>
+   * If true, spark will automatically rename the metadata column to resolve the conflict; metadata
+   * columns (renamed or not) can be reliably accessed by calling {@link Dataset.metadataColumn}.

Review Comment:
   Let's also mention what data source developers should do:
   ```
   Data Source implementations can get the original metadata column name by
   accessing a special metadata key from `StructField`: "__metadata_col"
   ```
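The Javadoc in this hunk says a conflict is resolved by either renaming or suppressing the metadata column, depending on `canRenameConflictingMetadataColumns`. The review suggests that the original name be recoverable through the `"__metadata_col"` key. The sketch below models both ideas together. The key string is quoted from the suggestion and should be treated as an assumption. `ConflictPolicySketch`, `Field`, `expose` and `logicalName` are hypothetical, and names are assumed to match case-insensitively.

```scala
// Hypothetical model of the rename-or-suppress policy plus the logical-name marker.
object ConflictPolicySketch {
  // Key quoted from the review suggestion; the exact key is an assumption here.
  val MetadataColKey = "__metadata_col"

  // Stand-in for StructField, with its metadata reduced to a String map.
  final case class Field(name: String, metadata: Map[String, String] = Map.empty)

  // Returns the metadata column as exposed to queries, or None if it is suppressed.
  def expose(metaName: String, dataNames: Seq[String], canRename: Boolean): Option[Field] = {
    @scala.annotation.tailrec
    def unique(n: String): String =
      if (dataNames.exists(_.equalsIgnoreCase(n))) unique(s"_$n") else n

    val conflicts = dataNames.exists(_.equalsIgnoreCase(metaName))
    if (conflicts && !canRename) None
    else Some(Field(unique(metaName), Map(MetadataColKey -> metaName)))
  }

  // What a data source could do in pruneColumns: recover the original column name.
  def logicalName(f: Field): String = f.metadata.getOrElse(MetadataColKey, f.name)
}
```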





[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1149329725


##########
sql/catalyst/src/test/scala/org/apache/spark/sql/connector/catalog/InMemoryBaseTable.scala:
##########
@@ -297,8 +297,14 @@ abstract class InMemoryBaseTable(
       InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema, tableSchema)
 
     override def pruneColumns(requiredSchema: StructType): Unit = {
-      val schemaNames = metadataColumnNames ++ tableSchema.map(_.name)
-      schema = StructType(requiredSchema.filter(f => schemaNames.contains(f.name)))
+      // The required schema could contain conflict-renamed metadata columns, so we need to match
+      // them by their logical (original) names, not their current names.
+      val schemaNames = tableSchema.map(_.name).toSet
+      val prunedFields = requiredSchema.filter {

Review Comment:
   does `requiredSchema` contain the logical name or physical name? I think it's the latter as `V2ScanRelationPushDown` does not handle metadata column name mapping.





[GitHub] [spark] cloud-fan commented on pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40300:
URL: https://github.com/apache/spark/pull/40300#issuecomment-1489959075

   thanks, merging to master!




[GitHub] [spark] olaky commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "olaky (via GitHub)" <gi...@apache.org>.
olaky commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1127449861


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -244,6 +245,89 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
       parameters = Map("fieldName" -> "`file_name`", "fields" -> "`id`, `university`"))
   }
 
+  metadataColumnsTest("df metadataColumn - schema conflict",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    // get the real metadata column (whose name should have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "__metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age", "_METADATA", "_metadata")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - no schema conflict",
+    schema) { (df, f0, f1) =>
+    // get the real metadata column (whose name should _NOT_ have been adjusted)
+    val metadataColumn = df.metadataColumn("_metadata")
+    assert(metadataColumn.expr.asInstanceOf[NamedExpression].name == "_metadata")
+
+    // select user data
+    checkAnswer(
+      df.select("name", "age")
+        .withColumn("file_name", metadataColumn.getField("file_name")),
+      Seq(
+        Row("jack", 24, f0(METADATA_FILE_NAME)),
+        Row("lily", 31, f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("df metadataColumn - column not found", schema) { (df, f0, f1) =>
+    // Not a column at all
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("foo")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`foo`", "proposal" -> "`_metadata`"))
+
+    // Name exists, but does not reference a metadata column
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.withMetadataColumn("name")
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`name`", "proposal" -> "`_metadata`"))
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - one",
+    schemaWithNameConflicts) { (df, f0, f1) =>
+    // the user data has the schema: name, age, _metadata.id, _metadata.university
+
+    checkAnswer(
+      df.select("name", "age", "_metadata", "__metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata name conflict resolved with leading underscores - several",
+    new StructType()
+      .add(schema("name").copy(name = "_metadata"))
+      .add(schema("age").copy(name = "__metadata"))
+      .add(schema("info").copy(name = "___metadata"))) { (df, f0, f1) =>
+    // the user data has the schema: _metadata, __metadata, ___metadata.id, ___metadata.university
+
+    checkAnswer(
+      df.select("_metadata", "__metadata", "___metadata", "____metadata.file_name"),
+      Seq(
+        Row("jack", 24, Row(12345L, "uom"), f0(METADATA_FILE_NAME)),
+        Row("lily", 31, Row(54321L, "ucb"), f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Question about a use case with aliasing: if the metadata column is aliased, we still have to use the original name in `withMetadata` (or the other functions). So I believe the following is not possible:
   ```
   df.select("my_metadata", _metadata).withMetadata("another_metadata_name", "my_metadata")
   ```
   I think this is OK; it just reflects that `withMetadata` is immune to renaming. But it would be good to document this clearly.



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.

Review Comment:
   I had an instance in the debugger where this was not the case yesterday



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.
+    (metadataOutput ++ output).collectFirst {
+      case MetadataAttributeWithLogicalName(attr, logicalName)
+          if conf.resolver(name, logicalName) => attr
+    }.getOrElse {
+      val availableMetadataColumns = (metadataOutput ++ output).collect {
+        case MetadataAttributeWithLogicalName(_, logicalName) => logicalName

Review Comment:
   How about adding `MetadataColumn` here as well, in case we have a metadata column without a logical name? Even though this might not be possible right now, it would hurt debuggability if metadata columns were missing from this output.
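The sketch below models the lookup in this hunk with the suggested error-message change: the "available" list also includes metadata columns that have no logical name, falling back to their current name. It is a simplified model and not the PR's code. `MetadataLookupSketch` and `Attr` are hypothetical, the `isMetadata` flag stands in for the real metadata-attribute marker, and names are assumed to match case-insensitively.

```scala
// Hypothetical model of getMetadataAttributeByName and its error message.
object MetadataLookupSketch {
  // Simplified attribute: `isMetadata` marks a metadata column; `logicalName` is its
  // original name, which may be absent for a metadata column without the marker.
  final case class Attr(name: String, isMetadata: Boolean, logicalName: Option[String])

  def getMetadataAttributeByName(
      name: String,
      metadataOutput: Seq[Attr],
      output: Seq[Attr]): Either[String, Attr] = {
    val candidates = metadataOutput ++ output
    candidates.collectFirst {
      case a @ Attr(_, true, Some(logical)) if logical.equalsIgnoreCase(name) => a
    }.toRight {
      // As suggested in the review, list every metadata column, falling back to the
      // current name when no logical name is recorded.
      val available = candidates.collect {
        case Attr(current, true, logical) => logical.getOrElse(current)
      }.distinct
      s"No metadata column `$name`; available: ${available.mkString(", ")}"
    }
  }
}
```

In this model the lookup keys on the logical name. An attribute aliased to `my_metadata` that still carries the logical name `_metadata` is therefore found only under `_metadata`. That is the behavior raised in the aliasing question on the test suite.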





[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1127307622


##########
sql/core/src/main/scala/org/apache/spark/sql/Dataset.scala:
##########
@@ -2714,6 +2726,17 @@ class Dataset[T] private[sql](
    */
   def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
 
+  /**
+   * Returns a new Dataset by selecting a metadata column with the given logical name.
+   *
+   * A metadata column can be accessed this way even if the underlying data source defines a data
+   * column with a conflicting name.
+   *
+   * @group untypedrel
+   * @since 4.0.0
+   */
+  def withMetadataColumn(colName: String): DataFrame = withColumn(colName, metadataColumn(colName))

Review Comment:
   This is a weird API as there is no `withColumn` function that takes only a string parameter.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org


[GitHub] [spark] cloud-fan commented on pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on PR #40300:
URL: https://github.com/apache/spark/pull/40300#issuecomment-1457500143

   It's a good idea to provide an API that allows people to unambiguously reference metadata columns, and I like the new `Dataset.metadataColumn` function. However, I think the prepending underscore approach is a bit hacky. It's too implicit and I'd prefer a more explicit syntax like `SELECT metadata(_metadata) FROM t`. We can discuss this more and invite more SQL experts. Shall we exclude it from this PR for now?




[GitHub] [spark] cloud-fan commented on a diff in pull request #40300: [SPARK-42683] Automatically rename conflicting metadata columns

Posted by "cloud-fan (via GitHub)" <gi...@apache.org>.
cloud-fan commented on code in PR #40300:
URL: https://github.com/apache/spark/pull/40300#discussion_r1127321842


##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala:
##########
@@ -42,6 +42,24 @@ abstract class LogicalPlan
    */
   def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
 
+  /**
+   * Finds a metadata attribute of this node by its logical name. This search will work even if the
+   * metadata attribute was renamed because of a conflicting name in the data schema.
+   */
+  def getMetadataAttributeByName(name: String): Attribute = {
+    // NOTE: An already-referenced column might appear in `output` instead of `metadataOutput`.

Review Comment:
   A metadata col may appear in `output`, but it always appears in `metadataOutput`. I think `outputMetadataAttributes.resolve(nameParts, resolver)` should do the work.
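The suggestion above is to resolve names only against the metadata output. The toy model below shows why that sidesteps a data column with the same name. It ignores qualifiers and the rest of Catalyst's real `resolve` rules. `NamePartsSketch`, `Attr` and `resolve` are hypothetical names for this illustration only.

```scala
// Hypothetical, qualifier-free model of resolving a multi-part name against metadata output.
object NamePartsSketch {
  final case class Attr(name: String, exprId: Long)

  // Resolve the first name part against metadata attributes only; any remaining parts
  // are returned as a nested field path (e.g. "file_name" inside the metadata struct).
  def resolve(nameParts: Seq[String], metadataOutput: Seq[Attr]): Option[(Attr, Seq[String])] =
    nameParts match {
      case head +: rest =>
        metadataOutput.find(_.name.equalsIgnoreCase(head)).map(attr => (attr, rest))
      case _ => None
    }
}
```

Because a data column named `_metadata` is never in the searched list, `_metadata.file_name` resolves to the metadata attribute even without renaming.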


