You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2020/11/18 22:09:08 UTC

[spark] branch master updated: [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2

This is an automated email from the ASF dual-hosted git repository.

brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 1df69f7  [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
1df69f7 is described below

commit 1df69f7e324aa799c05f6158e433371c5eeed8ce
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Wed Nov 18 14:07:51 2020 -0800

    [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
    
    ### What changes were proposed in this pull request?
    
    This adds support for metadata columns to DataSourceV2. If a source implements `SupportsMetadataColumns` it must also implement `SupportsPushDownRequiredColumns` to support projecting those columns.
    
    The analyzer is updated to resolve metadata columns from `LogicalPlan.metadataOutput`, and this adds a rule that will add metadata columns to the output of `DataSourceV2Relation` if one is used.
    
    ### Why are the changes needed?
    
    This is the solution discussed for exposing additional data in the Kafka source. It is also needed for a generic `MERGE INTO` plan.
    
    ### Does this PR introduce any user-facing change?
    
    Yes. Users can project additional columns from sources that implement the new API. This also updates `DescribeTableExec` to show metadata columns.
    
    ### How was this patch tested?
    
    Will include new unit tests.
    
    Closes #28027 from rdblue/add-dsv2-metadata-columns.
    
    Authored-by: Ryan Blue <bl...@apache.org>
    Signed-off-by: Burak Yavuz <br...@gmail.com>
---
 .../sql/connector/catalog/MetadataColumn.java      | 58 +++++++++++++++++
 .../connector/catalog/SupportsMetadataColumns.java | 37 +++++++++++
 .../spark/sql/catalyst/analysis/Analyzer.scala     | 24 +++++++
 .../sql/catalyst/plans/logical/LogicalPlan.scala   |  6 +-
 .../plans/logical/basicLogicalOperators.scala      |  6 ++
 .../datasources/v2/DataSourceV2Implicits.scala     | 16 ++++-
 .../datasources/v2/DataSourceV2Relation.scala      | 26 +++++++-
 .../apache/spark/sql/connector/InMemoryTable.scala | 74 ++++++++++++++++++----
 .../datasources/v2/DescribeTableExec.scala         | 16 ++++-
 .../execution/datasources/v2/PushDownUtils.scala   | 12 ++--
 .../spark/sql/connector/DataSourceV2SQLSuite.scala | 43 +++++++++++++
 11 files changed, 296 insertions(+), 22 deletions(-)

diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
new file mode 100644
index 0000000..8aefa28
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
@@ -0,0 +1,58 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a metadata column.
+ * <p>
+ * A metadata column can expose additional metadata about a row. For example, rows from Kafka can
+ * use metadata columns to expose a message's topic, partition number, and offset.
+ * <p>
+ * A metadata column could also be the result of a transform applied to a value in the row. For
+ * example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In
+ * this case, {@link #transform()} should return a non-null {@link Transform} that produced the
+ * metadata column's values.
+ * <p>
+ * Only {@link #name()} and {@link #dataType()} must be implemented; the remaining methods have
+ * default implementations.
+ */
+@Evolving
+public interface MetadataColumn {
+  /**
+   * The name of this metadata column.
+   *
+   * @return a String name
+   */
+  String name();
+
+  /**
+   * The data type of values in this metadata column.
+   *
+   * @return a {@link DataType}
+   */
+  DataType dataType();
+
+  /**
+   * Whether values produced by this metadata column may be null. The default implementation
+   * returns true.
+   *
+   * @return whether values produced by this metadata column may be null
+   */
+  default boolean isNullable() {
+    return true;
+  }
+
+  /**
+   * Documentation for this metadata column, or null. The default implementation returns null.
+   *
+   * @return a documentation String
+   */
+  default String comment() {
+    return null;
+  }
+
+  /**
+   * The {@link Transform} used to produce this metadata column from data rows, or null. The
+   * default implementation returns null.
+   *
+   * @return a {@link Transform} used to produce the column's values, or null if there isn't one
+   */
+  default Transform transform() {
+    return null;
+  }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
new file mode 100644
index 0000000..fc31349
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
@@ -0,0 +1,37 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface for exposing data columns for a table that are not in the table schema. For example,
+ * a file source could expose a "file" column that contains the path of the file that contained each
+ * row.
+ * <p>
+ * The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in
+ * requested projections. Sources that implement this interface and column projection using
+ * {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to
+ * {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
+ * <p>
+ * If a table column and a metadata column have the same name, the metadata column will never be
+ * requested. It is recommended that Table implementations reject data column names that conflict
+ * with metadata column names.
+ */
+@Evolving
+public interface SupportsMetadataColumns extends Table {
+  /**
+   * Metadata columns that are supported by this {@link Table}.
+   * <p>
+   * The columns returned by this method may be passed as {@link StructField} in requested
+   * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
+   * <p>
+   * If a table column and a metadata column have the same name, the metadata column will never be
+   * requested and is ignored. It is recommended that Table implementations reject data column names
+   * that conflict with metadata column names.
+   *
+   * @return an array of {@link MetadataColumn}
+   */
+  MetadataColumn[] metadataColumns();
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 14b50f4..8d95d8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -221,6 +221,7 @@ class Analyzer(override val catalogManager: CatalogManager)
       ResolveRelations ::
       ResolveTables ::
       ResolvePartitionSpec ::
+      AddMetadataColumns ::
       ResolveReferences ::
       ResolveCreateNamedStruct ::
       ResolveDeserializer ::
@@ -917,6 +918,29 @@ class Analyzer(override val catalogManager: CatalogManager)
   }
 
   /**
+   * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
+   *
+   * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
+   * but the relation's output does not include the metadata columns until the relation is replaced
+   * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
+   * relation's output, the analyzer will detect that nothing produces the columns.
+   *
+   * This rule only adds metadata columns when a node is resolved but is missing input from its
+   * children. This ensures that metadata columns are not added to the plan unless they are used. By
+   * checking only resolved nodes, this ensures that * expansion is already done so that metadata
+   * columns are not accidentally selected by *.
+   */
+  object AddMetadataColumns extends Rule[LogicalPlan] {
+    // true for a resolved operator that has children but references attributes that those
+    // children do not produce
+    private def isMissingChildInput(operator: LogicalPlan): Boolean = {
+      operator.resolved && operator.children.nonEmpty && operator.missingInput.nonEmpty
+    }
+
+    // rewrites each DataSourceV2Relation in the operator's subtree using withMetadataColumns()
+    private def exposeMetadata(operator: LogicalPlan): LogicalPlan = {
+      operator.resolveOperatorsUp {
+        case relation: DataSourceV2Relation => relation.withMetadataColumns()
+      }
+    }
+
+    def apply(plan: LogicalPlan): LogicalPlan = plan.resolveOperatorsUp {
+      case operator if isMissingChildInput(operator) => exposeMetadata(operator)
+    }
+  }
+
+  /**
    * Resolve table relations with concrete relations from v2 catalog.
    *
    * [[ResolveRelations]] still resolves v1 tables.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 48dfc5fd..ad5c3fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -33,6 +33,9 @@ abstract class LogicalPlan
   with QueryPlanConstraints
   with Logging {
 
+  /** Metadata fields that can be projected from this node */
+  def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
+
   /** Returns true if this subtree has data from a streaming data source. */
   def isStreaming: Boolean = children.exists(_.isStreaming)
 
@@ -86,7 +89,8 @@ abstract class LogicalPlan
     }
   }
 
-  private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+  private[this] lazy val childAttributes =
+    AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))
 
   private[this] lazy val outputAttributes = AttributeSeq(output)
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 17bf704..4e7923b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -886,6 +886,12 @@ case class SubqueryAlias(
     val qualifierList = identifier.qualifier :+ alias
     child.output.map(_.withQualifier(qualifierList))
   }
+
+  /** Metadata attributes of the child, qualified with this alias in the same way as `output`. */
+  override def metadataOutput: Seq[Attribute] = {
+    val aliasQualifier = identifier.qualifier :+ alias
+    child.metadataOutput.map(attr => attr.withQualifier(aliasQualifier))
+  }
+
   override def doCanonicalize(): LogicalPlan = child.canonicalized
 }
 
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index dfacf6e..8d91ea7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -21,7 +21,9 @@ import scala.collection.JavaConverters._
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
-import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.types.{StructField, StructType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
 object DataSourceV2Implicits {
@@ -78,6 +80,18 @@ object DataSourceV2Implicits {
     def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
   }
 
+  implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
+    /** Builds a struct with one field per metadata column, keeping the order of the array. */
+    def asStruct: StructType = {
+      def toField(column: MetadataColumn): StructField = {
+        val base = StructField(column.name, column.dataType, column.isNullable)
+        // comment may be null; in that case the field is returned without a comment
+        Option(column.comment).fold(base)(base.withComment)
+      }
+      StructType(metadata.map(toField))
+    }
+
+    /** Converts the metadata columns to attribute references by way of [[asStruct]]. */
+    def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
+  }
+
   implicit class OptionsHelper(options: Map[String, String]) {
     def asOptions: CaseInsensitiveStringMap = {
       new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 45d8949..b09ccff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -21,10 +21,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
 import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
 import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
 import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
 import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 import org.apache.spark.util.Utils
 
@@ -48,6 +49,21 @@ case class DataSourceV2Relation(
 
   import DataSourceV2Implicits._
 
+  /**
+   * Attributes for the metadata columns that can be projected from this relation, or Nil when
+   * the table does not implement [[SupportsMetadataColumns]].
+   */
+  override lazy val metadataOutput: Seq[AttributeReference] = table match {
+    case metadataTable: SupportsMetadataColumns =>
+      val nameMatches = SQLConf.get.resolver
+      val outputColumnNames = outputSet.map(_.name)
+      // drop metadata columns that have names conflicting with output columns. if the table has
+      // a column "line" and the table can produce a metadata column called "line", then the data
+      // column should be returned, not the metadata column.
+      val nonConflicting = metadataTable.metadataColumns.filter { col =>
+        !outputColumnNames.exists(name => nameMatches(col.name, name))
+      }
+      nonConflicting.toAttributes
+    case _ =>
+      Nil
+  }
+
   override def name: String = table.name()
 
   override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@@ -78,6 +94,14 @@ case class DataSourceV2Relation(
   override def newInstance(): DataSourceV2Relation = {
     copy(output = output.map(_.newInstance()))
   }
+
+  /**
+   * Returns a relation whose output is this relation's output followed by [[metadataOutput]],
+   * or this relation itself when there are no metadata columns to add.
+   */
+  def withMetadataColumns(): DataSourceV2Relation = {
+    if (metadataOutput.isEmpty) {
+      this
+    } else {
+      copy(output = output ++ metadataOutput)
+    }
+  }
 }
 
 /**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index b032560..3b47271 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
 import org.scalatest.Assertions._
 
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.connector.catalog._
 import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform}
@@ -34,8 +35,9 @@ import org.apache.spark.sql.connector.read._
 import org.apache.spark.sql.connector.write._
 import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
 import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
-import org.apache.spark.sql.types.{DataType, DateType, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A simple in-memory table. Rows are stored as a buffered group produced by each output task.
@@ -45,7 +47,24 @@ class InMemoryTable(
     val schema: StructType,
     override val partitioning: Array[Transform],
     override val properties: util.Map[String, String])
-  extends Table with SupportsRead with SupportsWrite with SupportsDelete {
+  extends Table with SupportsRead with SupportsWrite with SupportsDelete
+      with SupportsMetadataColumns {
+
+  private object PartitionKeyColumn extends MetadataColumn {
+    override def name: String = "_partition"
+    override def dataType: DataType = StringType
+    override def comment: String = "Partition key used to store the row"
+  }
+
+  private object IndexColumn extends MetadataColumn {
+    override def name: String = "index"
+    override def dataType: DataType = StringType
+    override def comment: String = "Metadata column used to conflict with a data column"
+  }
+
+  // purposely exposes a metadata column that conflicts with a data column in some tests
+  override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
+  private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
 
   private val allowUnsupportedTransforms =
     properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
@@ -146,7 +165,7 @@ class InMemoryTable(
       val key = getKey(row)
       dataMap += dataMap.get(key)
         .map(key -> _.withRow(row))
-        .getOrElse(key -> new BufferedRows().withRow(row))
+        .getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
     })
     this
   }
@@ -160,17 +179,38 @@ class InMemoryTable(
     TableCapability.TRUNCATE).asJava
 
   override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
-    () => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]))
+    new InMemoryScanBuilder(schema)
+  }
+
+  class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
+      with SupportsPushDownRequiredColumns {
+    // schema handed to the scan; starts as the table schema and is only replaced by pruneColumns
+    private var schema: StructType = tableSchema
+
+    override def build: Scan = {
+      val partitions = data.map(_.asInstanceOf[InputPartition])
+      new InMemoryBatchScan(partitions, schema)
+    }
+
+    override def pruneColumns(requiredSchema: StructType): Unit = {
+      val requestedNames = requiredSchema.map(_.name)
+      // if metadata columns are projected, return the table schema and metadata columns. every
+      // non-conflicting metadata column is appended, not only the ones that were requested
+      if (requestedNames.exists(metadataColumnNames.contains)) {
+        val metadataFields = metadataColumnNames.flatMap { name =>
+          metadataColumns.find(_.name == name)
+            .map(col => StructField(col.name, col.dataType, col.isNullable))
+        }
+        schema = StructType(tableSchema ++ metadataFields)
+      }
+    }
+  }
 
-  class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch {
+  class InMemoryBatchScan(data: Array[InputPartition], schema: StructType) extends Scan with Batch {
     override def readSchema(): StructType = schema
 
     override def toBatch: Batch = this
 
     override def planInputPartitions(): Array[InputPartition] = data
 
-    override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
+    override def createReaderFactory(): PartitionReaderFactory = {
+      // names of the metadata columns present in the read schema, in schema order
+      val projectedMetadataNames = schema.collect {
+        case field if metadataColumnNames.contains(field.name) => field.name
+      }
+      new BufferedRowsReaderFactory(projectedMetadataNames)
+    }
   }
 
   override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
@@ -340,7 +380,8 @@ object InMemoryTable {
   }
 }
 
-class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+class BufferedRows(
+    val key: String = "") extends WriterCommitMessage with InputPartition with Serializable {
   val rows = new mutable.ArrayBuffer[InternalRow]()
 
   def withRow(row: InternalRow): BufferedRows = {
@@ -349,13 +390,24 @@ class BufferedRows extends WriterCommitMessage with InputPartition with Serializ
   }
 }
 
-private object BufferedRowsReaderFactory extends PartitionReaderFactory {
+private class BufferedRowsReaderFactory(
+    metadataColumns: Seq[String]) extends PartitionReaderFactory {
   override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
-    new BufferedRowsReader(partition.asInstanceOf[BufferedRows])
+    new BufferedRowsReader(partition.asInstanceOf[BufferedRows], metadataColumns)
   }
 }
 
-private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] {
+private class BufferedRowsReader(
+    partition: BufferedRows,
+    metadataColumns: Seq[String]) extends PartitionReader[InternalRow] {
+  /**
+   * Appends the values of the projected metadata columns to a data row.
+   *
+   * Values are produced in the order of `metadataColumns`. Only the names "index" and
+   * "_partition" are handled; any other name fails with a MatchError.
+   *
+   * @param row a data row read from the partition
+   * @return the data row joined with a row holding the metadata values
+   */
+  private def addMetadata(row: InternalRow): InternalRow = {
+    // typed as Array[Any] because GenericInternalRow is built from untyped values
+    val metadataValues: Array[Any] = metadataColumns.map {
+      // the "index" metadata column is declared with StringType, so the reader position is
+      // stored as a UTF8String rather than as a raw Int
+      case "index" => UTF8String.fromString(index.toString)
+      case "_partition" => UTF8String.fromString(partition.key)
+    }.toArray
+    new JoinedRow(row, new GenericInternalRow(metadataValues))
+  }
+
   private var index: Int = -1
 
   override def next(): Boolean = {
@@ -363,7 +415,7 @@ private class BufferedRowsReader(partition: BufferedRows) extends PartitionReade
     index < partition.rows.length
   }
 
-  override def get(): InternalRow = partition.rows(index)
+  override def get(): InternalRow = addMetadata(partition.rows(index))
 
   override def close(): Unit = {}
 }
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index 81b1c81..0cbcad1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.encoders.RowEncoder
 import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
 import org.apache.spark.sql.types.StructType
 
 case class DescribeTableExec(
@@ -41,6 +41,7 @@ case class DescribeTableExec(
     addPartitioning(rows)
 
     if (isExtended) {
+      addMetadataColumns(rows)
       addTableDetails(rows)
     }
     rows.toSeq
@@ -72,6 +73,19 @@ case class DescribeTableExec(
     }
   }
 
+  /**
+   * Appends a "# Metadata Columns" section with one row per metadata column (name, type,
+   * comment) when the table exposes at least one metadata column.
+   */
+  private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match {
+    case metadataTable: SupportsMetadataColumns if metadataTable.metadataColumns.nonEmpty =>
+      rows += emptyRow()
+      rows += toCatalystRow("# Metadata Columns", "", "")
+      metadataTable.metadataColumns.foreach { column =>
+        // a null comment is shown as an empty string
+        val comment = Option(column.comment()).getOrElse("")
+        rows += toCatalystRow(column.name, column.dataType.simpleString, comment)
+      }
+    case _ =>
+      // tables without metadata columns add nothing
+  }
+
   private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
     rows += emptyRow()
     rows += toCatalystRow("# Partitioning", "", "")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 7f6ae20..ce8edce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -96,13 +96,11 @@ object PushDownUtils extends PredicateHelper {
         val exprs = projects ++ filters
         val requiredColumns = AttributeSet(exprs.flatMap(_.references))
         val neededOutput = relation.output.filter(requiredColumns.contains)
-        if (neededOutput != relation.output) {
-          r.pruneColumns(neededOutput.toStructType)
-          val scan = r.build()
-          scan -> toOutputAttrs(scan.readSchema(), relation)
-        } else {
-          r.build() -> relation.output
-        }
+        r.pruneColumns(neededOutput.toStructType)
+        val scan = r.build()
+        // always project, in case the relation's output has been updated and doesn't match
+        // the underlying table schema
+        scan -> toOutputAttrs(scan.readSchema(), relation)
 
       case _ => scanBuilder.build() -> relation.output
     }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 5f7be7c..4eaf582 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -139,6 +139,10 @@ class DataSourceV2SQLSuite
       Array("# Partitioning", "", ""),
       Array("Part 0", "id", ""),
       Array("", "", ""),
+      Array("# Metadata Columns", "", ""),
+      Array("index", "string", "Metadata column used to conflict with a data column"),
+      Array("_partition", "string", "Partition key used to store the row"),
+      Array("", "", ""),
       Array("# Detailed Table Information", "", ""),
       Array("Name", "testcat.table_name", ""),
       Array("Comment", "this is a test table", ""),
@@ -2470,6 +2474,45 @@ class DataSourceV2SQLSuite
     }
   }
 
+  // A metadata column that is not part of the table schema can be selected by name.
+  test("SPARK-31255: Project a metadata column") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+          "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+      // _partition is exposed by the in-memory test table as the partition key parts joined
+      // with "/"; presumably the bucket value followed by the id -- TODO confirm against getKey
+      checkAnswer(
+        spark.sql(s"SELECT id, data, _partition FROM $t1"),
+        Seq(Row(1, "a", "3/1"), Row(2, "b", "2/2"), Row(3, "c", "2/3")))
+    }
+  }
+
+  // The table declares a data column named "index", which is also the name of one of the test
+  // table's metadata columns; the query must return the data column's values.
+  test("SPARK-31255: Projects data column when metadata column has the same name") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (index bigint, data string) USING $v2Format " +
+          "PARTITIONED BY (bucket(4, index), index)")
+      sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
+
+      // index comes back with the inserted bigint values; the non-conflicting _partition
+      // metadata column is still projected alongside it
+      checkAnswer(
+        spark.sql(s"SELECT index, data, _partition FROM $t1"),
+        Seq(Row(3, "c", "2/3"), Row(2, "b", "2/2"), Row(1, "a", "3/1")))
+    }
+  }
+
+  // Metadata columns are only returned when referenced by name; SELECT * returns data columns.
+  test("SPARK-31255: * expansion does not include metadata columns") {
+    val t1 = s"${catalogAndNamespace}table"
+    withTable(t1) {
+      sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+          "PARTITIONED BY (bucket(4, id), id)")
+      sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
+
+      // each expected row has only the two declared columns (id, data)
+      checkAnswer(
+        spark.sql(s"SELECT * FROM $t1"),
+        Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
+    }
+  }
+
   private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
     val e = intercept[AnalysisException] {
       sql(s"$sqlCommand $sqlParams")


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org