Posted to commits@spark.apache.org by we...@apache.org on 2024/02/20 06:50:04 UTC
(spark) branch master updated: [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new f0f35c8b1c8f [SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns
f0f35c8b1c8f is described below
commit f0f35c8b1c8f3b1d7f7c2b79945eb29ffb3c8f9a
Author: Terry Kim <yu...@gmail.com>
AuthorDate: Tue Feb 20 14:49:49 2024 +0800
[SPARK-45789][SQL] Support DESCRIBE TABLE for clustering columns
### What changes were proposed in this pull request?
This PR proposes to add clustering column info to the output of `DESCRIBE TABLE`.
### Why are the changes needed?
Currently, clustering column info is not easy to retrieve; it is only accessible via catalog APIs, so surfacing it in `DESCRIBE TABLE` makes it visible directly from SQL.
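For context, the catalog-API route looks roughly like this; this is only an illustrative sketch against internal/`@Unstable` APIs, assuming a spark-shell session and a hypothetical table named `tbl`:
```
import org.apache.spark.sql.catalyst.TableIdentifier

// CatalogTable.clusterBySpec is an Option[ClusterBySpec]; its columnNames are NamedReferences.
val meta = spark.sessionState.catalog.getTableMetadata(TableIdentifier("tbl"))
meta.clusterBySpec.foreach(spec => println(spec.columnNames.mkString(", ")))
```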
### Does this PR introduce _any_ user-facing change?
Yes. Now, when you run `DESCRIBE TABLE` on a clustered table, the output includes a "Clustering Information" section, as follows:
```
CREATE TABLE tbl (col1 STRING, col2 INT) USING parquet CLUSTER BY (col1, col2);
DESC tbl;
+------------------------+---------+-------+
|col_name |data_type|comment|
+------------------------+---------+-------+
|col1 |string |NULL |
|col2 |int |NULL |
|# Clustering Information| | |
|# col_name |data_type|comment|
|col1 |string |NULL |
|col2 |int |NULL |
+------------------------+---------+-------+
```
### How was this patch tested?
Added new unit tests.
### Was this patch authored or co-authored using generative AI tooling?
No
Closes #45077 from imback82/describe_clustered_table.
Authored-by: Terry Kim <yu...@gmail.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../spark/sql/catalyst/catalog/interface.scala | 8 ++++-
.../spark/sql/execution/command/tables.scala | 24 +++++++++++++++
.../datasources/v2/DescribeTableExec.scala | 36 ++++++++++++++++++++--
.../execution/command/DescribeTableSuiteBase.scala | 21 +++++++++++++
4 files changed, 85 insertions(+), 4 deletions(-)
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 0a1a40a88522..10428877ba8d 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -43,7 +43,7 @@ import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUti
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util._
import org.apache.spark.sql.connector.catalog.CatalogManager
-import org.apache.spark.sql.connector.expressions.{FieldReference, NamedReference}
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, NamedReference, Transform}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
@@ -224,6 +224,12 @@ object ClusterBySpec {
ClusterBySpec(normalizedColumns)
}
+
+ def extractClusterBySpec(transforms: Seq[Transform]): Option[ClusterBySpec] = {
+ transforms.collectFirst {
+ case ClusterByTransform(columnNames) => ClusterBySpec(columnNames)
+ }
+ }
}
/**
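For illustration only (not part of the commit), the new `extractClusterBySpec` helper returns the first `ClusterByTransform` it finds, wrapped in a `ClusterBySpec`, and `None` when the transform list has no clustering transform; the column names below are made up:
```
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.connector.expressions.{ClusterByTransform, FieldReference, Transform}

val withClustering: Seq[Transform] =
  Seq(ClusterByTransform(Seq(FieldReference("col1"), FieldReference("col2.x"))))

// Some(ClusterBySpec(...)) holding the two column references.
val spec: Option[ClusterBySpec] = ClusterBySpec.extractClusterBySpec(withClustering)

// A transform list without a ClusterByTransform yields None.
assert(ClusterBySpec.extractClusterBySpec(Seq.empty).isEmpty)
```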
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 2f8fca7cfd73..fa288fd94ea9 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -642,6 +642,7 @@ case class DescribeTableCommand(
}
describePartitionInfo(metadata, result)
+ describeClusteringInfo(metadata, result)
if (partitionSpec.nonEmpty) {
// Outputs the partition-specific info for the DDL command:
@@ -667,6 +668,29 @@ case class DescribeTableCommand(
}
}
+ private def describeClusteringInfo(
+ table: CatalogTable,
+ buffer: ArrayBuffer[Row]): Unit = {
+ table.clusterBySpec.foreach { clusterBySpec =>
+ append(buffer, "# Clustering Information", "", "")
+ append(buffer, s"# ${output.head.name}", output(1).name, output(2).name)
+ clusterBySpec.columnNames.map { fieldNames =>
+ val nestedField = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+ assert(nestedField.isDefined,
+ "The clustering column " +
+ s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " +
+ s"was not found in the table schema ${table.schema.catalogString}.")
+ nestedField.get
+ }.map { case (path, field) =>
+ append(
+ buffer,
+ (path :+ field.name).map(quoteIfNeeded).mkString("."),
+ field.dataType.simpleString,
+ field.getComment().orNull)
+ }
+ }
+ }
+
private def describeFormattedTableInfo(table: CatalogTable, buffer: ArrayBuffer[Row]): Unit = {
// The following information has been already shown in the previous outputs
val excludedTableInfo = Seq(
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
index a225dffb075b..7f7f280d8cdc 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeTableExec.scala
@@ -21,11 +21,11 @@ import scala.collection.mutable.ArrayBuffer
import scala.jdk.CollectionConverters._
import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.catalyst.catalog.CatalogTableType
+import org.apache.spark.sql.catalyst.catalog.{CatalogTableType, ClusterBySpec}
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.{quoteIfNeeded, ResolveDefaultColumns}
import org.apache.spark.sql.connector.catalog.{CatalogV2Util, SupportsMetadataColumns, SupportsRead, Table, TableCatalog}
-import org.apache.spark.sql.connector.expressions.IdentityTransform
+import org.apache.spark.sql.connector.expressions.{ClusterByTransform, IdentityTransform}
import org.apache.spark.sql.connector.read.SupportsReportStatistics
import org.apache.spark.sql.util.CaseInsensitiveStringMap
import org.apache.spark.util.ArrayImplicits._
@@ -38,6 +38,7 @@ case class DescribeTableExec(
val rows = new ArrayBuffer[InternalRow]()
addSchema(rows)
addPartitioning(rows)
+ addClustering(rows)
if (isExtended) {
addMetadataColumns(rows)
@@ -99,6 +100,32 @@ case class DescribeTableExec(
case _ =>
}
+ private def addClusteringToRows(
+ clusterBySpec: ClusterBySpec,
+ rows: ArrayBuffer[InternalRow]): Unit = {
+ rows += toCatalystRow("# Clustering Information", "", "")
+ rows += toCatalystRow(s"# ${output.head.name}", output(1).name, output(2).name)
+ rows ++= clusterBySpec.columnNames.map { fieldNames =>
+ val nestedField = table.schema.findNestedField(fieldNames.fieldNames.toIndexedSeq)
+ assert(nestedField.isDefined,
+ "The clustering column " +
+ s"${fieldNames.fieldNames.map(quoteIfNeeded).mkString(".")} " +
+ s"was not found in the table schema ${table.schema.catalogString}.")
+ nestedField.get
+ }.map { case (path, field) =>
+ toCatalystRow(
+ (path :+ field.name).map(quoteIfNeeded).mkString("."),
+ field.dataType.simpleString,
+ field.getComment().orNull)
+ }
+ }
+
+ private def addClustering(rows: ArrayBuffer[InternalRow]): Unit = {
+ ClusterBySpec.extractClusterBySpec(table.partitioning.toIndexedSeq).foreach { clusterBySpec =>
+ addClusteringToRows(clusterBySpec, rows)
+ }
+ }
+
private def addTableStats(rows: ArrayBuffer[InternalRow]): Unit = table match {
case read: SupportsRead =>
read.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
@@ -117,7 +144,10 @@ case class DescribeTableExec(
}
private def addPartitioning(rows: ArrayBuffer[InternalRow]): Unit = {
- if (table.partitioning.nonEmpty) {
+ // Clustering columns are handled in addClustering().
+ val partitioning = table.partitioning
+ .filter(t => !t.isInstanceOf[ClusterByTransform])
+ if (partitioning.nonEmpty) {
val partitionColumnsOnly = table.partitioning.forall(t => t.isInstanceOf[IdentityTransform])
if (partitionColumnsOnly) {
rows += toCatalystRow("# Partition Information", "", "")
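For v2 tables, `CLUSTER BY` is carried as a `ClusterByTransform` inside `table.partitioning`, which is why `addPartitioning` now filters it out while `addClustering` extracts it separately. A rough, self-contained sketch of that split (the `region`/`col1` transforms are invented for illustration):
```
import org.apache.spark.sql.catalyst.catalog.ClusterBySpec
import org.apache.spark.sql.connector.expressions.{ClusterByTransform, Expressions, FieldReference, Transform}

// A v2 table's partitioning may mix ordinary transforms with the clustering transform.
val partitioning: Array[Transform] = Array(
  Expressions.identity("region"),
  ClusterByTransform(Seq(FieldReference("col1"))))

// "# Partition Information" should only cover the non-clustering transforms...
val forPartitionInfo = partitioning.filter(t => !t.isInstanceOf[ClusterByTransform])

// ...while "# Clustering Information" is driven by the extracted ClusterBySpec.
val forClusteringInfo: Option[ClusterBySpec] =
  ClusterBySpec.extractClusterBySpec(partitioning.toIndexedSeq)
```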
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
index 4a7d5551fe52..c80cedca29f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/command/DescribeTableSuiteBase.scala
@@ -178,4 +178,25 @@ trait DescribeTableSuiteBase extends QueryTest with DDLCommandTestUtils {
assert(errMsg === "DESC TABLE COLUMN does not support nested column: col.x.")
}
}
+
+ test("describe a clustered table") {
+ withNamespaceAndTable("ns", "tbl") { tbl =>
+ sql(s"CREATE TABLE $tbl (col1 STRING COMMENT 'this is comment', col2 struct<x:int, y:int>) " +
+ s"$defaultUsing CLUSTER BY (col1, col2.x)")
+ val descriptionDf = sql(s"DESC $tbl")
+ assert(descriptionDf.schema.map(field => (field.name, field.dataType)) === Seq(
+ ("col_name", StringType),
+ ("data_type", StringType),
+ ("comment", StringType)))
+ QueryTest.checkAnswer(
+ descriptionDf,
+ Seq(
+ Row("col1", "string", "this is comment"),
+ Row("col2", "struct<x:int,y:int>", null),
+ Row("# Clustering Information", "", ""),
+ Row("# col_name", "data_type", "comment"),
+ Row("col1", "string", "this is comment"),
+ Row("col2.x", "int", null)))
+ }
+ }
}