You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2017/08/18 16:54:42 UTC

spark git commit: [SPARK-21213][SQL] Support collecting partition-level statistics: rowCount and sizeInBytes

Repository: spark
Updated Branches:
  refs/heads/master 07a2b8738 -> 23ea89808


[SPARK-21213][SQL] Support collecting partition-level statistics: rowCount and sizeInBytes

## What changes were proposed in this pull request?

Added support for the ANALYZE TABLE [db_name.]tablename PARTITION (partcol1[=val1], partcol2[=val2], ...) COMPUTE STATISTICS [NOSCAN] SQL command, which calculates the total number of rows and the size in bytes for a subset of partitions. The calculated statistics are stored in the Hive Metastore as user-defined properties attached to partition objects. The property names are the same as the ones used to store table-level statistics: spark.sql.statistics.totalSize and spark.sql.statistics.numRows.

When the partition specification lists all partition columns with values, the command collects statistics for the single partition that matches the specification. When some partition columns are missing or are listed without values, the command collects statistics for all partitions that match the partition column values that were specified.

For example, table t has 4 partitions with the following specs:

* Partition1: (ds='2008-04-08', hr=11)
* Partition2: (ds='2008-04-08', hr=12)
* Partition3: (ds='2008-04-09', hr=11)
* Partition4: (ds='2008-04-09', hr=12)

The `ANALYZE TABLE t PARTITION (ds='2008-04-09', hr=11)` command will collect statistics only for partition 3.

The `ANALYZE TABLE t PARTITION (ds='2008-04-09')` command will collect statistics for partitions 3 and 4.

The `ANALYZE TABLE t PARTITION (ds, hr)` command will collect statistics for all four partitions.

When the optional parameter NOSCAN is specified, the command does not count the number of rows and gathers only the size in bytes.

The statistics gathered by ANALYZE TABLE command can be fetched using DESC EXTENDED [db_name.]tablename PARTITION command.

## How was this patch tested?

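Added tests: parser cases in SparkSqlParserSuite, an end-to-end SQL test (describe-part-after-analyze.sql with its expected output), and Hive-backed cases in StatisticsSuite.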

Author: Masha Basmanova <mb...@fb.com>

Closes #18421 from mbasmanova/mbasmanova-analyze-partition.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/23ea8980
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/23ea8980
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/23ea8980

Branch: refs/heads/master
Commit: 23ea8980809497d0372084adf5936602655e1685
Parents: 07a2b87
Author: Masha Basmanova <mb...@fb.com>
Authored: Fri Aug 18 09:54:39 2017 -0700
Committer: gatorsmile <ga...@gmail.com>
Committed: Fri Aug 18 09:54:39 2017 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  |   7 +-
 .../spark/sql/execution/SparkSqlParser.scala    |  36 ++-
 .../command/AnalyzePartitionCommand.scala       | 149 +++++++++++
 .../execution/command/AnalyzeTableCommand.scala |  28 +-
 .../sql/execution/command/CommandUtils.scala    |  27 +-
 .../inputs/describe-part-after-analyze.sql      |  34 +++
 .../results/describe-part-after-analyze.sql.out | 244 ++++++++++++++++++
 .../sql/execution/SparkSqlParserSuite.scala     |  33 ++-
 .../spark/sql/hive/HiveExternalCatalog.scala    | 169 ++++++++----
 .../spark/sql/hive/client/HiveClientImpl.scala  |   2 +
 .../apache/spark/sql/hive/StatisticsSuite.scala | 254 +++++++++++++++++++
 11 files changed, 888 insertions(+), 95 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 5a8c4e7..1965144 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -91,12 +91,14 @@ object CatalogStorageFormat {
  *
  * @param spec partition spec values indexed by column name
  * @param storage storage format of the partition
- * @param parameters some parameters for the partition, for example, stats.
+ * @param parameters some parameters for the partition
+ * @param stats optional statistics (number of rows, total size, etc.)
  */
 case class CatalogTablePartition(
     spec: CatalogTypes.TablePartitionSpec,
     storage: CatalogStorageFormat,
-    parameters: Map[String, String] = Map.empty) {
+    parameters: Map[String, String] = Map.empty,
+    stats: Option[CatalogStatistics] = None) {
 
   def toLinkedHashMap: mutable.LinkedHashMap[String, String] = {
     val map = new mutable.LinkedHashMap[String, String]()
@@ -106,6 +108,7 @@ case class CatalogTablePartition(
     if (parameters.nonEmpty) {
       map.put("Partition Parameters", s"{${parameters.map(p => p._1 + "=" + p._2).mkString(", ")}}")
     }
+    stats.foreach(s => map.put("Partition Statistics", s.simpleString))
     map
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
index d4414b6..8379e74 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlParser.scala
@@ -90,30 +90,40 @@ class SparkSqlAstBuilder(conf: SQLConf) extends AstBuilder(conf) {
   }
 
   /**
-   * Create an [[AnalyzeTableCommand]] command or an [[AnalyzeColumnCommand]] command.
-   * Example SQL for analyzing table :
+   * Create an [[AnalyzeTableCommand]] command, or an [[AnalyzePartitionCommand]]
+   * or an [[AnalyzeColumnCommand]] command.
+   * Example SQL for analyzing a table or a set of partitions :
    * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS [NOSCAN];
+   *   ANALYZE TABLE [db_name.]tablename [PARTITION (partcol1[=val1], partcol2[=val2], ...)]
+   *   COMPUTE STATISTICS [NOSCAN];
    * }}}
+   *
    * Example SQL for analyzing columns :
    * {{{
-   *   ANALYZE TABLE table COMPUTE STATISTICS FOR COLUMNS column1, column2;
+   *   ANALYZE TABLE [db_name.]tablename COMPUTE STATISTICS FOR COLUMNS column1, column2;
    * }}}
    */
   override def visitAnalyze(ctx: AnalyzeContext): LogicalPlan = withOrigin(ctx) {
-    if (ctx.partitionSpec != null) {
-      logWarning(s"Partition specification is ignored: ${ctx.partitionSpec.getText}")
+    if (ctx.identifier != null &&
+        ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
+      throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
     }
-    if (ctx.identifier != null) {
-      if (ctx.identifier.getText.toLowerCase(Locale.ROOT) != "noscan") {
-        throw new ParseException(s"Expected `NOSCAN` instead of `${ctx.identifier.getText}`", ctx)
+
+    val table = visitTableIdentifier(ctx.tableIdentifier)
+    if (ctx.identifierSeq() == null) {
+      if (ctx.partitionSpec != null) {
+        AnalyzePartitionCommand(table, visitPartitionSpec(ctx.partitionSpec),
+          noscan = ctx.identifier != null)
+      } else {
+        AnalyzeTableCommand(table, noscan = ctx.identifier != null)
       }
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier))
-    } else if (ctx.identifierSeq() == null) {
-      AnalyzeTableCommand(visitTableIdentifier(ctx.tableIdentifier), noscan = false)
     } else {
+      if (ctx.partitionSpec != null) {
+        logWarning("Partition specification is ignored when collecting column statistics: " +
+          ctx.partitionSpec.getText)
+      }
       AnalyzeColumnCommand(
-        visitTableIdentifier(ctx.tableIdentifier),
+        table,
         visitIdentifierSeq(ctx.identifierSeq()))
     }
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
new file mode 100644
index 0000000..5b54b22
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzePartitionCommand.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.command
+
+import org.apache.spark.sql.{AnalysisException, Column, Row, SparkSession}
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.{NoSuchPartitionException, UnresolvedAttribute}
+import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTypes.TablePartitionSpec
+import org.apache.spark.sql.catalyst.expressions.{And, EqualTo, Literal}
+import org.apache.spark.sql.execution.datasources.PartitioningUtils
+
+/**
+ * Analyzes a given set of partitions to generate per-partition statistics, which will be used in
+ * query optimizations.
+ *
+ * When `partitionSpec` is empty, statistics for all partitions are collected and stored in
+ * Metastore.
+ *
+ * When `partitionSpec` mentions only some of the partition columns, all partitions with
+ * matching values for specified columns are processed.
+ *
+ * If `partitionSpec` mentions unknown partition column, an `AnalysisException` is raised.
+ *
+ * By default, total number of rows and total size in bytes are calculated. When `noscan`
+ * is `true`, only total size in bytes is computed.
+ */
+case class AnalyzePartitionCommand(
+    tableIdent: TableIdentifier,
+    partitionSpec: Map[String, Option[String]],
+    noscan: Boolean = true) extends RunnableCommand {
+
+  private def getPartitionSpec(table: CatalogTable): Option[TablePartitionSpec] = {
+    val normalizedPartitionSpec =
+      PartitioningUtils.normalizePartitionSpec(partitionSpec, table.partitionColumnNames,
+        table.identifier.quotedString, conf.resolver)
+
+    // Report an error if partition columns in partition specification do not form
+    // a prefix of the list of partition columns defined in the table schema
+    val isNotSpecified =
+      table.partitionColumnNames.map(normalizedPartitionSpec.getOrElse(_, None).isEmpty)
+    if (isNotSpecified.init.zip(isNotSpecified.tail).contains((true, false))) {
+      val tableId = table.identifier
+      val schemaColumns = table.partitionColumnNames.mkString(",")
+      val specColumns = normalizedPartitionSpec.keys.mkString(",")
+      throw new AnalysisException("The list of partition columns with values " +
+        s"in partition specification for table '${tableId.table}' " +
+        s"in database '${tableId.database.get}' is not a prefix of the list of " +
+        "partition columns defined in the table schema. " +
+        s"Expected a prefix of [${schemaColumns}], but got [${specColumns}].")
+    }
+
+    val filteredSpec = normalizedPartitionSpec.filter(_._2.isDefined).mapValues(_.get)
+    if (filteredSpec.isEmpty) {
+      None
+    } else {
+      Some(filteredSpec)
+    }
+  }
+
+  override def run(sparkSession: SparkSession): Seq[Row] = {
+    val sessionState = sparkSession.sessionState
+    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
+    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+    }
+
+    val partitionValueSpec = getPartitionSpec(tableMeta)
+
+    val partitions = sessionState.catalog.listPartitions(tableMeta.identifier, partitionValueSpec)
+
+    if (partitions.isEmpty) {
+      if (partitionValueSpec.isDefined) {
+        throw new NoSuchPartitionException(db, tableIdent.table, partitionValueSpec.get)
+      } else {
+        // the user requested to analyze all partitions for a table which has no partitions
+        // return normally, since there is nothing to do
+        return Seq.empty[Row]
+      }
+    }
+
+    // Compute statistics for individual partitions
+    val rowCounts: Map[TablePartitionSpec, BigInt] =
+      if (noscan) {
+        Map.empty
+      } else {
+        calculateRowCountsPerPartition(sparkSession, tableMeta, partitionValueSpec)
+      }
+
+    // Update the metastore if newly computed statistics are different from those
+    // recorded in the metastore.
+    val newPartitions = partitions.flatMap { p =>
+      val newTotalSize = CommandUtils.calculateLocationSize(
+        sessionState, tableMeta.identifier, p.storage.locationUri)
+      val newRowCount = rowCounts.get(p.spec)
+      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+      newStats.map(_ => p.copy(stats = newStats))
+    }
+
+    if (newPartitions.nonEmpty) {
+      sessionState.catalog.alterPartitions(tableMeta.identifier, newPartitions)
+    }
+
+    Seq.empty[Row]
+  }
+
+  private def calculateRowCountsPerPartition(
+      sparkSession: SparkSession,
+      tableMeta: CatalogTable,
+      partitionValueSpec: Option[TablePartitionSpec]): Map[TablePartitionSpec, BigInt] = {
+    val filter = if (partitionValueSpec.isDefined) {
+      val filters = partitionValueSpec.get.map {
+        case (columnName, value) => EqualTo(UnresolvedAttribute(columnName), Literal(value))
+      }
+      filters.reduce(And)
+    } else {
+      Literal.TrueLiteral
+    }
+
+    val tableDf = sparkSession.table(tableMeta.identifier)
+    val partitionColumns = tableMeta.partitionColumnNames.map(Column(_))
+
+    val df = tableDf.filter(Column(filter)).groupBy(partitionColumns: _*).count()
+
+    df.collect().map { r =>
+      val partitionColumnValues = partitionColumns.indices.map(r.get(_).toString)
+      val spec = tableMeta.partitionColumnNames.zip(partitionColumnValues).toMap
+      val count = BigInt(r.getLong(partitionColumns.size))
+      (spec, count)
+    }.toMap
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index cba147c..04715bd 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -19,7 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
+import org.apache.spark.sql.catalyst.catalog.CatalogTableType
 
 
 /**
@@ -37,31 +37,15 @@ case class AnalyzeTableCommand(
     if (tableMeta.tableType == CatalogTableType.VIEW) {
       throw new AnalysisException("ANALYZE TABLE is not supported on views.")
     }
+
+    // Compute stats for the whole table
     val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
+    val newRowCount =
+      if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
 
-    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
-    val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-    var newStats: Option[CatalogStatistics] = None
-    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
-      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
-    }
-    // We only set rowCount when noscan is false, because otherwise:
-    // 1. when total size is not changed, we don't need to alter the table;
-    // 2. when total size is changed, `oldRowCount` becomes invalid.
-    // This is to make sure that we only record the right statistics.
-    if (!noscan) {
-      val newRowCount = sparkSession.table(tableIdentWithDB).count()
-      if (newRowCount >= 0 && newRowCount != oldRowCount) {
-        newStats = if (newStats.isDefined) {
-          newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
-        } else {
-          Some(CatalogStatistics(
-            sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
-        }
-      }
-    }
     // Update the metastore if the above statistics of the table are different from those
     // recorded in the metastore.
+    val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
     if (newStats.isDefined) {
       sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
       // Refresh the cached data source table in the catalog.

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index de45be8..b22958d 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -26,7 +26,7 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.internal.SessionState
 
 
@@ -112,4 +112,29 @@ object CommandUtils extends Logging {
     size
   }
 
+  def compareAndGetNewStats(
+      oldStats: Option[CatalogStatistics],
+      newTotalSize: BigInt,
+      newRowCount: Option[BigInt]): Option[CatalogStatistics] = {
+    val oldTotalSize = oldStats.map(_.sizeInBytes.toLong).getOrElse(-1L)
+    val oldRowCount = oldStats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
+    var newStats: Option[CatalogStatistics] = None
+    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
+      newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
+    }
+    // We only set rowCount when noscan is false, because otherwise:
+    // 1. when total size is not changed, we don't need to alter the table;
+    // 2. when total size is changed, `oldRowCount` becomes invalid.
+    // This is to make sure that we only record the right statistics.
+    if (newRowCount.isDefined) {
+      if (newRowCount.get >= 0 && newRowCount.get != oldRowCount) {
+        newStats = if (newStats.isDefined) {
+          newStats.map(_.copy(rowCount = newRowCount))
+        } else {
+          Some(CatalogStatistics(sizeInBytes = oldTotalSize, rowCount = newRowCount))
+        }
+      }
+    }
+    newStats
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql
new file mode 100644
index 0000000..f4239da
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/inputs/describe-part-after-analyze.sql
@@ -0,0 +1,34 @@
+CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet
+    PARTITIONED BY (ds, hr);
+
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10)
+VALUES ('k1', 100), ('k2', 200), ('k3', 300);
+
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11)
+VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401);
+
+INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5)
+VALUES ('k1', 102), ('k2', 202);
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+
+-- Collect stats for a single partition
+ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+
+-- Collect stats for 2 partitions
+ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
+
+-- Collect stats for all partitions
+ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS;
+
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10);
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11);
+DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5);
+
+-- DROP TEST TABLES/VIEWS
+DROP TABLE t;

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
----------------------------------------------------------------------
diff --git a/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
new file mode 100644
index 0000000..51dac11
--- /dev/null
+++ b/sql/core/src/test/resources/sql-tests/results/describe-part-after-analyze.sql.out
@@ -0,0 +1,244 @@
+-- Automatically generated by SQLQueryTestSuite
+-- Number of queries: 15
+
+
+-- !query 0
+CREATE TABLE t (key STRING, value STRING, ds STRING, hr INT) USING parquet
+    PARTITIONED BY (ds, hr)
+-- !query 0 schema
+struct<>
+-- !query 0 output
+
+
+
+-- !query 1
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=10)
+VALUES ('k1', 100), ('k2', 200), ('k3', 300)
+-- !query 1 schema
+struct<>
+-- !query 1 output
+
+
+
+-- !query 2
+INSERT INTO TABLE t PARTITION (ds='2017-08-01', hr=11)
+VALUES ('k1', 101), ('k2', 201), ('k3', 301), ('k4', 401)
+-- !query 2 schema
+struct<>
+-- !query 2 output
+
+
+
+-- !query 3
+INSERT INTO TABLE t PARTITION (ds='2017-09-01', hr=5)
+VALUES ('k1', 102), ('k2', 202)
+-- !query 3 schema
+struct<>
+-- !query 3 output
+
+
+
+-- !query 4
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 4 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 4 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 5
+ANALYZE TABLE t PARTITION (ds='2017-08-01', hr=10) COMPUTE STATISTICS
+-- !query 5 schema
+struct<>
+-- !query 5 output
+
+
+
+-- !query 6
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 6 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 6 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+Partition Statistics	1067 bytes, 3 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 7
+ANALYZE TABLE t PARTITION (ds='2017-08-01') COMPUTE STATISTICS
+-- !query 7 schema
+struct<>
+-- !query 7 output
+
+
+
+-- !query 8
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 8 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 8 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+Partition Statistics	1067 bytes, 3 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 9
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11)
+-- !query 9 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 9 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=11]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11	                    
+Partition Statistics	1080 bytes, 4 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 10
+ANALYZE TABLE t PARTITION (ds, hr) COMPUTE STATISTICS
+-- !query 10 schema
+struct<>
+-- !query 10 output
+
+
+
+-- !query 11
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=10)
+-- !query 11 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 11 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=10]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=10	                    
+Partition Statistics	1067 bytes, 3 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 12
+DESC EXTENDED t PARTITION (ds='2017-08-01', hr=11)
+-- !query 12 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 12 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-08-01, hr=11]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-08-01/hr=11	                    
+Partition Statistics	1080 bytes, 4 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 13
+DESC EXTENDED t PARTITION (ds='2017-09-01', hr=5)
+-- !query 13 schema
+struct<col_name:string,data_type:string,comment:string>
+-- !query 13 output
+key                 	string              	                    
+value               	string              	                    
+ds                  	string              	                    
+hr                  	int                 	                    
+# Partition Information	                    	                    
+# col_name          	data_type           	comment             
+ds                  	string              	                    
+hr                  	int                 	                    
+                    	                    	                    
+# Detailed Partition Information	                    	                    
+Database            	default             	                    
+Table               	t                   	                    
+Partition Values    	[ds=2017-09-01, hr=5]	                    
+Location [not included in comparison]sql/core/spark-warehouse/t/ds=2017-09-01/hr=5	                    
+Partition Statistics	1054 bytes, 2 rows  	                    
+                    	                    	                    
+# Storage Information	                    	                    
+Location [not included in comparison]sql/core/spark-warehouse/t
+
+
+-- !query 14
+DROP TABLE t
+-- !query 14 schema
+struct<>
+-- !query 14 output
+

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
index d238c76..fa7a866 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/SparkSqlParserSuite.scala
@@ -259,17 +259,33 @@ class SparkSqlParserSuite extends AnalysisTest {
     assertEqual("analyze table t compute statistics noscan",
       AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
     assertEqual("analyze table t partition (a) compute statistics nOscAn",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+      AnalyzePartitionCommand(TableIdentifier("t"), Map("a" -> None), noscan = true))
 
-    // Partitions specified - we currently parse them but don't do anything with it
+    // Partitions specified
     assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
     assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr=11) COMPUTE STATISTICS noscan",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> Some("11"))))
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09') COMPUTE STATISTICS noscan",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> Some("2008-04-09"))))
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2008-04-09', hr) COMPUTE STATISTICS noscan",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> Some("2008-04-09"), "hr" -> None)))
+    assertEqual("ANALYZE TABLE t PARTITION(ds, hr=11) COMPUTE STATISTICS noscan",
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> None, "hr" -> Some("11"))))
     assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = false))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = false,
+        partitionSpec = Map("ds" -> None, "hr" -> None)))
     assertEqual("ANALYZE TABLE t PARTITION(ds, hr) COMPUTE STATISTICS noscan",
-      AnalyzeTableCommand(TableIdentifier("t"), noscan = true))
+      AnalyzePartitionCommand(TableIdentifier("t"), noscan = true,
+        partitionSpec = Map("ds" -> None, "hr" -> None)))
 
     intercept("analyze table t compute statistics xxxx",
       "Expected `NOSCAN` instead of `xxxx`")
@@ -282,6 +298,11 @@ class SparkSqlParserSuite extends AnalysisTest {
 
     assertEqual("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS key, value",
       AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
+
+    // Partition specified - should be ignored
+    assertEqual("ANALYZE TABLE t PARTITION(ds='2017-06-10') " +
+      "COMPUTE STATISTICS FOR COLUMNS key, value",
+      AnalyzeColumnCommand(TableIdentifier("t"), Seq("key", "value")))
   }
 
   test("query organization") {

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index e9d48f9..547447b 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -639,26 +639,17 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
 
-    // convert table statistics to properties so that we can persist them through hive client
-    val statsProperties = new mutable.HashMap[String, String]()
-    if (stats.isDefined) {
-      statsProperties += STATISTICS_TOTAL_SIZE -> stats.get.sizeInBytes.toString()
-      if (stats.get.rowCount.isDefined) {
-        statsProperties += STATISTICS_NUM_ROWS -> stats.get.rowCount.get.toString()
-      }
-
-      // For datasource tables and hive serde tables created by spark 2.1 or higher,
-      // the data schema is stored in the table properties.
-      val schema = restoreTableMetadata(rawTable).schema
+    // For datasource tables and hive serde tables created by spark 2.1 or higher,
+    // the data schema is stored in the table properties.
+    val schema = restoreTableMetadata(rawTable).schema
 
-      val colNameTypeMap: Map[String, DataType] =
-        schema.fields.map(f => (f.name, f.dataType)).toMap
-      stats.get.colStats.foreach { case (colName, colStat) =>
-        colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-          statsProperties += (columnStatKeyPropName(colName, k) -> v)
-        }
+    // convert table statistics to properties so that we can persist them through hive client
+    var statsProperties =
+      if (stats.isDefined) {
+        statsToProperties(stats.get, schema)
+      } else {
+        new mutable.HashMap[String, String]()
       }
-    }
 
     val oldTableNonStatsProps = rawTable.properties.filterNot(_._1.startsWith(STATISTICS_PREFIX))
     val updatedTable = rawTable.copy(properties = oldTableNonStatsProps ++ statsProperties)
@@ -704,36 +695,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     val version: String = table.properties.getOrElse(CREATED_SPARK_VERSION, "2.2 or prior")
 
     // Restore Spark's statistics from information in Metastore.
-    val statsProps = table.properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
-
-    // Currently we have two sources of statistics: one from Hive and the other from Spark.
-    // In our design, if Spark's statistics is available, we respect it over Hive's statistics.
-    if (statsProps.nonEmpty) {
-      val colStats = new mutable.HashMap[String, ColumnStat]
-
-      // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
-      // but given the number of columns it usually not enormous, this is probably OK as a start.
-      // If we want to map this a linear operation, we'd need a stronger contract between the
-      // naming convention used for serialization.
-      table.schema.foreach { field =>
-        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
-          // If "version" field is defined, then the column stat is defined.
-          val keyPrefix = columnStatKeyPropName(field.name, "")
-          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
-            (k.drop(keyPrefix.length), v)
-          }
-
-          ColumnStat.fromMap(table.identifier.table, field, colStatMap).foreach {
-            colStat => colStats += field.name -> colStat
-          }
-        }
-      }
-
-      table = table.copy(
-        stats = Some(CatalogStatistics(
-          sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
-          rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
-          colStats = colStats.toMap)))
+    val restoredStats =
+      statsFromProperties(table.properties, table.identifier.table, table.schema)
+    if (restoredStats.isDefined) {
+      table = table.copy(stats = restoredStats)
     }
 
     // Get the original table properties as defined by the user.
@@ -1037,17 +1002,92 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     currentFullPath
   }
 
+  private def statsToProperties(
+      stats: CatalogStatistics,
+      schema: StructType): Map[String, String] = {
+
+    var statsProperties: Map[String, String] =
+      Map(STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString())
+    if (stats.rowCount.isDefined) {
+      statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
+    }
+
+    val colNameTypeMap: Map[String, DataType] =
+      schema.fields.map(f => (f.name, f.dataType)).toMap
+    stats.colStats.foreach { case (colName, colStat) =>
+      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
+        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+      }
+    }
+
+    statsProperties
+  }
+
+  private def statsFromProperties(
+      properties: Map[String, String],
+      table: String,
+      schema: StructType): Option[CatalogStatistics] = {
+
+    val statsProps = properties.filterKeys(_.startsWith(STATISTICS_PREFIX))
+    if (statsProps.isEmpty) {
+      None
+    } else {
+
+      val colStats = new mutable.HashMap[String, ColumnStat]
+
+      // For each column, recover its column stats. Note that this is currently an O(n^2)
+      // operation, but given that the number of columns is usually not enormous, this is
+      // probably OK as a start. If we want to make this a linear operation, we'd need a
+      // stronger contract on the naming convention used for serialization.
+      schema.foreach { field =>
+        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
+          // If "version" field is defined, then the column stat is defined.
+          val keyPrefix = columnStatKeyPropName(field.name, "")
+          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
+            (k.drop(keyPrefix.length), v)
+          }
+
+          ColumnStat.fromMap(table, field, colStatMap).foreach {
+            colStat => colStats += field.name -> colStat
+          }
+        }
+      }
+
+      Some(CatalogStatistics(
+        sizeInBytes = BigInt(statsProps(STATISTICS_TOTAL_SIZE)),
+        rowCount = statsProps.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
+        colStats = colStats.toMap))
+    }
+  }
+
   override def alterPartitions(
       db: String,
       table: String,
       newParts: Seq[CatalogTablePartition]): Unit = withClient {
     val lowerCasedParts = newParts.map(p => p.copy(spec = lowerCasePartitionSpec(p.spec)))
+
+    val rawTable = getRawTable(db, table)
+
+    // For datasource tables and hive serde tables created by spark 2.1 or higher,
+    // the data schema is stored in the table properties.
+    val schema = restoreTableMetadata(rawTable).schema
+
+    // convert partition statistics to properties so that we can persist them through hive api
+    val withStatsProps = lowerCasedParts.map(p => {
+      if (p.stats.isDefined) {
+        val statsProperties = statsToProperties(p.stats.get, schema)
+        p.copy(parameters = p.parameters ++ statsProperties)
+      } else {
+        p
+      }
+    })
+
     // Note: Before altering table partitions in Hive, you *must* set the current database
     // to the one that contains the table of interest. Otherwise you will end up with the
     // most helpful error message ever: "Unable to alter partition. alter is not possible."
     // See HIVE-2742 for more detail.
     client.setCurrentDatabase(db)
-    client.alterPartitions(db, table, lowerCasedParts)
+    client.alterPartitions(db, table, withStatsProps)
   }
 
   override def getPartition(
@@ -1055,7 +1095,34 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       spec: TablePartitionSpec): CatalogTablePartition = withClient {
     val part = client.getPartition(db, table, lowerCasePartitionSpec(spec))
-    part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+    restorePartitionMetadata(part, getTable(db, table))
+  }
+
+  /**
+   * Restores partition metadata from the partition properties.
+   *
+   * Restores the partition spec and reads partition-level statistics from the
+   * partition parameters into [[CatalogTablePartition#stats]]. When statistics are
+   * found, every parameter whose key starts with SPARK_SQL_PREFIX is removed.
+   */
+  private def restorePartitionMetadata(
+      partition: CatalogTablePartition,
+      table: CatalogTable): CatalogTablePartition = {
+    val restoredSpec = restorePartitionSpec(partition.spec, table.partitionColumnNames)
+
+    // Restore Spark's statistics from information in Metastore.
+    // Note: partition-level statistics were introduced in 2.3.
+    val restoredStats =
+      statsFromProperties(partition.parameters, table.identifier.table, table.schema)
+    if (restoredStats.isDefined) {
+      partition.copy(
+        spec = restoredSpec,
+        stats = restoredStats,
+        parameters = partition.parameters.filterNot {
+          case (key, _) => key.startsWith(SPARK_SQL_PREFIX) })
+    } else {
+      partition.copy(spec = restoredSpec)
+    }
   }
 
   /**
@@ -1066,7 +1133,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       table: String,
       spec: TablePartitionSpec): Option[CatalogTablePartition] = withClient {
     client.getPartitionOption(db, table, lowerCasePartitionSpec(spec)).map { part =>
-      part.copy(spec = restorePartitionSpec(part.spec, getTable(db, table).partitionColumnNames))
+      restorePartitionMetadata(part, getTable(db, table))
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
index 5e5c0a2..995280e 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/client/HiveClientImpl.scala
@@ -21,6 +21,7 @@ import java.io.{File, PrintStream}
 import java.util.Locale
 
 import scala.collection.JavaConverters._
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 
 import org.apache.hadoop.conf.Configuration
@@ -960,6 +961,7 @@ private[hive] object HiveClientImpl {
     tpart.setTableName(ht.getTableName)
     tpart.setValues(partValues.asJava)
     tpart.setSd(storageDesc)
+    tpart.setParameters(mutable.Map(p.parameters.toSeq: _*).asJava)
     new HivePartition(ht, tpart)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/23ea8980/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 71cf79c..dc61407 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.StringUtils
@@ -256,6 +257,259 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("analyze single partition") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String): CatalogStatistics = {
+      val partition =
+        spark.sessionState.catalog.getPartition(TableIdentifier(tableName), Map("ds" -> ds))
+      partition.stats.get
+    }
+
+    def createPartition(ds: String, query: String): Unit = {
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds') $query")
+    }
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      createPartition("2010-01-01", "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+      createPartition("2010-01-03", "SELECT '1', 'A' from src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN")
+
+      assert(queryStats("2010-01-01").rowCount === None)
+      assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+      assert(queryStats("2010-01-02").rowCount === None)
+      assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS")
+
+      assert(queryStats("2010-01-01").rowCount.get === 500)
+      assert(queryStats("2010-01-01").sizeInBytes === 2000)
+
+      assert(queryStats("2010-01-02").rowCount.get === 2*500)
+      assert(queryStats("2010-01-02").sizeInBytes === 2*2000)
+    }
+  }
+
+  test("analyze a set of partitions") {
+    val tableName = "analyzeTable_part"
+
+    def queryStats(ds: String, hr: String): Option[CatalogStatistics] = {
+      val tableId = TableIdentifier(tableName)
+      val partition =
+        spark.sessionState.catalog.getPartition(tableId, Map("ds" -> ds, "hr" -> hr))
+      partition.stats
+    }
+
+    def assertPartitionStats(
+        ds: String,
+        hr: String,
+        rowCount: Option[BigInt],
+        sizeInBytes: BigInt): Unit = {
+      val stats = queryStats(ds, hr).get
+      assert(stats.rowCount === rowCount)
+      assert(stats.sizeInBytes === sizeInBytes)
+    }
+
+    def createPartition(ds: String, hr: Int, query: String): Unit = {
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query")
+    }
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+
+      createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 11,
+        "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS NOSCAN")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assert(queryStats("2010-01-02", "10") === None)
+      assert(queryStats("2010-01-02", "11") === None)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS NOSCAN")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-01') COMPUTE STATISTICS")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds='2010-01-02') COMPUTE STATISTICS")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000)
+    }
+  }
+
+  test("analyze all partitions") {
+    val tableName = "analyzeTable_part"
+
+    def assertPartitionStats(
+        ds: String,
+        hr: String,
+        rowCount: Option[BigInt],
+        sizeInBytes: BigInt): Unit = {
+      val stats = spark.sessionState.catalog.getPartition(TableIdentifier(tableName),
+        Map("ds" -> ds, "hr" -> hr)).stats.get
+      assert(stats.rowCount === rowCount)
+      assert(stats.sizeInBytes === sizeInBytes)
+    }
+
+    def createPartition(ds: String, hr: Int, query: String): Unit = {
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='$ds', hr=$hr) $query")
+    }
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING, hr INT)")
+
+      createPartition("2010-01-01", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-01", 11, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 10, "SELECT '1', 'A' from src")
+      createPartition("2010-01-02", 11,
+        "SELECT '1', 'A' from src UNION ALL SELECT '1', 'A' from src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS NOSCAN")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = None, sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = None, sizeInBytes = 2*2000)
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds, hr) COMPUTE STATISTICS")
+
+      assertPartitionStats("2010-01-01", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-01", "11", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "10", rowCount = Some(500), sizeInBytes = 2000)
+      assertPartitionStats("2010-01-02", "11", rowCount = Some(2*500), sizeInBytes = 2*2000)
+    }
+  }
+
+  test("analyze partitions for an empty table") {
+    val tableName = "analyzeTable_part"
+
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      // make sure there is no exception
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS NOSCAN")
+
+      // make sure there is no exception
+      sql(s"ANALYZE TABLE $tableName PARTITION (ds) COMPUTE STATISTICS")
+    }
+  }
+
+  test("analyze partitions case sensitivity") {
+    val tableName = "analyzeTable_part"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src")
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "false") {
+        sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS")
+      }
+
+      withSQLConf(SQLConf.CASE_SENSITIVE.key -> "true") {
+        val message = intercept[AnalysisException] {
+          sql(s"ANALYZE TABLE $tableName PARTITION (DS='2010-01-01') COMPUTE STATISTICS")
+        }.getMessage
+        assert(message.contains(
+          s"DS is not a valid partition column in table `default`.`${tableName.toLowerCase}`"))
+      }
+    }
+  }
+
+  test("analyze partial partition specifications") {
+
+    val tableName = "analyzeTable_part"
+
+    def assertAnalysisException(partitionSpec: String): Unit = {
+      val message = intercept[AnalysisException] {
+        sql(s"ANALYZE TABLE $tableName $partitionSpec COMPUTE STATISTICS")
+      }.getMessage
+      assert(message.contains("The list of partition columns with values " +
+        s"in partition specification for table '${tableName.toLowerCase}' in database 'default' " +
+        "is not a prefix of the list of partition columns defined in the table schema"))
+    }
+
+    withTable(tableName) {
+      sql(
+        s"""
+           |CREATE TABLE $tableName (key STRING, value STRING)
+           |PARTITIONED BY (a STRING, b INT, c STRING)
+         """.stripMargin)
+
+      sql(s"INSERT INTO TABLE $tableName PARTITION (a='a1', b=10, c='c1') SELECT * FROM src")
+
+      sql(s"ANALYZE TABLE $tableName PARTITION (a='a1') COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (a='a1', b=10) COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (A='a1', b=10) COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (b=10, a='a1') COMPUTE STATISTICS")
+      sql(s"ANALYZE TABLE $tableName PARTITION (b=10, A='a1') COMPUTE STATISTICS")
+
+      assertAnalysisException("PARTITION (b=10)")
+      assertAnalysisException("PARTITION (a, b=10)")
+      assertAnalysisException("PARTITION (b=10, c='c1')")
+      assertAnalysisException("PARTITION (a, b=10, c='c1')")
+      assertAnalysisException("PARTITION (c='c1')")
+      assertAnalysisException("PARTITION (a, b, c='c1')")
+      assertAnalysisException("PARTITION (a='a1', c='c1')")
+      assertAnalysisException("PARTITION (a='a1', b, c='c1')")
+    }
+  }
+
+  test("analyze non-existent partition") {
+
+    def assertAnalysisException(analyzeCommand: String, errorMessage: String): Unit = {
+      val message = intercept[AnalysisException] {
+        sql(analyzeCommand)
+      }.getMessage
+      assert(message.contains(errorMessage))
+    }
+
+    val tableName = "analyzeTable_part"
+    withTable(tableName) {
+      sql(s"CREATE TABLE $tableName (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+
+      sql(s"INSERT INTO TABLE $tableName PARTITION (ds='2010-01-01') SELECT * FROM src")
+
+      assertAnalysisException(
+        s"ANALYZE TABLE $tableName PARTITION (hour=20) COMPUTE STATISTICS",
+        s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`"
+      )
+
+      assertAnalysisException(
+        s"ANALYZE TABLE $tableName PARTITION (hour) COMPUTE STATISTICS",
+        s"hour is not a valid partition column in table `default`.`${tableName.toLowerCase}`"
+      )
+
+      intercept[NoSuchPartitionException] {
+        sql(s"ANALYZE TABLE $tableName PARTITION (ds='2011-02-30') COMPUTE STATISTICS")
+      }
+    }
+  }
+
   test("test table-level statistics for hive tables created in HiveExternalCatalog") {
     val textTable = "textTable"
     withTable(textTable) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org