You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2023/02/28 01:35:25 UTC
[spark] branch master updated: [SPARK-39859][SQL][FOLLOWUP] Only get ColStats when isExtended is true in Describe Column
This is an automated email from the ASF dual-hosted git repository.
wenchen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new cf591580b08 [SPARK-39859][SQL][FOLLOWUP] Only get ColStats when isExtended is true in Describe Column
cf591580b08 is described below
commit cf591580b08889384633c093972e45c289bce979
Author: huaxingao <hu...@apple.com>
AuthorDate: Tue Feb 28 09:35:08 2023 +0800
[SPARK-39859][SQL][FOLLOWUP] Only get ColStats when isExtended is true in Describe Column
### What changes were proposed in this pull request?
Retrieve column statistics in `DescribeColumnExec`, and only when `isExtended` is true, instead of computing them unconditionally in `DataSourceV2Strategy`.
### Why are the changes needed?
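To make the code cleaner, and to avoid building a scan and estimating statistics for a non-extended `DESCRIBE COLUMN`, which never displays them.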
### Does this PR introduce _any_ user-facing change?
No
### How was this patch tested?
Existing tests.
Closes #40139 from huaxingao/describe_followup.
Authored-by: huaxingao <hu...@apple.com>
Signed-off-by: Wenchen Fan <we...@databricks.com>
---
.../datasources/v2/DataSourceV2Strategy.scala | 11 +--
.../datasources/v2/DescribeColumnExec.scala | 78 +++++++++++++---------
2 files changed, 49 insertions(+), 40 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
index 71ffe65b42a..4d84c42bc5b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DataSourceV2Strategy.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.connector.catalog.CatalogV2Util.structTypeToV2Column
import org.apache.spark.sql.connector.catalog.index.SupportsIndex
import org.apache.spark.sql.connector.expressions.{FieldReference, LiteralValue}
import org.apache.spark.sql.connector.expressions.filter.{And => V2And, Not => V2Not, Or => V2Or, Predicate}
-import org.apache.spark.sql.connector.read.{LocalScan, SupportsReportStatistics}
+import org.apache.spark.sql.connector.read.LocalScan
import org.apache.spark.sql.connector.read.streaming.{ContinuousStream, MicroBatchStream}
import org.apache.spark.sql.connector.write.V1Write
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
@@ -340,14 +340,7 @@ class DataSourceV2Strategy(session: SparkSession) extends Strategy with Predicat
case DescribeColumn(r: ResolvedTable, column, isExtended, output) =>
column match {
case c: Attribute =>
- val colStats =
- r.table.asReadable.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
- case s: SupportsReportStatistics =>
- val stats = s.estimateStatistics()
- Some(stats.columnStats().get(FieldReference.column(c.name)))
- case _ => None
- }
- DescribeColumnExec(output, c, isExtended, colStats) :: Nil
+ DescribeColumnExec(output, c, isExtended, r.table) :: Nil
case nested =>
throw QueryCompilationErrors.commandNotSupportNestedColumnError(
"DESC TABLE COLUMN", toPrettySQL(nested))
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
index 491c214080a..61ccda3fc95 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/DescribeColumnExec.scala
@@ -22,13 +22,16 @@ import scala.collection.mutable.ArrayBuffer
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.expressions.Attribute
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
-import org.apache.spark.sql.connector.read.colstats.ColumnStatistics
+import org.apache.spark.sql.connector.catalog.{SupportsRead, Table}
+import org.apache.spark.sql.connector.expressions.FieldReference
+import org.apache.spark.sql.connector.read.SupportsReportStatistics
+import org.apache.spark.sql.util.CaseInsensitiveStringMap
case class DescribeColumnExec(
override val output: Seq[Attribute],
column: Attribute,
isExtended: Boolean,
- colStats: Option[ColumnStatistics] = None) extends LeafV2CommandExec {
+ table: Table) extends LeafV2CommandExec {
override protected def run(): Seq[InternalRow] = {
val rows = new ArrayBuffer[InternalRow]()
@@ -44,41 +47,54 @@ case class DescribeColumnExec(
CharVarcharUtils.getRawType(column.metadata).getOrElse(column.dataType).catalogString)
rows += toCatalystRow("comment", comment)
- if (isExtended && colStats.nonEmpty) {
- if (colStats.get.min().isPresent) {
- rows += toCatalystRow("min", colStats.get.min().toString)
- } else {
- rows += toCatalystRow("min", "NULL")
+ if (isExtended) {
+ val colStats = table match {
+ case read: SupportsRead =>
+ read.newScanBuilder(CaseInsensitiveStringMap.empty()).build() match {
+ case s: SupportsReportStatistics =>
+ val stats = s.estimateStatistics()
+ Some(stats.columnStats().get(FieldReference.column(column.name)))
+ case _ => None
+ }
+ case _ => None
}
- if (colStats.get.max().isPresent) {
- rows += toCatalystRow("max", colStats.get.max().toString)
- } else {
- rows += toCatalystRow("max", "NULL")
- }
+ if (colStats.nonEmpty) {
+ if (colStats.get.min().isPresent) {
+ rows += toCatalystRow("min", colStats.get.min().toString)
+ } else {
+ rows += toCatalystRow("min", "NULL")
+ }
- if (colStats.get.nullCount().isPresent) {
- rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString)
- } else {
- rows += toCatalystRow("num_nulls", "NULL")
- }
+ if (colStats.get.max().isPresent) {
+ rows += toCatalystRow("max", colStats.get.max().toString)
+ } else {
+ rows += toCatalystRow("max", "NULL")
+ }
- if (colStats.get.distinctCount().isPresent) {
- rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString)
- } else {
- rows += toCatalystRow("distinct_count", "NULL")
- }
+ if (colStats.get.nullCount().isPresent) {
+ rows += toCatalystRow("num_nulls", colStats.get.nullCount().getAsLong.toString)
+ } else {
+ rows += toCatalystRow("num_nulls", "NULL")
+ }
- if (colStats.get.avgLen().isPresent) {
- rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString)
- } else {
- rows += toCatalystRow("avg_col_len", "NULL")
- }
+ if (colStats.get.distinctCount().isPresent) {
+ rows += toCatalystRow("distinct_count", colStats.get.distinctCount().getAsLong.toString)
+ } else {
+ rows += toCatalystRow("distinct_count", "NULL")
+ }
+
+ if (colStats.get.avgLen().isPresent) {
+ rows += toCatalystRow("avg_col_len", colStats.get.avgLen().getAsLong.toString)
+ } else {
+ rows += toCatalystRow("avg_col_len", "NULL")
+ }
- if (colStats.get.maxLen().isPresent) {
- rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString)
- } else {
- rows += toCatalystRow("max_col_len", "NULL")
+ if (colStats.get.maxLen().isPresent) {
+ rows += toCatalystRow("max_col_len", colStats.get.maxLen().getAsLong.toString)
+ } else {
+ rows += toCatalystRow("max_col_len", "NULL")
+ }
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org