Posted to commits@spark.apache.org by do...@apache.org on 2019/04/01 00:24:44 UTC
[spark] branch master updated: [SPARK-27266][SQL] Support ANALYZE TABLE to collect table stats for cached catalog views
This is an automated email from the ASF dual-hosted git repository.
dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git
The following commit(s) were added to refs/heads/master by this push:
new 885aab4 [SPARK-27266][SQL] Support ANALYZE TABLE to collect table stats for cached catalog views
885aab4 is described below
commit 885aab40a27fb3cac0c26629b35b6ea209e70ca8
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sun Mar 31 17:24:21 2019 -0700
[SPARK-27266][SQL] Support ANALYZE TABLE to collect table stats for cached catalog views
## What changes were proposed in this pull request?
The current master doesn't support `ANALYZE TABLE` for collecting table stats on catalog views, even when they are cached, as shown below:
```scala
scala> sql(s"CREATE VIEW v AS SELECT 1 c")
scala> sql(s"CACHE LAZY TABLE v")
scala> sql(s"ANALYZE TABLE v COMPUTE STATISTICS")
org.apache.spark.sql.AnalysisException: ANALYZE TABLE is not supported on views.;
...
```
Since SPARK-25196 added support for an ANALYZE command that collects column statistics for cached catalog views, we can support table stats, too.
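For illustration (not part of the original PR description), after this patch the same sequence is expected to succeed rather than throw:
```scala
scala> sql(s"CREATE VIEW v AS SELECT 1 c")
scala> sql(s"CACHE LAZY TABLE v")
scala> sql(s"ANALYZE TABLE v COMPUTE STATISTICS")  // no longer throws; returns an empty DataFrame
```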
## How was this patch tested?
Added tests in `StatisticsCollectionSuite` and `InMemoryColumnarQuerySuite`.
Closes #24200 from maropu/SPARK-27266.
Authored-by: Takeshi Yamamuro <ya...@apache.org>
Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
.../sql/execution/columnar/InMemoryRelation.scala | 9 ++++--
.../execution/command/AnalyzeTableCommand.scala | 34 ++++++++++++++--------
.../spark/sql/StatisticsCollectionSuite.scala | 32 ++++++++++++++++++++
.../columnar/InMemoryColumnarQuerySuite.scala | 13 +++++++++
4 files changed, 74 insertions(+), 14 deletions(-)
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 1af5033..3edfd8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -53,6 +53,7 @@ case class CachedRDDBuilder(
@transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
+ val rowCountStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
def cachedColumnBuffers: RDD[CachedBatch] = {
if (_cachedColumnBuffers == null) {
@@ -116,6 +117,7 @@ case class CachedRDDBuilder(
}
sizeInBytesStats.add(totalSize)
+ rowCountStats.add(rowCount)
val stats = InternalRow.fromSeq(
columnBuilders.flatMap(_.columnStats.collectedStatistics))
@@ -200,11 +202,14 @@ case class InMemoryRelation(
}
override def computeStats(): Statistics = {
- if (cacheBuilder.sizeInBytesStats.value == 0L) {
+ if (!cacheBuilder.isCachedColumnBuffersLoaded) {
// Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
statsOfPlanToCache
} else {
- statsOfPlanToCache.copy(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
+ statsOfPlanToCache.copy(
+ sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue,
+ rowCount = Some(cacheBuilder.rowCountStats.value.longValue)
+ )
}
}
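For context (not part of the commit), a minimal sketch of how the new row count surfaces through the stats API; it assumes a running Spark session named `spark` and eager caching so the columnar buffers get materialized:
```scala
val df = spark.range(3).toDF("c")
df.createOrReplaceTempView("t")
spark.sql("CACHE TABLE t")  // eager by default, so the cached buffers are built here
// Once materialized, computeStats() reports the accumulator values
val stats = spark.table("t").queryExecution.optimizedPlan.stats
assert(stats.sizeInBytes > 0)
assert(stats.rowCount == Some(BigInt(3)))  // from the new rowCountStats accumulator
```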
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 3076e91..67cfceb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -35,19 +35,29 @@ case class AnalyzeTableCommand(
val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
if (tableMeta.tableType == CatalogTableType.VIEW) {
- throw new AnalysisException("ANALYZE TABLE is not supported on views.")
- }
-
- // Compute stats for the whole table
- val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
- val newRowCount =
- if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+ // Analyzes a catalog view if the view is cached
+ val table = sparkSession.table(tableIdent.quotedString)
+ val cacheManager = sparkSession.sharedState.cacheManager
+ if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
+ if (!noscan) {
+ // To collect table stats, materializes an underlying columnar RDD
+ table.count()
+ }
+ } else {
+ throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+ }
+ } else {
+ // Compute stats for the whole table
+ val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+ val newRowCount =
+ if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
- // Update the metastore if the above statistics of the table are different from those
- // recorded in the metastore.
- val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
- if (newStats.isDefined) {
- sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
+ // Update the metastore if the above statistics of the table are different from those
+ // recorded in the metastore.
+ val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+ if (newStats.isDefined) {
+ sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
+ }
}
Seq.empty[Row]
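For a cached view, the command now only forces materialization of the underlying columnar RDD; the stats are then served by the accumulators in `CachedRDDBuilder` rather than the metastore. A minimal sketch of the expected behavior, mirroring the new tests:
```scala
spark.sql("CREATE VIEW v AS SELECT 1 c")
spark.sql("CACHE LAZY TABLE v")                         // nothing materialized yet
spark.sql("ANALYZE TABLE v COMPUTE STATISTICS NOSCAN")  // no-op for a cached view: no materialization
spark.sql("ANALYZE TABLE v COMPUTE STATISTICS")         // runs table.count(); rowCount becomes Some(1)
```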
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index d071efb..90b3586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -544,4 +544,36 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
assert(getStatAttrNames(s"$database.v") === Set("c"))
}
}
+
+ test("analyzes table statistics in cached catalog view") {
+ def getTableStats(tableName: String): Statistics = {
+ spark.table(tableName).queryExecution.optimizedPlan.stats
+ }
+
+ withTempDatabase { database =>
+ sql(s"CREATE VIEW $database.v AS SELECT 1 c")
+ // Cache data eagerly by default, so this operation collects table stats
+ sql(s"CACHE TABLE $database.v")
+ val stats1 = getTableStats(s"$database.v")
+ assert(stats1.sizeInBytes > 0)
+ assert(stats1.rowCount === Some(1))
+ sql(s"UNCACHE TABLE $database.v")
+
+ // Cache data lazily, then analyze table stats
+ sql(s"CACHE LAZY TABLE $database.v")
+ val stats2 = getTableStats(s"$database.v")
+ assert(stats2.sizeInBytes === OneRowRelation().computeStats().sizeInBytes)
+ assert(stats2.rowCount === None)
+
+ sql(s"ANALYZE TABLE $database.v COMPUTE STATISTICS NOSCAN")
+ val stats3 = getTableStats(s"$database.v")
+ assert(stats3.sizeInBytes === OneRowRelation().computeStats().sizeInBytes)
+ assert(stats3.rowCount === None)
+
+ sql(s"ANALYZE TABLE $database.v COMPUTE STATISTICS")
+ val stats4 = getTableStats(s"$database.v")
+ assert(stats4.sizeInBytes === stats1.sizeInBytes)
+ assert(stats4.rowCount === Some(1))
+ }
+ }
}
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0a1141c..e40528f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -340,6 +340,19 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
assert(cached.cacheBuilder.sizeInBytesStats.value === expectedAnswer.size * INT.defaultSize)
}
+ test("cached row count should be calculated") {
+ val data = spark.range(6).toDF
+ val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+ val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan)
+
+ // Materialize the data.
+ val expectedAnswer = data.collect()
+ checkAnswer(cached, expectedAnswer)
+
+ // Check that the right row count was calculated.
+ assert(cached.cacheBuilder.rowCountStats.value === 6)
+ }
+
test("access primitive-type columns in CachedBatch without whole stage codegen") {
// whole stage codegen is not applied to a row with more than WHOLESTAGE_MAX_NUM_FIELDS fields
withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "2") {