You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/04/01 00:24:44 UTC

[spark] branch master updated: [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 885aab4  [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
885aab4 is described below

commit 885aab40a27fb3cac0c26629b35b6ea209e70ca8
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Sun Mar 31 17:24:21 2019 -0700

    [SPARK-27266][SQL] Support ANALYZE TABLE to collect tables stats for cached catalog views
    
    ## What changes were proposed in this pull request?
    The current master doesn't support ANALYZE TABLE to collect tables stats for catalog views even if they are cached as follows;
    
    ```scala
    scala> sql(s"CREATE VIEW v AS SELECT 1 c")
    scala> sql(s"CACHE LAZY TABLE v")
    scala> sql(s"ANALYZE TABLE v COMPUTE STATISTICS")
    org.apache.spark.sql.AnalysisException: ANALYZE TABLE is not supported on views.;
    ...
    ```
    
    Since SPARK-25196 has supported to an ANALYZE command to collect column statistics for cached catalog view, we could support table stats, too.
    
    ## How was this patch tested?
    Added tests in `StatisticsCollectionSuite` and `InMemoryColumnarQuerySuite`.
    
    Closes #24200 from maropu/SPARK-27266.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../sql/execution/columnar/InMemoryRelation.scala  |  9 ++++--
 .../execution/command/AnalyzeTableCommand.scala    | 34 ++++++++++++++--------
 .../spark/sql/StatisticsCollectionSuite.scala      | 32 ++++++++++++++++++++
 .../columnar/InMemoryColumnarQuerySuite.scala      | 13 +++++++++
 4 files changed, 74 insertions(+), 14 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 1af5033..3edfd8f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -53,6 +53,7 @@ case class CachedRDDBuilder(
   @transient @volatile private var _cachedColumnBuffers: RDD[CachedBatch] = null
 
   val sizeInBytesStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
+  val rowCountStats: LongAccumulator = cachedPlan.sqlContext.sparkContext.longAccumulator
 
   def cachedColumnBuffers: RDD[CachedBatch] = {
     if (_cachedColumnBuffers == null) {
@@ -116,6 +117,7 @@ case class CachedRDDBuilder(
           }
 
           sizeInBytesStats.add(totalSize)
+          rowCountStats.add(rowCount)
 
           val stats = InternalRow.fromSeq(
             columnBuilders.flatMap(_.columnStats.collectedStatistics))
@@ -200,11 +202,14 @@ case class InMemoryRelation(
   }
 
   override def computeStats(): Statistics = {
-    if (cacheBuilder.sizeInBytesStats.value == 0L) {
+    if (!cacheBuilder.isCachedColumnBuffersLoaded) {
       // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
       statsOfPlanToCache
     } else {
-      statsOfPlanToCache.copy(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
+      statsOfPlanToCache.copy(
+        sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue,
+        rowCount = Some(cacheBuilder.rowCountStats.value.longValue)
+      )
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 3076e91..67cfceb 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -35,19 +35,29 @@ case class AnalyzeTableCommand(
     val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
     val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
     if (tableMeta.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
-    }
-
-    // Compute stats for the whole table
-    val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
-    val newRowCount =
-      if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
+      // Analyzes a catalog view if the view is cached
+      val table = sparkSession.table(tableIdent.quotedString)
+      val cacheManager = sparkSession.sharedState.cacheManager
+      if (cacheManager.lookupCachedData(table.logicalPlan).isDefined) {
+        if (!noscan) {
+          // To collect table stats, materializes an underlying columnar RDD
+          table.count()
+        }
+      } else {
+        throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+      }
+    } else {
+      // Compute stats for the whole table
+      val newTotalSize = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+      val newRowCount =
+        if (noscan) None else Some(BigInt(sparkSession.table(tableIdentWithDB).count()))
 
-    // Update the metastore if the above statistics of the table are different from those
-    // recorded in the metastore.
-    val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
-    if (newStats.isDefined) {
-      sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
+      // Update the metastore if the above statistics of the table are different from those
+      // recorded in the metastore.
+      val newStats = CommandUtils.compareAndGetNewStats(tableMeta.stats, newTotalSize, newRowCount)
+      if (newStats.isDefined) {
+        sessionState.catalog.alterTableStats(tableIdentWithDB, newStats)
+      }
     }
 
     Seq.empty[Row]
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index d071efb..90b3586 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -544,4 +544,36 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       assert(getStatAttrNames(s"$database.v") === Set("c"))
     }
   }
+
+  test("analyzes table statistics in cached catalog view") {
+    def getTableStats(tableName: String): Statistics = {
+      spark.table(tableName).queryExecution.optimizedPlan.stats
+    }
+
+    withTempDatabase { database =>
+      sql(s"CREATE VIEW $database.v AS SELECT 1 c")
+      // Cache data eagerly by default, so this operation collects table stats
+      sql(s"CACHE TABLE $database.v")
+      val stats1 = getTableStats(s"$database.v")
+      assert(stats1.sizeInBytes > 0)
+      assert(stats1.rowCount === Some(1))
+      sql(s"UNCACHE TABLE $database.v")
+
+      // Cache data lazily, then analyze table stats
+      sql(s"CACHE LAZY TABLE $database.v")
+      val stats2 = getTableStats(s"$database.v")
+      assert(stats2.sizeInBytes === OneRowRelation().computeStats().sizeInBytes)
+      assert(stats2.rowCount === None)
+
+      sql(s"ANALYZE TABLE $database.v COMPUTE STATISTICS NOSCAN")
+      val stats3 = getTableStats(s"$database.v")
+      assert(stats3.sizeInBytes === OneRowRelation().computeStats().sizeInBytes)
+      assert(stats3.rowCount === None)
+
+      sql(s"ANALYZE TABLE $database.v COMPUTE STATISTICS")
+      val stats4 = getTableStats(s"$database.v")
+      assert(stats4.sizeInBytes === stats1.sizeInBytes)
+      assert(stats4.rowCount === Some(1))
+    }
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
index 0a1141c..e40528f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/columnar/InMemoryColumnarQuerySuite.scala
@@ -340,6 +340,19 @@ class InMemoryColumnarQuerySuite extends QueryTest with SharedSQLContext {
     assert(cached.cacheBuilder.sizeInBytesStats.value === expectedAnswer.size * INT.defaultSize)
   }
 
+   test("cached row count should be calculated") {
+    val data = spark.range(6).toDF
+    val plan = spark.sessionState.executePlan(data.logicalPlan).sparkPlan
+    val cached = InMemoryRelation(true, 5, MEMORY_ONLY, plan, None, data.logicalPlan)
+
+    // Materialize the data.
+    val expectedAnswer = data.collect()
+    checkAnswer(cached, expectedAnswer)
+
+    // Check that the right row count was calculated.
+    assert(cached.cacheBuilder.rowCountStats.value === 6)
+  }
+
   test("access primitive-type columns in CachedBatch without whole stage codegen") {
     // whole stage codegen is not applied to a row with more than WHOLESTAGE_MAX_NUM_FIELDS fields
     withSQLConf(SQLConf.WHOLESTAGE_MAX_NUM_FIELDS.key -> "2") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org