Posted to commits@spark.apache.org by we...@apache.org on 2016/12/24 07:35:00 UTC

spark git commit: [SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations

Repository: spark
Updated Branches:
  refs/heads/master a848f0ba8 -> 3cff81615


[SPARK-18911][SQL] Define CatalogStatistics to interact with metastore and convert it to Statistics in relations

## What changes were proposed in this pull request?

Statistics in a LogicalPlan should refer to columns by attribute rather than by column name, because two columns from different relations can share the same name. CatalogTable, however, has no concept of attributes, and the broadcast hint carried by Statistics has no meaning in the catalog either. Putting the plan-level Statistics in CatalogTable is therefore confusing.

We define a separate statistics structure, CatalogStatistics, in CatalogTable. It is responsible only for interacting with the metastore, and is converted to plan-level Statistics when the relation is used in a LogicalPlan.
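
For illustration, a minimal sketch of the intended conversion; the column name, sizes, and stat values below are made up for the example:

```scala
import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
import org.apache.spark.sql.catalyst.expressions.AttributeReference
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.IntegerType

// Catalog-level stats key column stats by column name, since the catalog
// knows nothing about attributes.
val catalogStats = CatalogStatistics(
  sizeInBytes = BigInt(1024),
  rowCount = Some(BigInt(100)),
  colStats = Map("c1" -> ColumnStat(10, None, None, 0, 4, 4)))

// When the relation is planned, its output attributes are matched to the
// catalog stats by name, producing the plan-level Statistics.
val output = Seq(AttributeReference("c1", IntegerType)())
val planStats = catalogStats.toPlanStats(output)
// planStats.attributeStats is an AttributeMap[ColumnStat] keyed by attribute,
// so downstream estimation no longer depends on (possibly ambiguous) names.
```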

## How was this patch tested?

Added test cases.

Author: wangzhenhua <wa...@huawei.com>
Author: Zhenhua Wang <wz...@163.com>

Closes #16323 from wzhfy/nameToAttr.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/3cff8161
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/3cff8161
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/3cff8161

Branch: refs/heads/master
Commit: 3cff8161578b65139c9740fed694d4b3c81fa74a
Parents: a848f0b
Author: wangzhenhua <wa...@huawei.com>
Authored: Sat Dec 24 15:34:44 2016 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Dec 24 15:34:44 2016 +0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  | 34 +++++++++++++--
 .../sql/catalyst/plans/logical/Statistics.scala |  4 +-
 .../command/AnalyzeColumnCommand.scala          |  4 +-
 .../execution/command/AnalyzeTableCommand.scala | 10 ++---
 .../execution/datasources/LogicalRelation.scala |  2 +-
 .../spark/sql/StatisticsCollectionSuite.scala   | 45 +++++++++++++++++++-
 .../spark/sql/hive/HiveExternalCatalog.scala    |  4 +-
 .../spark/sql/hive/MetastoreRelation.scala      |  2 +-
 .../apache/spark/sql/hive/StatisticsSuite.scala | 13 +++---
 9 files changed, 95 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 5b5378c..24d75ab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,8 +21,8 @@ import java.util.Date
 
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
-import org.apache.spark.sql.catalyst.expressions.{Attribute, Cast, Literal}
-import org.apache.spark.sql.catalyst.plans.logical.{LeafNode, LogicalPlan, Statistics}
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, Cast, Literal}
+import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
 import org.apache.spark.sql.types.{StructField, StructType}
 
@@ -171,7 +171,7 @@ case class CatalogTable(
     createTime: Long = System.currentTimeMillis,
     lastAccessTime: Long = -1,
     properties: Map[String, String] = Map.empty,
-    stats: Option[Statistics] = None,
+    stats: Option[CatalogStatistics] = None,
     viewOriginalText: Option[String] = None,
     viewText: Option[String] = None,
     comment: Option[String] = None,
@@ -247,6 +247,34 @@ case class CatalogTable(
 }
 
 
+/**
+ * This class of statistics is used in [[CatalogTable]] to interact with metastore.
+ * We define this new class instead of directly using [[Statistics]] here because there are no
+ * concepts of attributes or broadcast hint in catalog.
+ */
+case class CatalogStatistics(
+    sizeInBytes: BigInt,
+    rowCount: Option[BigInt] = None,
+    colStats: Map[String, ColumnStat] = Map.empty) {
+
+  /**
+   * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
+   * on column names.
+   */
+  def toPlanStats(planOutput: Seq[Attribute]): Statistics = {
+    val matched = planOutput.flatMap(a => colStats.get(a.name).map(a -> _))
+    Statistics(sizeInBytes = sizeInBytes, rowCount = rowCount,
+      attributeStats = AttributeMap(matched))
+  }
+
+  /** Readable string representation for the CatalogStatistics. */
+  def simpleString: String = {
+    val rowCountString = if (rowCount.isDefined) s", ${rowCount.get} rows" else ""
+    s"$sizeInBytes bytes$rowCountString"
+  }
+}
+
+
 case class CatalogTableType private(name: String)
 object CatalogTableType {
   val EXTERNAL = new CatalogTableType("EXTERNAL")

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 465fbab..91404d4 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -41,13 +41,13 @@ import org.apache.spark.sql.types._
  * @param sizeInBytes Physical size in bytes. For leaf operators this defaults to 1, otherwise it
  *                    defaults to the product of children's `sizeInBytes`.
  * @param rowCount Estimated number of rows.
- * @param colStats Column-level statistics.
+ * @param attributeStats Statistics for Attributes.
  * @param isBroadcastable If true, output is small enough to be used in a broadcast join.
  */
 case class Statistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty,
+    attributeStats: AttributeMap[ColumnStat] = AttributeMap(Nil),
     isBroadcastable: Boolean = false) {
 
   override def toString: String = "Statistics(" + simpleString + ")"

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 9dffe36..1340c9b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -21,7 +21,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -64,7 +64,7 @@ case class AnalyzeColumnCommand(
       AnalyzeColumnCommand.computeColumnStats(sparkSession, tableIdent.table, relation, columnNames)
 
     // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = Statistics(
+    val statistics = CatalogStatistics(
       sizeInBytes = sizeInBytes,
       rowCount = Some(rowCount),
       // Newly computed column stats should override the existing ones.

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 52a8fc8..4a994e3 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -25,8 +25,7 @@ import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{AnalysisException, Dataset, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.EliminateSubqueryAliases
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogTable}
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.SessionState
 
@@ -62,9 +61,9 @@ case class AnalyzeTableCommand(
     def updateTableStats(catalogTable: CatalogTable, newTotalSize: Long): Unit = {
       val oldTotalSize = catalogTable.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
       val oldRowCount = catalogTable.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
-      var newStats: Option[Statistics] = None
+      var newStats: Option[CatalogStatistics] = None
       if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
-        newStats = Some(Statistics(sizeInBytes = newTotalSize))
+        newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
       }
       // We only set rowCount when noscan is false, because otherwise:
       // 1. when total size is not changed, we don't need to alter the table;
@@ -76,7 +75,8 @@ case class AnalyzeTableCommand(
           newStats = if (newStats.isDefined) {
             newStats.map(_.copy(rowCount = Some(BigInt(newRowCount))))
           } else {
-            Some(Statistics(sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
+            Some(CatalogStatistics(
+              sizeInBytes = oldTotalSize, rowCount = Some(BigInt(newRowCount))))
           }
         }
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
index 7c28d48..3fd4038 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/LogicalRelation.scala
@@ -73,7 +73,7 @@ case class LogicalRelation(
   override lazy val cleanArgs: Seq[Any] = Seq(relation)
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.flatMap(_.stats.map(_.copy(sizeInBytes = relation.sizeInBytes))).getOrElse(
+    catalogTable.flatMap(_.stats.map(_.toPlanStats(output))).getOrElse(
       Statistics(sizeInBytes = relation.sizeInBytes))
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index c663b31..18abb18 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,6 +24,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.StaticSQLConf
@@ -39,7 +40,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
   import testImplicits._
 
   private def checkTableStats(tableName: String, expectedRowCount: Option[Int])
-    : Option[Statistics] = {
+    : Option[CatalogStatistics] = {
     val df = spark.table(tableName)
     val stats = df.queryExecution.analyzed.collect { case rel: LogicalRelation =>
       assert(rel.catalogTable.get.stats.flatMap(_.rowCount) === expectedRowCount)
@@ -260,4 +261,46 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       }
     }
   }
+
+  // This test will be run twice: with and without Hive support
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test hive serde table
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }
+
+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.catalogTable)
+      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+    }.head
+    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    // Check catalog statistics
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.sizeInBytes == 0)
+    assert(catalogTable.stats.get.rowCount == Some(0))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+
+    // Check relation statistics
+    assert(relation.statistics.sizeInBytes == 0)
+    assert(relation.statistics.rowCount == Some(0))
+    assert(relation.statistics.attributeStats.size == 1)
+    val (attribute, colStat) = relation.statistics.attributeStats.head
+    assert(attribute.name == "c1")
+    assert(colStat == emptyColStat)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 9c19a0e..fde6d4a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -37,7 +37,7 @@ import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
 import org.apache.spark.sql.catalyst.catalog._
 import org.apache.spark.sql.catalyst.catalog.ExternalCatalogUtils.escapePathName
 import org.apache.spark.sql.catalyst.expressions._
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Statistics}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.CaseInsensitiveMap
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.PartitioningUtils
@@ -656,7 +656,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       }
 
       table = table.copy(
-        stats = Some(Statistics(
+        stats = Some(CatalogStatistics(
           sizeInBytes = BigInt(table.properties(STATISTICS_TOTAL_SIZE)),
           rowCount = table.properties.get(STATISTICS_NUM_ROWS).map(BigInt(_)),
           colStats = colStats.toMap)))

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
index 3bbac05..2e60cba 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/MetastoreRelation.scala
@@ -113,7 +113,7 @@ private[hive] case class MetastoreRelation(
   }
 
   @transient override lazy val statistics: Statistics = {
-    catalogTable.stats.getOrElse(Statistics(
+    catalogTable.stats.map(_.toPlanStats(output)).getOrElse(Statistics(
       sizeInBytes = {
         val totalSize = hiveQlTable.getParameters.get(StatsSetupConst.TOTAL_SIZE)
         val rawDataSize = hiveQlTable.getParameters.get(StatsSetupConst.RAW_DATA_SIZE)

http://git-wip-us.apache.org/repos/asf/spark/blob/3cff8161/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 5ae202f..8803ea3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -23,7 +23,7 @@ import scala.reflect.ClassTag
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.plans.logical.Statistics
+import org.apache.spark.sql.catalyst.catalog.CatalogStatistics
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.execution.joins._
@@ -152,7 +152,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   }
 
   private def checkTableStats(
-      stats: Option[Statistics],
+      stats: Option[CatalogStatistics],
       hasSizeInBytes: Boolean,
       expectedRowCounts: Option[Int]): Unit = {
     if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
@@ -168,7 +168,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       tableName: String,
       isDataSourceTable: Boolean,
       hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[Statistics] = {
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
     val df = sql(s"SELECT * FROM $tableName")
     val stats = df.queryExecution.analyzed.collect {
       case rel: MetastoreRelation =>
@@ -435,10 +435,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   }
 
   /** Used to test refreshing cached metadata once table stats are updated. */
-  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean): (Statistics, Statistics) = {
+  private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
+    : (CatalogStatistics, CatalogStatistics) = {
     val tableName = "tbl"
-    var statsBeforeUpdate: Statistics = null
-    var statsAfterUpdate: Statistics = null
+    var statsBeforeUpdate: CatalogStatistics = null
+    var statsAfterUpdate: CatalogStatistics = null
     withTable(tableName) {
       val tableIndent = TableIdentifier(tableName, Some("default"))
       val catalog = spark.sessionState.catalog.asInstanceOf[HiveSessionCatalog]

