You are viewing a plain text version of this content. The canonical link for it is here.
Posted to reviews@spark.apache.org by "olaky (via GitHub)" <gi...@apache.org> on 2023/03/13 19:16:28 UTC

[GitHub] [spark] olaky commented on a diff in pull request #40321: [SPARK-42704] SubqueryAlias propagates metadata columns that child outputs

olaky commented on code in PR #40321:
URL: https://github.com/apache/spark/pull/40321#discussion_r1134507196


##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -281,6 +281,53 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     )
   }
 
+  metadataColumnsTest("metadata propagates through projections automatically",
+    schema) { (df, f0, f1) =>
+
+    checkAnswer(
+      df.select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata propagates through subqueries only manually",
+    schema) { (df, f0, f1) =>
+
+    // Metadata columns do not automatically propagate through subqueries.
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select("name").as("s").select("name")
+          .withColumn("m", col("_metadata").getField("file_name"))
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`_metadata`", "proposal" -> "`s`.`name`"))
+
+    // A metadata column manually propagated through a subquery is available.
+    checkAnswer(
+      df.select("name", "_metadata").as("s").select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+
+    // A metadata column manually propagated multiple subqueries is available.
+    checkAnswer(
+      df.select("name", "_metadata").as("s")
+        .select("name", "_metadata").as("t").select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+

Review Comment:
   Maybe add one case where there is a node between the subquery and the leaf? So maybe:
   
   df.select("name", "_metadata").as("s").filter(...).select("name", "_metadata").as("t").select("name")



##########
sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala:
##########
@@ -1685,14 +1685,13 @@ case class SubqueryAlias(
   }
 
   override def metadataOutput: Seq[Attribute] = {
-    // Propagate metadata columns from leaf nodes through a chain of `SubqueryAlias`.
-    if (child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]) {
-      val qualifierList = identifier.qualifier :+ alias
-      val nonHiddenMetadataOutput = child.metadataOutput.filter(!_.qualifiedAccessOnly)
-      nonHiddenMetadataOutput.map(_.withQualifier(qualifierList))
-    } else {
-      Nil
-    }
+    // Propagate any metadata column that is already in the child's output. Also propagate
+    // non-hidden metadata columns from leaf nodes through a chain of `SubqueryAlias`.
+    val keepChildMetadata = child.isInstanceOf[LeafNode] || child.isInstanceOf[SubqueryAlias]
+    val qualifierList = identifier.qualifier :+ alias
+    child.metadataOutput
+      .filter { a => child.outputSet.contains(a) || (keepChildMetadata && !a.qualifiedAccessOnly) }

Review Comment:
   Thinking out loud: is it possible that the outputSet contains not the attribute itself but, for example, an attribute reference to it?



##########
sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala:
##########
@@ -281,6 +281,53 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     )
   }
 
+  metadataColumnsTest("metadata propagates through projections automatically",
+    schema) { (df, f0, f1) =>
+
+    checkAnswer(
+      df.select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+  }
+
+  metadataColumnsTest("metadata propagates through subqueries only manually",
+    schema) { (df, f0, f1) =>
+
+    // Metadata columns do not automatically propagate through subqueries.
+    checkError(
+      exception = intercept[AnalysisException] {
+        df.select("name").as("s").select("name")
+          .withColumn("m", col("_metadata").getField("file_name"))
+      },
+      errorClass = "UNRESOLVED_COLUMN.WITH_SUGGESTION",
+      parameters = Map("objectName" -> "`_metadata`", "proposal" -> "`s`.`name`"))
+
+    // A metadata column manually propagated through a subquery is available.
+    checkAnswer(
+      df.select("name", "_metadata").as("s").select("name")
+        .withColumn("m", col("_metadata").getField("file_name")),
+      Seq(
+        Row("jack", f0(METADATA_FILE_NAME)),
+        Row("lily", f1(METADATA_FILE_NAME))
+      )
+    )
+
+    // A metadata column manually propagated multiple subqueries is available.

Review Comment:
   ```suggestion
       // A metadata column manually propagated through multiple subqueries is available.
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: reviews-unsubscribe@spark.apache.org
For additional commands, e-mail: reviews-help@spark.apache.org