You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/12 15:59:59 UTC
spark git commit: [SPARK-17642][SQL] support DESC EXTENDED/FORMATTED
table column commands
Repository: spark
Updated Branches:
refs/heads/master 957558235 -> 515910e9b
[SPARK-17642][SQL] support DESC EXTENDED/FORMATTED table column commands
## What changes were proposed in this pull request?
Support the `DESC [EXTENDED | FORMATTED] table_name column_name` command to describe a single column of a table.
When EXTENDED or FORMATTED is specified, the command also shows column-level statistics.
Describing nested columns is NOT supported.
## How was this patch tested?
Added test cases.
Author: Zhenhua Wang <wz...@163.com>
Author: Zhenhua Wang <wa...@huawei.com>
Author: wangzhenhua <wa...@huawei.com>
Closes #16422 from wzhfy/descColumn.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/515910e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/515910e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/515910e9
Branch: refs/heads/master
Commit: 515910e9bdc1d1b7f0fc05cadc6aeb3a58860e2d
Parents: 9575582
Author: Zhenhua Wang <wz...@163.com>
Authored: Tue Sep 12 08:59:52 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Sep 12 08:59:52 2017 -0700
----------------------------------------------------------------------
.../apache/spark/sql/catalyst/parser/SqlBase.g4 | 2 +-
.../spark/sql/execution/SparkSqlParser.scala | 14 +-
.../spark/sql/execution/command/tables.scala | 72 +++++++-
.../sql-tests/inputs/describe-table-column.sql | 35 ++++
.../results/describe-table-column.sql.out | 184 +++++++++++++++++++
.../apache/spark/sql/SQLQueryTestSuite.scala | 10 +-
.../sql/execution/SparkSqlParserSuite.scala | 28 ++-
7 files changed, 332 insertions(+), 13 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 239e73e..33bc79a 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -270,7 +270,7 @@ describeFuncName
;
describeColName
- : identifier ('.' (identifier | STRING))*
+ : nameParts+=identifier ('.' nameParts+=identifier)*
;
ctes
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d38919b..6de9ea0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -330,10 +330,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
* Create a [[DescribeTableCommand]] logical plan.
*/
override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
- // Describe column are not supported yet. Return null and let the parser decide
- // what to do with this (create an exception or pass it on to a different system).
+ val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
if (ctx.describeColName != null) {
- null
+ if (ctx.partitionSpec != null) {
+ throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
+ } else {
+ DescribeColumnCommand(
+ visitTableIdentifier(ctx.tableIdentifier),
+ ctx.describeColName.nameParts.asScala.map(_.getText),
+ isExtended)
+ }
} else {
val partitionSpec = if (ctx.partitionSpec != null) {
// According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
@@ -348,7 +354,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
DescribeTableCommand(
visitTableIdentifier(ctx.tableIdentifier),
partitionSpec,
- ctx.EXTENDED != null || ctx.FORMATTED != null)
+ isExtended)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1dddc1c..da0c815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.fs.Path
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
import org.apache.spark.sql.catalyst.catalog._
import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -631,6 +631,74 @@ case class DescribeTableCommand(
}
}
+/**
+ * A command to list the info for a column, including name, data type, column stats and comment.
+ * This function creates a [[DescribeColumnCommand]] logical plan.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ * DESCRIBE [EXTENDED|FORMATTED] table_name column_name;
+ * }}}
+ */
+case class DescribeColumnCommand(
+ table: TableIdentifier,
+ colNameParts: Seq[String],
+ isExtended: Boolean)
+ extends RunnableCommand {
+
+ override val output: Seq[Attribute] = {
+ Seq(
+ AttributeReference("info_name", StringType, nullable = false,
+ new MetadataBuilder().putString("comment", "name of the column info").build())(),
+ AttributeReference("info_value", StringType, nullable = false,
+ new MetadataBuilder().putString("comment", "value of the column info").build())()
+ )
+ }
+
+ override def run(sparkSession: SparkSession): Seq[Row] = {
+ val catalog = sparkSession.sessionState.catalog
+ val resolver = sparkSession.sessionState.conf.resolver
+ val relation = sparkSession.table(table).queryExecution.analyzed
+
+ val colName = UnresolvedAttribute(colNameParts).name
+ val field = {
+ relation.resolve(colNameParts, resolver).getOrElse {
+ throw new AnalysisException(s"Column $colName does not exist")
+ }
+ }
+ if (!field.isInstanceOf[Attribute]) {
+ // If the field is not an attribute after `resolve`, then it's a nested field.
+ throw new AnalysisException(
+ s"DESC TABLE COLUMN command does not support nested data types: $colName")
+ }
+
+ val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+ val colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty)
+ val cs = colStats.get(field.name)
+
+ val comment = if (field.metadata.contains("comment")) {
+ Option(field.metadata.getString("comment"))
+ } else {
+ None
+ }
+
+ val buffer = ArrayBuffer[Row](
+ Row("col_name", field.name),
+ Row("data_type", field.dataType.catalogString),
+ Row("comment", comment.getOrElse("NULL"))
+ )
+ if (isExtended) {
+ // Show column stats when EXTENDED or FORMATTED is specified.
+ buffer += Row("min", cs.flatMap(_.min.map(_.toString)).getOrElse("NULL"))
+ buffer += Row("max", cs.flatMap(_.max.map(_.toString)).getOrElse("NULL"))
+ buffer += Row("num_nulls", cs.map(_.nullCount.toString).getOrElse("NULL"))
+ buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL"))
+ buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL"))
+ buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL"))
+ }
+ buffer
+ }
+}
/**
* A command for users to get tables in the given database.
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
new file mode 100644
index 0000000..24870de
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
@@ -0,0 +1,35 @@
+-- Test temp table
+CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET;
+
+DESC desc_col_temp_table key;
+
+DESC EXTENDED desc_col_temp_table key;
+
+DESC FORMATTED desc_col_temp_table key;
+
+-- Describe a column with qualified name
+DESC FORMATTED desc_col_temp_table desc_col_temp_table.key;
+
+-- Describe a non-existent column
+DESC desc_col_temp_table key1;
+
+-- Test persistent table
+CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET;
+
+ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key;
+
+DESC desc_col_table key;
+
+DESC EXTENDED desc_col_table key;
+
+DESC FORMATTED desc_col_table key;
+
+-- Test complex columns
+CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET;
+
+DESC FORMATTED desc_col_complex_table `a.b`;
+
+DESC FORMATTED desc_col_complex_table col;
+
+-- Describe a nested column
+DESC FORMATTED desc_col_complex_table col.x;
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
new file mode 100644
index 0000000..a51eef7
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
@@ -0,0 +1,184 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 15
+
+
+-- !query 0
+CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+DESC desc_col_temp_table key
+-- !query 1 schema
+struct<info_name:string,info_value:string>
+-- !query 1 output
+col_name key
+data_type int
+comment column_comment
+
+
+-- !query 2
+DESC EXTENDED desc_col_temp_table key
+-- !query 2 schema
+struct<info_name:string,info_value:string>
+-- !query 2 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 3
+DESC FORMATTED desc_col_temp_table key
+-- !query 3 schema
+struct<info_name:string,info_value:string>
+-- !query 3 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 4
+DESC FORMATTED desc_col_temp_table desc_col_temp_table.key
+-- !query 4 schema
+struct<info_name:string,info_value:string>
+-- !query 4 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 5
+DESC desc_col_temp_table key1
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Column key1 does not exist;
+
+
+-- !query 6
+CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET
+-- !query 6 schema
+struct<>
+-- !query 6 output
+
+
+
+-- !query 7
+ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DESC desc_col_table key
+-- !query 8 schema
+struct<info_name:string,info_value:string>
+-- !query 8 output
+col_name key
+data_type int
+comment column_comment
+
+
+-- !query 9
+DESC EXTENDED desc_col_table key
+-- !query 9 schema
+struct<info_name:string,info_value:string>
+-- !query 9 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls 0
+distinct_count 0
+avg_col_len 4
+max_col_len 4
+
+
+-- !query 10
+DESC FORMATTED desc_col_table key
+-- !query 10 schema
+struct<info_name:string,info_value:string>
+-- !query 10 output
+col_name key
+data_type int
+comment column_comment
+min NULL
+max NULL
+num_nulls 0
+distinct_count 0
+avg_col_len 4
+max_col_len 4
+
+
+-- !query 11
+CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET
+-- !query 11 schema
+struct<>
+-- !query 11 output
+
+
+
+-- !query 12
+DESC FORMATTED desc_col_complex_table `a.b`
+-- !query 12 schema
+struct<info_name:string,info_value:string>
+-- !query 12 output
+col_name a.b
+data_type int
+comment NULL
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 13
+DESC FORMATTED desc_col_complex_table col
+-- !query 13 schema
+struct<info_name:string,info_value:string>
+-- !query 13 output
+col_name col
+data_type struct<x:int,y:string>
+comment NULL
+min NULL
+max NULL
+num_nulls NULL
+distinct_count NULL
+avg_col_len NULL
+max_col_len NULL
+
+
+-- !query 14
+DESC FORMATTED desc_col_complex_table col.x
+-- !query 14 schema
+struct<>
+-- !query 14 output
+org.apache.spark.sql.AnalysisException
+DESC TABLE COLUMN command does not support nested data types: col.x;
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index aa000bd..e3901af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.rules.RuleExecutor
import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
-import org.apache.spark.sql.execution.command.DescribeTableCommand
+import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeTableCommand}
import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.types.StructType
@@ -214,11 +214,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
/** Executes a query and returns the result as (schema of the output, normalized output). */
private def getNormalizedResult(session: SparkSession, sql: String): (StructType, Seq[String]) = {
// Returns true if the plan is supposed to be sorted.
- def needSort(plan: LogicalPlan): Boolean = plan match {
+ def isSorted(plan: LogicalPlan): Boolean = plan match {
case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
- case _: DescribeTableCommand => true
+ case _: DescribeTableCommand | _: DescribeColumnCommand => true
case PhysicalOperation(_, _, Sort(_, true, _)) => true
- case _ => plan.children.iterator.exists(needSort)
+ case _ => plan.children.iterator.exists(isSorted)
}
try {
@@ -233,7 +233,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
.replaceAll("Last Access.*", s"Last Access $notIncludedMsg"))
// If the output is not pre-sorted, sort it.
- if (needSort(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
+ if (isSorted(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
} catch {
case a: AnalysisException =>
http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index fa7a866..107a2f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -249,8 +249,34 @@ class SparkSqlParserSuite extends AnalysisTest {
assertEqual("describe table formatted t",
DescribeTableCommand(
TableIdentifier("t"), Map.empty, isExtended = true))
+ }
+
+ test("describe table column") {
+ assertEqual("DESCRIBE t col",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("col"), isExtended = false))
+ assertEqual("DESCRIBE t `abc.xyz`",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("abc.xyz"), isExtended = false))
+ assertEqual("DESCRIBE t abc.xyz",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("abc", "xyz"), isExtended = false))
+ assertEqual("DESCRIBE t `a.b`.`x.y`",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("a.b", "x.y"), isExtended = false))
+
+ assertEqual("DESCRIBE TABLE t col",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("col"), isExtended = false))
+ assertEqual("DESCRIBE TABLE EXTENDED t col",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("col"), isExtended = true))
+ assertEqual("DESCRIBE TABLE FORMATTED t col",
+ DescribeColumnCommand(
+ TableIdentifier("t"), Seq("col"), isExtended = true))
- intercept("explain describe tables x", "Unsupported SQL statement")
+ intercept("DESCRIBE TABLE t PARTITION (ds='1970-01-01') col",
+ "DESC TABLE COLUMN for a specific partition is not supported")
}
test("analyze table statistics") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org