You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by br...@apache.org on 2020/11/18 22:09:08 UTC
[spark] branch master updated: [SPARK-31255][SQL] Add
SupportsMetadataColumns to DSv2
This is an automated email from the ASF dual-hosted git repository.
brkyvz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 1df69f7 [SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
1df69f7 is described below
commit 1df69f7e324aa799c05f6158e433371c5eeed8ce
Author: Ryan Blue <bl...@apache.org>
AuthorDate: Wed Nov 18 14:07:51 2020 -0800
[SPARK-31255][SQL] Add SupportsMetadataColumns to DSv2
### What changes were proposed in this pull request?
This adds support for metadata columns to DataSourceV2. If a source implements `SupportsMetadataColumns` it must also implement `SupportsPushDownRequiredColumns` to support projecting those columns.
The analyzer is updated to resolve metadata columns from `LogicalPlan.metadataOutput`, and this adds a rule that will add metadata columns to the output of `DataSourceV2Relation` if one is used.
### Why are the changes needed?
This is the solution discussed for exposing additional data in the Kafka source. It is also needed for a generic `MERGE INTO` plan.
### Does this PR introduce any user-facing change?
Yes. Users can project additional columns from sources that implement the new API. This also updates `DescribeTableExec` to show metadata columns.
### How was this patch tested?
Will include new unit tests.
Closes #28027 from rdblue/add-dsv2-metadata-columns.
Authored-by: Ryan Blue <bl...@apache.org>
Signed-off-by: Burak Yavuz <br...@gmail.com>
---
.../sql/connector/catalog/MetadataColumn.java | 58 +++++++++++++++++
.../connector/catalog/SupportsMetadataColumns.java | 37 +++++++++++
.../spark/sql/catalyst/analysis/Analyzer.scala | 24 +++++++
.../sql/catalyst/plans/logical/LogicalPlan.scala | 6 +-
.../plans/logical/basicLogicalOperators.scala | 6 ++
.../datasources/v2/DataSourceV2Implicits.scala | 16 ++++-
.../datasources/v2/DataSourceV2Relation.scala | 26 +++++++-
.../apache/spark/sql/connector/InMemoryTable.scala | 74 ++++++++++++++++++----
.../datasources/v2/DescribeTableExec.scala | 16 ++++-
.../execution/datasources/v2/PushDownUtils.scala | 12 ++--
.../spark/sql/connector/DataSourceV2SQLSuite.scala | 43 +++++++++++++
11 files changed, 296 insertions(+), 22 deletions(-)
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
new file mode 100644
index 0000000..8aefa28
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/MetadataColumn.java
@@ -0,0 +1,58 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.expressions.Transform;
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * Interface for a metadata column.
+ * <p>
+ * A metadata column can expose additional metadata about a row. For example, rows from Kafka can
+ * use metadata columns to expose a message's topic, partition number, and offset.
+ * <p>
+ * A metadata column could also be the result of a transform applied to a value in the row. For
+ * example, a partition value produced by bucket(id, 16) could be exposed by a metadata column. In
+ * this case, {@link #transform()} should return a non-null {@link Transform} that produced the
+ * metadata column's values.
+ */
+@Evolving
+public interface MetadataColumn {
+ /**
+ * The name of this metadata column.
+ *
+ * @return a String name
+ */
+ String name();
+
+ /**
+ * The data type of values in this metadata column.
+ *
+ * @return a {@link DataType}
+ */
+ DataType dataType();
+
+ /**
+ * @return whether values produced by this metadata column may be null
+ */
+ default boolean isNullable() {
+ return true;
+ }
+
+ /**
+ * Documentation for this metadata column, or null.
+ *
+ * @return a documentation String
+ */
+ default String comment() {
+ return null;
+ }
+
+ /**
+ * The {@link Transform} used to produce this metadata column from data rows, or null.
+ *
+ * @return a {@link Transform} used to produce the column's values, or null if there isn't one
+ */
+ default Transform transform() {
+ return null;
+ }
+}
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
new file mode 100644
index 0000000..fc31349
--- /dev/null
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/connector/catalog/SupportsMetadataColumns.java
@@ -0,0 +1,37 @@
+package org.apache.spark.sql.connector.catalog;
+
+import org.apache.spark.annotation.Evolving;
+import org.apache.spark.sql.connector.read.SupportsPushDownRequiredColumns;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+
+/**
+ * An interface for exposing data columns for a table that are not in the table schema. For example,
+ * a file source could expose a "file" column that contains the path of the file that contained each
+ * row.
+ * <p>
+ * The columns returned by {@link #metadataColumns()} may be passed as {@link StructField} in
+ * requested projections. Sources that implement this interface and column projection using
+ * {@link SupportsPushDownRequiredColumns} must accept metadata fields passed to
+ * {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
+ * <p>
+ * If a table column and a metadata column have the same name, the metadata column will never be
+ * requested. It is recommended that Table implementations reject data column name that conflict
+ * with metadata column names.
+ */
+@Evolving
+public interface SupportsMetadataColumns extends Table {
+ /**
+ * Metadata columns that are supported by this {@link Table}.
+ * <p>
+ * The columns returned by this method may be passed as {@link StructField} in requested
+ * projections using {@link SupportsPushDownRequiredColumns#pruneColumns(StructType)}.
+ * <p>
+ * If a table column and a metadata column have the same name, the metadata column will never be
+ * requested and is ignored. It is recommended that Table implementations reject data column names
+ * that conflict with metadata column names.
+ *
+ * @return an array of {@link MetadataColumn}
+ */
+ MetadataColumn[] metadataColumns();
+}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
index 14b50f4..8d95d8c 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/analysis/Analyzer.scala
@@ -221,6 +221,7 @@ class Analyzer(override val catalogManager: CatalogManager)
ResolveRelations ::
ResolveTables ::
ResolvePartitionSpec ::
+ AddMetadataColumns ::
ResolveReferences ::
ResolveCreateNamedStruct ::
ResolveDeserializer ::
@@ -917,6 +918,29 @@ class Analyzer(override val catalogManager: CatalogManager)
}
/**
+ * Adds metadata columns to output for child relations when nodes are missing resolved attributes.
+ *
+ * References to metadata columns are resolved using columns from [[LogicalPlan.metadataOutput]],
+ * but the relation's output does not include the metadata columns until the relation is replaced
+ * using [[DataSourceV2Relation.withMetadataColumns()]]. Unless this rule adds metadata to the
+ * relation's output, the analyzer will detect that nothing produces the columns.
+ *
+ * This rule only adds metadata columns when a node is resolved but is missing input from its
+ * children. This ensures that metadata columns are not added to the plan unless they are used. By
+ * checking only resolved nodes, this ensures that * expansion is already done so that metadata
+ * columns are not accidentally selected by *.
+ */
+ object AddMetadataColumns extends Rule[LogicalPlan] {
+ def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperatorsUp {
+ case node if node.resolved && node.children.nonEmpty && node.missingInput.nonEmpty =>
+ node resolveOperatorsUp {
+ case rel: DataSourceV2Relation =>
+ rel.withMetadataColumns()
+ }
+ }
+ }
+
+ /**
* Resolve table relations with concrete relations from v2 catalog.
*
* [[ResolveRelations]] still resolves v1 tables.
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
index 48dfc5fd..ad5c3fd 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/LogicalPlan.scala
@@ -33,6 +33,9 @@ abstract class LogicalPlan
with QueryPlanConstraints
with Logging {
+ /** Metadata fields that can be projected from this node */
+ def metadataOutput: Seq[Attribute] = children.flatMap(_.metadataOutput)
+
/** Returns true if this subtree has data from a streaming data source. */
def isStreaming: Boolean = children.exists(_.isStreaming)
@@ -86,7 +89,8 @@ abstract class LogicalPlan
}
}
- private[this] lazy val childAttributes = AttributeSeq(children.flatMap(_.output))
+ private[this] lazy val childAttributes =
+ AttributeSeq(children.flatMap(c => c.output ++ c.metadataOutput))
private[this] lazy val outputAttributes = AttributeSeq(output)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
index 17bf704..4e7923b 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/basicLogicalOperators.scala
@@ -886,6 +886,12 @@ case class SubqueryAlias(
val qualifierList = identifier.qualifier :+ alias
child.output.map(_.withQualifier(qualifierList))
}
+
+ override def metadataOutput: Seq[Attribute] = {
+ val qualifierList = identifier.qualifier :+ alias
+ child.metadataOutput.map(_.withQualifier(qualifierList))
+ }
+
override def doCanonicalize(): LogicalPlan = child.canonicalized
}
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
index dfacf6e..8d91ea7 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Implicits.scala
@@ -21,7 +21,9 @@ import scala.collection.JavaConverters._
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.analysis.{PartitionSpec, ResolvedPartitionSpec, UnresolvedPartitionSpec}
-import org.apache.spark.sql.connector.catalog.{SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.catalyst.expressions.AttributeReference
+import org.apache.spark.sql.connector.catalog.{MetadataColumn, SupportsAtomicPartitionManagement, SupportsDelete, SupportsPartitionManagement, SupportsRead, SupportsWrite, Table, TableCapability}
+import org.apache.spark.sql.types.{StructField, StructType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
object DataSourceV2Implicits {
@@ -78,6 +80,18 @@ object DataSourceV2Implicits {
def supportsAny(capabilities: TableCapability*): Boolean = capabilities.exists(supports)
}
+ implicit class MetadataColumnsHelper(metadata: Array[MetadataColumn]) {
+ def asStruct: StructType = {
+ val fields = metadata.map { metaCol =>
+ val field = StructField(metaCol.name, metaCol.dataType, metaCol.isNullable)
+ Option(metaCol.comment).map(field.withComment).getOrElse(field)
+ }
+ StructType(fields)
+ }
+
+ def toAttributes: Seq[AttributeReference] = asStruct.toAttributes
+ }
+
implicit class OptionsHelper(options: Map[String, String]) {
def asOptions: CaseInsensitiveStringMap = {
new CaseInsensitiveStringMap(options.asJava)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
index 45d8949..b09ccff 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Relation.scala
@@ -21,10 +21,11 @@ import org.apache.spark.sql.catalyst.analysis.{MultiInstanceRelation, NamedRelat
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
import org.apache.spark.sql.catalyst.util.truncatedString
-import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, Table, TableCapability}
+import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, MetadataColumn, SupportsMetadataColumns, Table, TableCapability}
import org.apache.spark.sql.connector.read.{Scan, ScanBuilder, Statistics => V2Statistics, SupportsReportStatistics}
import org.apache.spark.sql.connector.read.streaming.{Offset, SparkDataStream}
import org.apache.spark.sql.connector.write.WriteBuilder
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.Utils
@@ -48,6 +49,21 @@ case class DataSourceV2Relation(
import DataSourceV2Implicits._
+ override lazy val metadataOutput: Seq[AttributeReference] = table match {
+ case hasMeta: SupportsMetadataColumns =>
+ val resolve = SQLConf.get.resolver
+ val outputNames = outputSet.map(_.name)
+ def isOutputColumn(col: MetadataColumn): Boolean = {
+ outputNames.exists(name => resolve(col.name, name))
+ }
+ // filter out metadata columns that have names conflicting with output columns. if the table
+ // has a column "line" and the table can produce a metadata column called "line", then the
+ // data column should be returned, not the metadata column.
+ hasMeta.metadataColumns.filterNot(isOutputColumn).toAttributes
+ case _ =>
+ Nil
+ }
+
override def name: String = table.name()
override def skipSchemaResolution: Boolean = table.supports(TableCapability.ACCEPT_ANY_SCHEMA)
@@ -78,6 +94,14 @@ case class DataSourceV2Relation(
override def newInstance(): DataSourceV2Relation = {
copy(output = output.map(_.newInstance()))
}
+
+ def withMetadataColumns(): DataSourceV2Relation = {
+ if (metadataOutput.nonEmpty) {
+ DataSourceV2Relation(table, output ++ metadataOutput, catalog, identifier, options)
+ } else {
+ this
+ }
+ }
}
/**
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
index b032560..3b47271 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/connector/InMemoryTable.scala
@@ -27,6 +27,7 @@ import scala.collection.mutable
import org.scalatest.Assertions._
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.expressions.{GenericInternalRow, JoinedRow}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.connector.catalog._
import org.apache.spark.sql.connector.expressions.{BucketTransform, DaysTransform, HoursTransform, IdentityTransform, MonthsTransform, Transform, YearsTransform}
@@ -34,8 +35,9 @@ import org.apache.spark.sql.connector.read._
import org.apache.spark.sql.connector.write._
import org.apache.spark.sql.connector.write.streaming.{StreamingDataWriterFactory, StreamingWrite}
import org.apache.spark.sql.sources.{And, EqualTo, Filter, IsNotNull}
-import org.apache.spark.sql.types.{DataType, DateType, StructType, TimestampType}
+import org.apache.spark.sql.types.{DataType, DateType, StringType, StructField, StructType, TimestampType}
import org.apache.spark.sql.util.CaseInsensitiveStringMap
+import org.apache.spark.unsafe.types.UTF8String
/**
* A simple in-memory table. Rows are stored as a buffered group produced by each output task.
@@ -45,7 +47,24 @@ class InMemoryTable(
val schema: StructType,
override val partitioning: Array[Transform],
override val properties: util.Map[String, String])
- extends Table with SupportsRead with SupportsWrite with SupportsDelete {
+ extends Table with SupportsRead with SupportsWrite with SupportsDelete
+ with SupportsMetadataColumns {
+
+ private object PartitionKeyColumn extends MetadataColumn {
+ override def name: String = "_partition"
+ override def dataType: DataType = StringType
+ override def comment: String = "Partition key used to store the row"
+ }
+
+ private object IndexColumn extends MetadataColumn {
+ override def name: String = "index"
+ override def dataType: DataType = StringType
+ override def comment: String = "Metadata column used to conflict with a data column"
+ }
+
+ // purposely exposes a metadata column that conflicts with a data column in some tests
+ override val metadataColumns: Array[MetadataColumn] = Array(IndexColumn, PartitionKeyColumn)
+ private val metadataColumnNames = metadataColumns.map(_.name).toSet -- schema.map(_.name)
private val allowUnsupportedTransforms =
properties.getOrDefault("allow-unsupported-transforms", "false").toBoolean
@@ -146,7 +165,7 @@ class InMemoryTable(
val key = getKey(row)
dataMap += dataMap.get(key)
.map(key -> _.withRow(row))
- .getOrElse(key -> new BufferedRows().withRow(row))
+ .getOrElse(key -> new BufferedRows(key.toArray.mkString("/")).withRow(row))
})
this
}
@@ -160,17 +179,38 @@ class InMemoryTable(
TableCapability.TRUNCATE).asJava
override def newScanBuilder(options: CaseInsensitiveStringMap): ScanBuilder = {
- () => new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]))
+ new InMemoryScanBuilder(schema)
+ }
+
+ class InMemoryScanBuilder(tableSchema: StructType) extends ScanBuilder
+ with SupportsPushDownRequiredColumns {
+ private var schema: StructType = tableSchema
+
+ override def build: Scan =
+ new InMemoryBatchScan(data.map(_.asInstanceOf[InputPartition]), schema)
+
+ override def pruneColumns(requiredSchema: StructType): Unit = {
+ // if metadata columns are projected, return the table schema and metadata columns
+ val hasMetadataColumns = requiredSchema.map(_.name).exists(metadataColumnNames.contains)
+ if (hasMetadataColumns) {
+ schema = StructType(tableSchema ++ metadataColumnNames
+ .flatMap(name => metadataColumns.find(_.name == name))
+ .map(col => StructField(col.name, col.dataType, col.isNullable)))
+ }
+ }
}
- class InMemoryBatchScan(data: Array[InputPartition]) extends Scan with Batch {
+ class InMemoryBatchScan(data: Array[InputPartition], schema: StructType) extends Scan with Batch {
override def readSchema(): StructType = schema
override def toBatch: Batch = this
override def planInputPartitions(): Array[InputPartition] = data
- override def createReaderFactory(): PartitionReaderFactory = BufferedRowsReaderFactory
+ override def createReaderFactory(): PartitionReaderFactory = {
+ val metadataColumns = schema.map(_.name).filter(metadataColumnNames.contains)
+ new BufferedRowsReaderFactory(metadataColumns)
+ }
}
override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = {
@@ -340,7 +380,8 @@ object InMemoryTable {
}
}
-class BufferedRows extends WriterCommitMessage with InputPartition with Serializable {
+class BufferedRows(
+ val key: String = "") extends WriterCommitMessage with InputPartition with Serializable {
val rows = new mutable.ArrayBuffer[InternalRow]()
def withRow(row: InternalRow): BufferedRows = {
@@ -349,13 +390,24 @@ class BufferedRows extends WriterCommitMessage with InputPartition with Serializ
}
}
-private object BufferedRowsReaderFactory extends PartitionReaderFactory {
+private class BufferedRowsReaderFactory(
+ metadataColumns: Seq[String]) extends PartitionReaderFactory {
override def createReader(partition: InputPartition): PartitionReader[InternalRow] = {
- new BufferedRowsReader(partition.asInstanceOf[BufferedRows])
+ new BufferedRowsReader(partition.asInstanceOf[BufferedRows], metadataColumns)
}
}
-private class BufferedRowsReader(partition: BufferedRows) extends PartitionReader[InternalRow] {
+private class BufferedRowsReader(
+ partition: BufferedRows,
+ metadataColumns: Seq[String]) extends PartitionReader[InternalRow] {
+ private def addMetadata(row: InternalRow): InternalRow = {
+ val metadataRow = new GenericInternalRow(metadataColumns.map {
+ case "index" => index
+ case "_partition" => UTF8String.fromString(partition.key)
+ }.toArray)
+ new JoinedRow(row, metadataRow)
+ }
+
private var index: Int = -1
override def next(): Boolean = {
@@ -363,7 +415,7 @@ private class BufferedRowsReader(partition: BufferedRows) extends PartitionReade
index < partition.rows.length
}
- override def get(): InternalRow = partition.rows(index)
+ override def get(): InternalRow = addMetadata(partition.rows(index))
override def close(): Unit = {}
}
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index 81b1c81..0cbcad1 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -23,7 +23,7 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.catalyst.expressions.{Attribute, GenericRowWithSchema}
-import org.apache.spark.sql.connector.catalog.{CatalogV2Util, Table, TableCatalog}
+import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, Table, TableCatalog}
import org.apache.spark.sql.types.StructType
case class DescribeTableExec(
@@ -41,6 +41,7 @@ case class DescribeTableExec(
addPartitioning(rows)
if (isExtended) {
+ addMetadataColumns(rows)
addTableDetails(rows)
}
rows.toSeq
@@ -72,6 +73,19 @@ case class DescribeTableExec(
}
}
+ private def addMetadataColumns(rows: ArrayBuffer[InternalRow]): Unit = table match {
+ case hasMeta: SupportsMetadataColumns if hasMeta.metadataColumns.nonEmpty =>
+ rows += emptyRow()
+ rows += toCatalystRow("# Metadata Columns", "", "")
+ rows ++= hasMeta.metadataColumns.map { column =>
+ toCatalystRow(
+ column.name,
+ column.dataType.simpleString,
+ Option(column.comment()).getOrElse(""))
+ }
+ case _ =>
+ }
+
private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
rows += emptyRow()
rows += toCatalystRow("# Partitioning", "", "")
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
index 7f6ae20..ce8edce 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/PushDownUtils.scala
@@ -96,13 +96,11 @@ object PushDownUtils extends PredicateHelper {
val exprs = projects ++ filters
val requiredColumns = AttributeSet(exprs.flatMap(_.references))
val neededOutput = relation.output.filter(requiredColumns.contains)
- if (neededOutput != relation.output) {
- r.pruneColumns(neededOutput.toStructType)
- val scan = r.build()
- scan -> toOutputAttrs(scan.readSchema(), relation)
- } else {
- r.build() -> relation.output
- }
+ r.pruneColumns(neededOutput.toStructType)
+ val scan = r.build()
+ // always project, in case the relation's output has been updated and doesn't match
+ // the underlying table schema
+ scan -> toOutputAttrs(scan.readSchema(), relation)
case _ => scanBuilder.build() -> relation.output
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
index 5f7be7c..4eaf582 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/connector/DataSourceV2SQLSuite.scala
@@ -139,6 +139,10 @@ class DataSourceV2SQLSuite
Array("# Partitioning", "", ""),
Array("Part 0", "id", ""),
Array("", "", ""),
+ Array("# Metadata Columns", "", ""),
+ Array("index", "string", "Metadata column used to conflict with a data column"),
+ Array("_partition", "string", "Partition key used to store the row"),
+ Array("", "", ""),
Array("# Detailed Table Information", "", ""),
Array("Name", "testcat.table_name", ""),
Array("Comment", "this is a test table", ""),
@@ -2470,6 +2474,45 @@ class DataSourceV2SQLSuite
}
}
+ test("SPARK-31255: Project a metadata column") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (1, 'a'), (2, 'b'), (3, 'c')")
+
+ checkAnswer(
+ spark.sql(s"SELECT id, data, _partition FROM $t1"),
+ Seq(Row(1, "a", "3/1"), Row(2, "b", "2/2"), Row(3, "c", "2/3")))
+ }
+ }
+
+ test("SPARK-31255: Projects data column when metadata column has the same name") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (index bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, index), index)")
+ sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
+
+ checkAnswer(
+ spark.sql(s"SELECT index, data, _partition FROM $t1"),
+ Seq(Row(3, "c", "2/3"), Row(2, "b", "2/2"), Row(1, "a", "3/1")))
+ }
+ }
+
+ test("SPARK-31255: * expansion does not include metadata columns") {
+ val t1 = s"${catalogAndNamespace}table"
+ withTable(t1) {
+ sql(s"CREATE TABLE $t1 (id bigint, data string) USING $v2Format " +
+ "PARTITIONED BY (bucket(4, id), id)")
+ sql(s"INSERT INTO $t1 VALUES (3, 'c'), (2, 'b'), (1, 'a')")
+
+ checkAnswer(
+ spark.sql(s"SELECT * FROM $t1"),
+ Seq(Row(3, "c"), Row(2, "b"), Row(1, "a")))
+ }
+ }
+
private def testNotSupportedV2Command(sqlCommand: String, sqlParams: String): Unit = {
val e = intercept[AnalysisException] {
sql(s"$sqlCommand $sqlParams")
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org