You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/09/12 15:59:59 UTC

spark git commit: [SPARK-17642][SQL] support DESC EXTENDED/FORMATTED table column commands

Repository: spark
Updated Branches:
  refs/heads/master 957558235 -> 515910e9b


[SPARK-17642][SQL] support DESC EXTENDED/FORMATTED table column commands

## What changes were proposed in this pull request?

Support the `DESC [EXTENDED | FORMATTED] table_name column_name` command, which describes a single column of a table (name, data type and comment).
When `EXTENDED` or `FORMATTED` is specified, the command also shows column-level statistics.
Describing nested columns is NOT supported.

## How was this patch tested?

Added test cases.

Author: Zhenhua Wang <wz...@163.com>
Author: Zhenhua Wang <wa...@huawei.com>
Author: wangzhenhua <wa...@huawei.com>

Closes #16422 from wzhfy/descColumn.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/515910e9
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/515910e9
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/515910e9

Branch: refs/heads/master
Commit: 515910e9bdc1d1b7f0fc05cadc6aeb3a58860e2d
Parents: 9575582
Author: Zhenhua Wang <wz...@163.com>
Authored: Tue Sep 12 08:59:52 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Tue Sep 12 08:59:52 2017 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |   2 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  14 +-
 .../spark/sql/execution/command/tables.scala    |  72 +++++++-
 .../sql-tests/inputs/describe-table-column.sql  |  35 ++++
 .../results/describe-table-column.sql.out       | 184 +++++++++++++++++++
 .../apache/spark/sql/SQLQueryTestSuite.scala    |  10 +-
 .../sql/execution/SparkSqlParserSuite.scala     |  28 ++-
 7 files changed, 332 insertions(+), 13 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 239e73e..33bc79a 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -270,7 +270,7 @@ describeFuncName
     ;
 
 describeColName
-    : identifier ('.' (identifier | STRING))*
+    : nameParts+=identifier ('.' nameParts+=identifier)*
     ;
 
 ctes

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d38919b..6de9ea0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -330,10 +330,16 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * Create a [[DescribeTableCommand]] logical plan.
    */
   override def visitDescribeTable(ctx: DescribeTableContext): LogicalPlan = withOrigin(ctx) {
-    // Describe column are not supported yet. Return null and let the parser decide
-    // what to do with this (create an exception or pass it on to a different system).
+    val isExtended = ctx.EXTENDED != null || ctx.FORMATTED != null
     if (ctx.describeColName != null) {
-      null
+      if (ctx.partitionSpec != null) {
+        throw new ParseException("DESC TABLE COLUMN for a specific partition is not supported", ctx)
+      } else {
+        DescribeColumnCommand(
+          visitTableIdentifier(ctx.tableIdentifier),
+          ctx.describeColName.nameParts.asScala.map(_.getText),
+          isExtended)
+      }
     } else {
       val partitionSpec = if (ctx.partitionSpec != null) {
         // According to the syntax, visitPartitionSpec returns `Map[String, Option[String]]`.
@@ -348,7 +354,7 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
       DescribeTableCommand(
         visitTableIdentifier(ctx.tableIdentifier),
         partitionSpec,
-        ctx.EXTENDED != null || ctx.FORMATTED != null)
+        isExtended)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index 1dddc1c..da0c815 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -29,13 +29,13 @@ import org.apache.hadoop.fs.Path
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.CatalogTableType._
 import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
 import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeReference}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.execution.datasources.{DataSource, FileFormat, PartitioningUtils}
+import org.apache.spark.sql.execution.datasources.{DataSource, PartitioningUtils}
 import org.apache.spark.sql.execution.datasources.csv.CSVFileFormat
 import org.apache.spark.sql.execution.datasources.json.JsonFileFormat
 import org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat
@@ -631,6 +631,74 @@ case class DescribeTableCommand(
   }
 }
 
+/**
+ * A command to list the info for a column, including name, data type, column stats and comment.
+ * This class is itself the logical plan that the parser builds when DESCRIBE names a column.
+ *
+ * The syntax of using this command in SQL is:
+ * {{{
+ *   DESCRIBE [EXTENDED|FORMATTED] table_name column_name;
+ * }}}
+ */
+case class DescribeColumnCommand(
+    table: TableIdentifier,
+    colNameParts: Seq[String],
+    isExtended: Boolean)
+  extends RunnableCommand {
+
+  override val output: Seq[Attribute] = {
+    Seq(
+      AttributeReference("info_name", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "name of the column info").build())(),
+      AttributeReference("info_value", StringType, nullable = false,
+        new MetadataBuilder().putString("comment", "value of the column info").build())()
+    )
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val catalog = sparkSession.sessionState.catalog
+    val resolver = sparkSession.sessionState.conf.resolver
+    val relation = sparkSession.table(table).queryExecution.analyzed
+
+    val colName = UnresolvedAttribute(colNameParts).name
+    val field = {
+      relation.resolve(colNameParts, resolver).getOrElse {
+        throw new AnalysisException(s"Column $colName does not exist")
+      }
+    }
+    if (!field.isInstanceOf[Attribute]) {
+      // If the field is not an attribute after `resolve`, then it's a nested field.
+      throw new AnalysisException(
+        s"DESC TABLE COLUMN command does not support nested data types: $colName")
+    }
+
+    val catalogTable = catalog.getTempViewOrPermanentTableMetadata(table)
+    val colStats = catalogTable.stats.map(_.colStats).getOrElse(Map.empty)
+    val cs = colStats.get(field.name)
+
+    val comment = if (field.metadata.contains("comment")) {
+      Option(field.metadata.getString("comment"))
+    } else {
+      None
+    }
+
+    val buffer = ArrayBuffer[Row](
+      Row("col_name", field.name),
+      Row("data_type", field.dataType.catalogString),
+      Row("comment", comment.getOrElse("NULL"))
+    )
+    if (isExtended) {
+      // Show column stats when EXTENDED or FORMATTED is specified.
+      buffer += Row("min", cs.flatMap(_.min.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("max", cs.flatMap(_.max.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("num_nulls", cs.map(_.nullCount.toString).getOrElse("NULL"))
+      buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL"))
+      buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL"))
+      buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL"))
+    }
+    buffer
+  }
+}
 
 /**
  * A command for users to get tables in the given database.

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
new file mode 100644
index 0000000..24870de
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe-table-column.sql
@@ -0,0 +1,35 @@
+-- Test temp table
+CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET;
+
+DESC desc_col_temp_table key;
+
+DESC EXTENDED desc_col_temp_table key;
+
+DESC FORMATTED desc_col_temp_table key;
+
+-- Describe a column with qualified name
+DESC FORMATTED desc_col_temp_table desc_col_temp_table.key;
+
+-- Describe a non-existent column
+DESC desc_col_temp_table key1;
+
+-- Test persistent table
+CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET;
+
+ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key;
+
+DESC desc_col_table key;
+
+DESC EXTENDED desc_col_table key;
+
+DESC FORMATTED desc_col_table key;
+
+-- Test complex columns
+CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET;
+
+DESC FORMATTED desc_col_complex_table `a.b`;
+
+DESC FORMATTED desc_col_complex_table col;
+
+-- Describe a nested column
+DESC FORMATTED desc_col_complex_table col.x;

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
new file mode 100644
index 0000000..a51eef7
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/describe-table-column.sql.out
@@ -0,0 +1,184 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 15
+
+
+-- !query 0
+CREATE TEMPORARY VIEW desc_col_temp_table (key int COMMENT 'column_comment') USING PARQUET
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+DESC desc_col_temp_table key
+-- !query 1 schema
+struct<info_name:string,info_value:string>
+-- !query 1 output
+col_name	key
+data_type	int
+comment	column_comment
+
+
+-- !query 2
+DESC EXTENDED desc_col_temp_table key
+-- !query 2 schema
+struct<info_name:string,info_value:string>
+-- !query 2 output
+col_name	key
+data_type	int
+comment	column_comment
+min	NULL
+max	NULL
+num_nulls	NULL
+distinct_count	NULL
+avg_col_len	NULL
+max_col_len	NULL
+
+
+-- !query 3
+DESC FORMATTED desc_col_temp_table key
+-- !query 3 schema
+struct<info_name:string,info_value:string>
+-- !query 3 output
+col_name	key
+data_type	int
+comment	column_comment
+min	NULL
+max	NULL
+num_nulls	NULL
+distinct_count	NULL
+avg_col_len	NULL
+max_col_len	NULL
+
+
+-- !query 4
+DESC FORMATTED desc_col_temp_table desc_col_temp_table.key
+-- !query 4 schema
+struct<info_name:string,info_value:string>
+-- !query 4 output
+col_name	key
+data_type	int
+comment	column_comment
+min	NULL
+max	NULL
+num_nulls	NULL
+distinct_count	NULL
+avg_col_len	NULL
+max_col_len	NULL
+
+
+-- !query 5
+DESC desc_col_temp_table key1
+-- !query 5 schema
+struct<>
+-- !query 5 output
+org.apache.spark.sql.AnalysisException
+Column key1 does not exist;
+
+
+-- !query 6
+CREATE TABLE desc_col_table (key int COMMENT 'column_comment') USING PARQUET
+-- !query 6 schema
+struct<>
+-- !query 6 output
+
+
+
+-- !query 7
+ANALYZE TABLE desc_col_table COMPUTE STATISTICS FOR COLUMNS key
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DESC desc_col_table key
+-- !query 8 schema
+struct<info_name:string,info_value:string>
+-- !query 8 output
+col_name	key
+data_type	int
+comment	column_comment
+
+
+-- !query 9
+DESC EXTENDED desc_col_table key
+-- !query 9 schema
+struct<info_name:string,info_value:string>
+-- !query 9 output
+col_name	key
+data_type	int
+comment	column_comment
+min	NULL
+max	NULL
+num_nulls	0
+distinct_count	0
+avg_col_len	4
+max_col_len	4
+
+
+-- !query 10
+DESC FORMATTED desc_col_table key
+-- !query 10 schema
+struct<info_name:string,info_value:string>
+-- !query 10 output
+col_name	key
+data_type	int
+comment	column_comment
+min	NULL
+max	NULL
+num_nulls	0
+distinct_count	0
+avg_col_len	4
+max_col_len	4
+
+
+-- !query 11
+CREATE TABLE desc_col_complex_table (`a.b` int, col struct<x:int, y:string>) USING PARQUET
+-- !query 11 schema
+struct<>
+-- !query 11 output
+
+
+
+-- !query 12
+DESC FORMATTED desc_col_complex_table `a.b`
+-- !query 12 schema
+struct<info_name:string,info_value:string>
+-- !query 12 output
+col_name	a.b
+data_type	int
+comment	NULL
+min	NULL
+max	NULL
+num_nulls	NULL
+distinct_count	NULL
+avg_col_len	NULL
+max_col_len	NULL
+
+
+-- !query 13
+DESC FORMATTED desc_col_complex_table col
+-- !query 13 schema
+struct<info_name:string,info_value:string>
+-- !query 13 output
+col_name	col
+data_type	struct<x:int,y:string>
+comment	NULL
+min	NULL
+max	NULL
+num_nulls	NULL
+distinct_count	NULL
+avg_col_len	NULL
+max_col_len	NULL
+
+
+-- !query 14
+DESC FORMATTED desc_col_complex_table col.x
+-- !query 14 schema
+struct<>
+-- !query 14 output
+org.apache.spark.sql.AnalysisException
+DESC TABLE COLUMN command does not support nested data types: col.x;

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
index aa000bd..e3901af 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQueryTestSuite.scala
@@ -26,7 +26,7 @@ import org.apache.spark.sql.catalyst.planning.PhysicalOperation
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.rules.RuleExecutor
 import org.apache.spark.sql.catalyst.util.{fileToString, stringToFile}
-import org.apache.spark.sql.execution.command.DescribeTableCommand
+import org.apache.spark.sql.execution.command.{DescribeColumnCommand, DescribeTableCommand}
 import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.types.StructType
 
@@ -214,11 +214,11 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
   /** Executes a query and returns the result as (schema of the output, normalized output). */
   private def getNormalizedResult(session: SparkSession, sql: String): (StructType, Seq[String]) = {
     // Returns true if the plan is supposed to be sorted.
-    def needSort(plan: LogicalPlan): Boolean = plan match {
+    def isSorted(plan: LogicalPlan): Boolean = plan match {
       case _: Join | _: Aggregate | _: Generate | _: Sample | _: Distinct => false
-      case _: DescribeTableCommand => true
+      case _: DescribeTableCommand | _: DescribeColumnCommand => true
       case PhysicalOperation(_, _, Sort(_, true, _)) => true
-      case _ => plan.children.iterator.exists(needSort)
+      case _ => plan.children.iterator.exists(isSorted)
     }
 
     try {
@@ -233,7 +233,7 @@ class SQLQueryTestSuite extends QueryTest with SharedSQLContext {
         .replaceAll("Last Access.*", s"Last Access $notIncludedMsg"))
 
       // If the output is not pre-sorted, sort it.
-      if (needSort(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
+      if (isSorted(df.queryExecution.analyzed)) (schema, answer) else (schema, answer.sorted)
 
     } catch {
       case a: AnalysisException =>

http://git-wip-us.apache.org/repos/asf/spark/blob/515910e9/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index fa7a866..107a2f7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -249,8 +249,34 @@ class SparkSqlParserSuite extends AnalysisTest {
     assertEqual("describe table formatted t",
       DescribeTableCommand(
         TableIdentifier("t"), Map.empty, isExtended = true))
+  }
+
+  test("describe table column") {
+    assertEqual("DESCRIBE t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = false))
+    assertEqual("DESCRIBE t `abc.xyz`",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("abc.xyz"), isExtended = false))
+    assertEqual("DESCRIBE t abc.xyz",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("abc", "xyz"), isExtended = false))
+    assertEqual("DESCRIBE t `a.b`.`x.y`",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("a.b", "x.y"), isExtended = false))
+
+    assertEqual("DESCRIBE TABLE t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = false))
+    assertEqual("DESCRIBE TABLE EXTENDED t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = true))
+    assertEqual("DESCRIBE TABLE FORMATTED t col",
+      DescribeColumnCommand(
+        TableIdentifier("t"), Seq("col"), isExtended = true))
 
-    intercept("explain describe tables x", "Unsupported SQL statement")
+    intercept("DESCRIBE TABLE t PARTITION (ds='1970-01-01') col",
+      "DESC TABLE COLUMN for a specific partition is not supported")
   }
 
   test("analyze table statistics") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org