You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/13 04:43:57 UTC

[spark] branch master updated: [SPARK-42331][SQL] Fix metadata col can not been resolved

This is an automated email from the ASF dual-hosted git repository.

wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 5705436f70e [SPARK-42331][SQL] Fix metadata col can not been resolved
5705436f70e is described below

commit 5705436f70e6c6d5a127db7773d3627c8e3d695a
Author: ulysses-you <ul...@gmail.com>
AuthorDate: Mon Feb 13 12:43:33 2023 +0800

    [SPARK-42331][SQL] Fix metadata col can not been resolved
    
    ### What changes were proposed in this pull request?
    
    This PR makes the metadata output consistent during analysis by checking the output and reusing the metadata columns it already contains, if any.
    
    This pr also deduplicates the metadata output when merging into the output.
    
    ### Why are the changes needed?
    
    Consider the following process of resolving a metadata column:
    ```
    Project (_metadata.file_size)
      Filter (_metadata.file_size > 0)
        Relation
    ```
    
    1. `ResolveReferences` resolves _metadata.file_size for `Filter`
    2. `ResolveReferences` cannot resolve _metadata.file_size for `Project`, because `Filter` is not resolved yet (data type does not match)
    3. then `AddMetadataColumns` will merge metadata output into output
    4. the next round of `ResolveReferences` cannot resolve _metadata.file_size for `Project`, since we filter out the conflicting names (the output already contains the metadata output); see the code:
        ```
            def isOutputColumn(col: MetadataColumn): Boolean = {
              outputNames.exists(name => resolve(col.name, name))
            }
            // filter out metadata columns that have names conflicting with output columns. if the table
            // has a column "line" and the table can produce a metadata column called "line", then the
            // data column should be returned, not the metadata column.
            hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
       ```
       We also cannot simply skip metadata columns when filtering out conflicting names; otherwise the newly generated metadata attribute would have a different expr id from the previous one.
    
    One failed example:
    ```sql
    SELECT _metadata.row_index  FROM t WHERE _metadata.row_index >= 0;
    ```
    
    ### Does this PR introduce _any_ user-facing change?
    
    yes, bug fix
    
    ### How was this patch tested?
    
    Added tests for v1, v2 and streaming relations.
    
    Closes #39870 from ulysses-you/SPARK-42331.
    
    Authored-by: ulysses-you <ul...@gmail.com>
    Signed-off-by: Wenchen Fan <we...@databricks.com>
---
 .../sql/catalyst/plans/logical/LogicalPlan.scala   | 23 ++++++++++++++++++
 .../datasources/v2/DataSourceV2Relation.scala      | 17 ++++---------
 .../execution/datasources/LogicalRelation.scala    | 16 ++++---------
 .../execution/streaming/StreamingRelation.scala    | 18 +++++---------
 .../spark/sql/connector/MetadataColumnSuite.scala  | 13 ++++++++++
 .../datasources/FileMetadataStructSuite.scala      | 28 +++++++++++++++++++++-
 6 files changed, 79 insertions(+), 36 deletions(-)

diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 9a7726f6a03..5a7dcff3667 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -23,6 +23,7 @@ import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.{AliasAwareQueryOutputOrdering, QueryPlan}
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.LogicalPlanStats
 import org.apache.spark.sql.catalyst.trees.{BinaryLike, LeafLike, UnaryLike}
+import org.apache.spark.sql.catalyst.util.MetadataColumnHelper
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.types.{DataType, StructType}
 
@@ -317,5 +318,27 @@ object LogicalPlanIntegrity {
  * A logical plan node that can generate metadata columns
  */
 trait ExposesMetadataColumns extends LogicalPlan {
+  protected def metadataOutputWithOutConflicts(
+      metadataOutput: Seq[AttributeReference]): Seq[AttributeReference] = {
+    // If `metadataColFromOutput` is not empty that means `AddMetadataColumns` merged
+    // metadata output into output. We should still return an available metadata output
+    // so that the rule `ResolveReferences` can resolve metadata column correctly.
+    val metadataColFromOutput = output.filter(_.isMetadataCol)
+    if (metadataColFromOutput.isEmpty) {
+      val resolve = conf.resolver
+      val outputNames = outputSet.map(_.name)
+
+      def isOutputColumn(col: AttributeReference): Boolean = {
+        outputNames.exists(name => resolve(col.name, name))
+      }
+      // filter out the metadata struct column if it has the name conflicting with output columns.
+      // if the file has a column "_metadata",
+      // then the data column should be returned not the metadata struct column
+      metadataOutput.filterNot(isOutputColumn)
+    } else {
+      metadataColFromOutput.asInstanceOf[Seq[AttributeReference]]
+    }
+  }
+
   def withMetadataColumns(): LogicalPlan
 }
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 3db2ec6b8d9..51ef3dda817 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, AttributeReference, Expression, SortOrder}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, ExposesMetadataColumns, Histogram, HistogramBin, LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.{truncatedString, CharVarcharUtils}
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, FunctionCatalog, Identifier, SupportsMetadataColumns, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -54,15 +54,7 @@ case class DataSourceV2Relation(
 
   override lazy val metadataOutput: Seq[AttributeReference] = table match {
     case hasMeta: SupportsMetadataColumns =>
-      val resolve = conf.resolver
-      val outputNames = outputSet.map(_.name)
-      def isOutputColumn(col: MetadataColumn): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
-      }
-      // filter out metadata columns that have names conflicting with output columns. if the table
-      // has a column "line" and the table can produce a metadata column called "line", then the
-      // data column should be returned, not the metadata column.
-      hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
+      metadataOutputWithOutConflicts(hasMeta.metadataColumns.toAttributes)
     case _ =>
       Nil
   }
@@ -103,8 +95,9 @@ case class DataSourceV2Relation(
   }
 
   def withMetadataColumns(): DataSourceV2Relation = {
-    if (metadataOutput.nonEmpty) {
-      DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options)
+    val newMetadata = metadataOutput.filterNot(outputSet.contains)
+    if (newMetadata.nonEmpty) {
+      DataSourceV2Relation(table, output ++ newMetadata, catalog, identifier, options)
     } else {
       this
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 43699c1b6b1..4176389c58f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -70,21 +70,15 @@ case class LogicalRelation(
 
   override lazy val metadataOutput: Seq[AttributeReference] = relation match {
     case relation: HadoopFsRelation =>
-      val resolve = conf.resolver
-      val outputNames = outputSet.map(_.name)
-      def isOutputColumn(col: AttributeReference): Boolean = {
-        outputNames.exists(name => resolve(col.name, name))
-      }
-      // filter out the metadata struct column if it has the name conflicting with output columns.
-      // if the file has a column "_metadata",
-      // then the data column should be returned not the metadata struct column
-      Seq(FileFormat.createFileMetadataCol(relation.fileFormat)).filterNot(isOutputColumn)
+      metadataOutputWithOutConflicts(
+        Seq(FileFormat.createFileMetadataCol(relation.fileFormat)))
     case _ => Nil
   }
 
   override def withMetadataColumns(): LogicalRelation = {
-    if (metadataOutput.nonEmpty) {
-      this.copy(output = output ++ metadataOutput)
+    val newMetadata = metadataOutput.filterNot(outputSet.contains)
+    if (newMetadata.nonEmpty) {
+      this.copy(output = output ++ newMetadata)
     } else {
       this
     }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
index af90b692a70..50da63489d7 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/StreamingRelation.scala
@@ -62,23 +62,17 @@ case class StreamingRelation(dataSource: DataSource, sourceName: String, output:
     dataSource.providingClass match {
       // If the dataSource provided class is a same or subclass of FileFormat class
       case f if classOf[FileFormat].isAssignableFrom(f) =>
-        val resolve = conf.resolver
-        val outputNames = outputSet.map(_.name)
-        def isOutputColumn(col: AttributeReference): Boolean = {
-          outputNames.exists(name => resolve(col.name, name))
-        }
-        // filter out the metadata struct column if it has the name conflicting with output columns.
-        // if the file has a column "_metadata",
-        // then the data column should be returned not the metadata struct column
-        Seq(FileFormat.createFileMetadataCol(
-          dataSource.providingInstance().asInstanceOf[FileFormat])).filterNot(isOutputColumn)
+        metadataOutputWithOutConflicts(
+          Seq(FileFormat.createFileMetadataCol(
+            dataSource.providingInstance().asInstanceOf[FileFormat])))
       case _ => Nil
     }
   }
 
   override def withMetadataColumns(): LogicalPlan = {
-    if (metadataOutput.nonEmpty) {
-      this.copy(output = output ++ metadataOutput)
+    val newMetadata = metadataOutput.filterNot(outputSet.contains)
+    if (newMetadata.nonEmpty) {
+      this.copy(output = output ++ newMetadata)
     } else {
       this
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
index 4f617bc707b..982f61728d9 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/MetadataColumnSuite.scala
@@ -387,6 +387,19 @@ class MetadataColumnSuite extends DatasourceV2SQLBase {
       }
     }
   }
+
+  test("SPARK-42331: Fix metadata col can not been resolved") {
+    withTable(tbl) {
+      prepareTable()
+
+      checkAnswer(
+        spark.table(tbl).where("index = 0").select("index"),
+        Seq(Row(0), Row(0), Row(0)))
+      checkAnswer(
+        spark.table(tbl).where("index = 0").select("_partition"),
+        Seq(Row("3/1"), Row("0/2"), Row("1/3")))
+    }
+  }
 }
 
 class MetadataTestTable(
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
index 208e401f153..a85857a93a3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/FileMetadataStructSuite.scala
@@ -292,7 +292,7 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
     )
   }
 
-  metadataColumnsTest("filter", schema) { (df, f0, _) =>
+  metadataColumnsTest("filter", schema) { (df, f0, f1) =>
     val filteredDF = df.select("name", "age", METADATA_FILE_NAME)
       .where(Column(METADATA_FILE_NAME) === f0(METADATA_FILE_NAME))
 
@@ -313,6 +313,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
         Row("jack", 24, f0(METADATA_FILE_NAME))
       )
     )
+
+    checkAnswer(
+      df.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),
+      Seq(
+        Row(f0(METADATA_FILE_SIZE)),
+        Row(f1(METADATA_FILE_SIZE)))
+    )
+    checkAnswer(
+      df.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH),
+      Seq(
+        Row(f0(METADATA_FILE_PATH)),
+        Row(f1(METADATA_FILE_PATH)))
+    )
   }
 
   metadataColumnsTest("filter on metadata and user data", schema) { (df, _, f1) =>
@@ -576,6 +589,19 @@ class FileMetadataStructSuite extends QueryTest with SharedSparkSession {
         )
       )
 
+      checkAnswer(
+        newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_SIZE),
+        Seq(
+          Row(sourceFileMetadata(METADATA_FILE_SIZE)),
+          Row(sourceFileMetadata(METADATA_FILE_SIZE)))
+      )
+      checkAnswer(
+        newDF.where(s"$METADATA_FILE_SIZE > 0").select(METADATA_FILE_PATH),
+        Seq(
+          Row(sourceFileMetadata(METADATA_FILE_PATH)),
+          Row(sourceFileMetadata(METADATA_FILE_PATH)))
+      )
+
       // Verify self-union
       val streamQuery1 = streamDf.union(streamDf)
         .writeStream.format("json")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org