Posted to commits@spark.apache.org by li...@apache.org on 2018/09/28 22:03:18 UTC

spark git commit: [SPARK-25458][SQL] Support FOR ALL COLUMNS in ANALYZE TABLE

Repository: spark
Updated Branches:
  refs/heads/master b7d80349b -> 7deef7a49


[SPARK-25458][SQL] Support FOR ALL COLUMNS in ANALYZE TABLE

## What changes were proposed in this pull request?
**Description from the JIRA:**
Currently, to collect statistics for all the columns of a table, users must list every column name when calling the command "ANALYZE TABLE ... FOR COLUMNS ...". This is not user-friendly. Instead, we can introduce the following SQL command to achieve the same result without specifying the column names:

```
   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR ALL COLUMNS;
```
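
For illustration, a minimal end-to-end sketch in Scala (the `sales` table, its schema, the sample rows, and the local-mode session are made up for this example and are not part of the patch; the authoritative examples are the parser and statistics tests in the diff below):

```scala
// Hypothetical sketch: table name, schema, and data are illustrative only.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .appName("analyze-all-columns-example")
  .getOrCreate()

spark.sql("CREATE TABLE sales (id INT, region STRING, amount DOUBLE) USING parquet")
spark.sql("INSERT INTO sales VALUES (1, 'east', 10.0), (2, 'west', 20.0)")

// Before this change, every column had to be listed explicitly:
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR COLUMNS id, region, amount")

// With this change, the same column statistics are collected without naming the columns:
spark.sql("ANALYZE TABLE sales COMPUTE STATISTICS FOR ALL COLUMNS")

// The collected column-level statistics can then be inspected, for example with
// DESCRIBE EXTENDED on a single column (available since Spark 2.3).
spark.sql("DESCRIBE EXTENDED sales amount").show(truncate = false)
```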

## How was this patch tested?
Added new tests in SparkSqlParserSuite and StatisticsSuite

Closes #22566 from dilipbiswal/SPARK-25458.

Authored-by: Dilip Biswal <db...@us.ibm.com>
Signed-off-by: gatorsmile <ga...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/7deef7a4
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/7deef7a4
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/7deef7a4

Branch: refs/heads/master
Commit: 7deef7a49b95c5de5af10419ece8c6a36d96ac61
Parents: b7d8034
Author: Dilip Biswal <db...@us.ibm.com>
Authored: Fri Sep 28 15:03:06 2018 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Sep 28 15:03:06 2018 -0700

----------------------------------------------------------------------
 .../apache/spark/sql/catalyst/parser/SqlBase.g4 |  2 +-
 .../spark/sql/execution/SparkSqlParser.scala    | 26 ++++++---
 .../command/AnalyzeColumnCommand.scala          | 61 ++++++++++++--------
 .../sql/execution/SparkSqlParserSuite.scala     | 14 ++++-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 47 ++++++++++++++-
 5 files changed, 115 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/7deef7a4/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4 b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
index 94283f5..16665eb 100644
--- a/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
+++ b/sql/catalyst/src/main/antlr4/org/apache/spark/sql/catalyst/parser/SqlBase.g4
@@ -99,7 +99,7 @@ statement
     | CREATE TABLE (IF NOT EXISTS)? target=tableIdentifier
         LIKE source=tableIdentifier locationSpec?                      #createTableLike
     | ANALYZE TABLE tableIdentifier partitionSpec? COMPUTE STATISTICS
-        (identifier | FOR COLUMNS identifierSeq)?                      #analyze
+        (identifier | FOR COLUMNS identifierSeq | FOR ALL COLUMNS)?    #analyze
     | ALTER TABLE tableIdentifier
         ADD COLUMNS '(' columns=colTypeList ')'                        #addTableColumns
     | ALTER (TABLE | VIEW) from=tableIdentifier

http://git-wip-us.apache.org/repos/asf/spark/blob/7deef7a4/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index 89cb637..4ed14d3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -102,15 +102,29 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
    * {{{
    *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
    * }}}
+   *
+   * Example SQL for analyzing all columns of a table:
+   * {{{
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR ALL COLUMNS;
+   * }}}
    */
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
+    def checkPartitionSpec(): Unit = {
+      if (ctx.partitionSpec != null) {
+        logWarning("Partition specification is ignored when collecting column statistics: " +
+          ctx.partitionSpec.getText)
+      }
+    }
     if (ctx.identifier != null &&
         ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
       throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
     }
 
     val table = visitTableIdentifier(ctx.tableIdentifier)
-    if (ctx.identifierSeq() == null) {
+    if (ctx.ALL() != null) {
+      checkPartitionSpec()
+      AnalyzeColumnCommand(table, None, allColumns = true)
+    } else if (ctx.identifierSeq() == null) {
       if (ctx.partitionSpec != null) {
         AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
           noscan = ctx.identifier != null)
@@ -118,13 +132,9 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
         AnalyzeTableCommand(table, noscan = ctx.identifier != null)
       }
     } else {
-      if (ctx.partitionSpec != null) {
-        logWarning("Partition specification is ignored when collecting column statistics: " +
-          ctx.partitionSpec.getText)
-      }
-      AnalyzeColumnCommand(
-        table,
-        visitIdentifierSeq(ctx.identifierSeq()))
+      checkPartitionSpec()
+      AnalyzeColumnCommand(table,
+        Option(visitIdentifierSeq(ctx.identifierSeq())), allColumns = false)
     }
   }
 

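As a quick, hypothetical illustration of what the branches in `visitAnalyze` above produce (mirroring the SparkSqlParserSuite changes further down; the table name `t` and the column names are arbitrary), the parsed plans could be inspected directly:

```scala
// Sketch only: shows which logical command each statement form parses into.
// Parsing does not touch the catalog, so the table does not need to exist.
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.execution.command.AnalyzeColumnCommand

val spark = SparkSession.builder().master("local[*]").getOrCreate()
val parser = spark.sessionState.sqlParser

// FOR ALL COLUMNS is expected to yield AnalyzeColumnCommand(table, None, allColumns = true).
parser.parsePlan("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL COLUMNS") match {
  case AnalyzeColumnCommand(_, None, true) => println("all-columns form parsed as expected")
  case other => println(s"unexpected plan: $other")
}

// FOR COLUMNS key, value keeps the explicit-columns form, now with allColumns = false.
parser.parsePlan("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value") match {
  case AnalyzeColumnCommand(_, Some(cols), false) => println(s"columns: ${cols.mkString(", ")}")
  case other => println(s"unexpected plan: $other")
}
```
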
http://git-wip-us.apache.org/repos/asf/spark/blob/7deef7a4/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 3fea6d7..93447a5 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -33,13 +33,17 @@ import org.apache.spark.sql.types._
 
 /**
  * Analyzes the given columns of the given table to generate statistics, which will be used in
- * query optimizations.
+ * query optimizations. Parameter `allColumns` may be specified to generate statistics of all the
+ * columns of a given table.
  */
 case class AnalyzeColumnCommand(
     tableIdent: TableIdentifier,
-    columnNames: Seq[String]) extends RunnableCommand {
+    columnNames: Option[Seq[String]],
+    allColumns: Boolean) extends RunnableCommand {
 
   override def run(sparkSession: SparkSession): Seq[Row] = {
+    require(columnNames.isDefined ^ allColumns, "Parameters `columnNames` and `allColumns` are " +
+      "mutually exclusive. Only one of them should be specified.")
     val sessionState = sparkSession.sessionState
     val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
@@ -48,9 +52,12 @@ case class AnalyzeColumnCommand(
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
     val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+    val relation = sparkSession.table(tableIdent).logicalPlan
+    val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
 
-    // Compute stats for each column
-    val (rowCount, newColStats) = computeColumnStats(sparkSession, tableIdentWithDB, columnNames)
+    // Compute stats for the computed list of columns.
+    val (rowCount, newColStats) =
+      computeColumnStats(sparkSession, relation, columnsToAnalyze)
 
     // We also update table-level stats in order to keep them consistent with column-level stats.
     val statistics = CatalogStatistics(
@@ -64,31 +71,39 @@ case class AnalyzeColumnCommand(
     Seq.empty[Row]
   }
 
-  /**
-   * Compute stats for the given columns.
-   * @return (row count, map from column name to CatalogColumnStats)
-   */
-  private def computeColumnStats(
-      sparkSession: SparkSession,
+  private def getColumnsToAnalyze(
       tableIdent: TableIdentifier,
-      columnNames: Seq[String]): (Long, Map[String, CatalogColumnStat]) = {
-
-    val conf = sparkSession.sessionState.conf
-    val relation = sparkSession.table(tableIdent).logicalPlan
-    // Resolve the column names and dedup using AttributeSet
-    val attributesToAnalyze = columnNames.map { col =>
-      val exprOption = relation.output.find(attr => conf.resolver(attr.name, col))
-      exprOption.getOrElse(throw new AnalysisException(s"Column $col does not exist."))
+      relation: LogicalPlan,
+      columnNames: Option[Seq[String]],
+      allColumns: Boolean = false): Seq[Attribute] = {
+    val columnsToAnalyze = if (allColumns) {
+      relation.output
+    } else {
+      columnNames.get.map { col =>
+        val exprOption = relation.output.find(attr => conf.resolver(attr.name, col))
+        exprOption.getOrElse(throw new AnalysisException(s"Column $col does not exist."))
+      }
     }
-
     // Make sure the column types are supported for stats gathering.
-    attributesToAnalyze.foreach { attr =>
+    columnsToAnalyze.foreach { attr =>
       if (!supportsType(attr.dataType)) {
         throw new AnalysisException(
           s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
             "and Spark does not support statistics collection on this column type.")
       }
     }
+    columnsToAnalyze
+  }
+
+  /**
+   * Compute stats for the given columns.
+   * @return (row count, map from column name to CatalogColumnStats)
+   */
+  private def computeColumnStats(
+      sparkSession: SparkSession,
+      relation: LogicalPlan,
+      columns: Seq[Attribute]): (Long, Map[String, CatalogColumnStat]) = {
+    val conf = sparkSession.sessionState.conf
 
     // Collect statistics per column.
     // If no histogram is required, we run a job to compute basic column stats such as
@@ -99,20 +114,20 @@ case class AnalyzeColumnCommand(
     // 2. use the percentiles as value intervals of bins, e.g. [p(0), p(1/n)],
     // [p(1/n), p(2/n)], ..., [p((n-1)/n), p(1)], and then count ndv in each bin.
     // Basic column stats will be computed together in the second job.
-    val attributePercentiles = computePercentiles(attributesToAnalyze, sparkSession, relation)
+    val attributePercentiles = computePercentiles(columns, sparkSession, relation)
 
     // The first element in the result will be the overall row count, the following elements
     // will be structs containing all column stats.
     // The layout of each struct follows the layout of the ColumnStats.
     val expressions = Count(Literal(1)).toAggregateExpression() +:
-      attributesToAnalyze.map(statExprs(_, conf, attributePercentiles))
+      columns.map(statExprs(_, conf, attributePercentiles))
 
     val namedExpressions = expressions.map(e => Alias(e, e.toString)())
     val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation))
       .executedPlan.executeTake(1).head
 
     val rowCount = statsRow.getLong(0)
-    val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
+    val columnStats = columns.zipWithIndex.map { case (attr, i) =>
       // according to `statExprs`, the stats struct always have 7 fields.
       (attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
         attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType))

http://git-wip-us.apache.org/repos/asf/spark/blob/7deef7a4/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index 28a060a..31b9bcd 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -323,12 +323,22 @@ class SparkSqlParserSuite extends AnalysisTest {
     intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS", "")
 
     assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value",
-      AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
+      AnalyzeColumnCommand(TableIdentifier("t"), Option(Seq("key", "value")), allColumns = false))
 
     // Partition specified - should be ignored
     assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
       "COMPUTE STATISTICS FOR COLUMNS key, value",
-      AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
+      AnalyzeColumnCommand(TableIdentifier("t"), Option(Seq("key", "value")), allColumns = false))
+
+    // Partition specified should be ignored in case of COMPUTE STATISTICS FOR ALL COLUMNS
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
+      "COMPUTE STATISTICS FOR ALL COLUMNS",
+      AnalyzeColumnCommand(TableIdentifier("t"), None, allColumns = true))
+
+    intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL COLUMNS key, value",
+      "mismatched input 'key' expecting <EOF>")
+    intercept("ANALYZE TABLE t COMPUTE STATISTICS FOR ALL",
+      "missing 'COLUMNS' at '<EOF>'")
   }
 
   test("query organization") {

http://git-wip-us.apache.org/repos/asf/spark/blob/7deef7a4/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index d8ffb29..57f1c24 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -32,7 +32,7 @@ import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer}
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
-import org.apache.spark.sql.execution.command.{CommandUtils, DDLUtils}
+import org.apache.spark.sql.execution.command.{AnalyzeColumnCommand, CommandUtils, DDLUtils}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
@@ -653,6 +653,51 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("collecting statistics for all columns") {
+    val table = "update_col_stats_table"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (c1 INT, c2 STRING, c3 DOUBLE)")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR ALL COLUMNS")
+      val fetchedStats0 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetchedStats0.get.colStats == Map(
+        "c1" -> CatalogColumnStat(distinctCount = Some(0), min = None, max = None,
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        "c3" -> CatalogColumnStat(distinctCount = Some(0), min = None, max = None,
+          nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+        "c2" -> CatalogColumnStat(distinctCount = Some(0), min = None, max = None,
+          nullCount = Some(0), avgLen = Some(20), maxLen = Some(20))))
+
+      // Insert new data and analyze: have the latest column stats.
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'b', null")
+
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR ALL COLUMNS")
+      val fetchedStats1 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
+      assert(fetchedStats1.get.colStats == Map(
+        "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        "c3" -> CatalogColumnStat(distinctCount = Some(1), min = Some("10.0"), max = Some("10.0"),
+          nullCount = Some(1), avgLen = Some(8), maxLen = Some(8)),
+        "c2" -> CatalogColumnStat(distinctCount = Some(2), min = None, max = None,
+          nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))))
+    }
+  }
+
+  test("analyze column command paramaters validation") {
+    val e1 = intercept[IllegalArgumentException] {
+      AnalyzeColumnCommand(TableIdentifier("test"), Option(Seq("c1")), true).run(spark)
+    }
+    assert(e1.getMessage.contains("Parameters `columnNames` and `allColumns` are" +
+      " mutually exclusive"))
+    val e2 = intercept[IllegalArgumentException] {
+      AnalyzeColumnCommand(TableIdentifier("test"), None, false).run(spark)
+    }
+    assert(e2.getMessage.contains("Parameters `columnNames` and `allColumns` are" +
+      " mutually exclusive"))
+  }
+
   private def createNonPartitionedTable(
       tabName: String,
       analyzedBySpark: Boolean = true,

