You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by do...@apache.org on 2019/03/21 16:20:56 UTC

[spark] branch master updated: [SPARK-25196][SQL] Extends the analyze column command for cached tables

This is an automated email from the ASF dual-hosted git repository.

dongjoon pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/spark.git


The following commit(s) were added to refs/heads/master by this push:
     new 0627850  [SPARK-25196][SQL] Extends the analyze column command for cached tables
0627850 is described below

commit 0627850b7e1845147060bc298afb23d5939d4a87
Author: Takeshi Yamamuro <ya...@apache.org>
AuthorDate: Thu Mar 21 09:20:35 2019 -0700

    [SPARK-25196][SQL] Extends the analyze column command for cached tables
    
    ## What changes were proposed in this pull request?
    This pr extended `ANALYZE` commands to analyze column stats for cached table.
    
    In common use cases, users read catalog table data, join/aggregate them, and then cache the result for following reuse. Since we are only allowed to analyze column statistics in catalog tables via ANALYZE commands, the current optimization depends on non-existing or inaccurate column statistics of cached data. So, it would be great if we could analyze cached data as follows;
    
    ```scala
    scala> def printColumnStats(tableName: String) = {
         |   spark.table(tableName).queryExecution.optimizedPlan.stats.attributeStats.foreach {
         |     case (k, v) => println(s"[$k]: $v")
         |   }
         | }
    
    scala> sql("SET spark.sql.cbo.enabled=true")
    scala> sql("SET spark.sql.statistics.histogram.enabled=true")
    
    scala> spark.range(1000).selectExpr("id % 33 AS c0", "rand() AS c1", "0 AS c2").write.saveAsTable("t")
    scala> sql("ANALYZE TABLE t COMPUTE STATISTICS FOR COLUMNS c0, c1, c2")
    scala> spark.table("t").groupBy("c0").agg(count("c1").as("v1"), sum("c2").as("v2")).createTempView("temp")
    
    // Prints column statistics in catalog table `t`
    scala> printColumnStats("t")
    [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;9f7c1c)),2)
    [c1#7074]: ColumnStat(Some(944),Some(3.2108484832404915E-4),Some(0.997584797423909),Some(0),Some(8),Some(8),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;60a386b1)),2)
    [c2#7075]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(4),Some(4),Some(Histogram(3.937007874015748,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;5ffd29e8)),2)
    
    // Prints column statistics on cached table `temp`
    scala> sql("CACHE TABLE temp")
    scala> printColumnStats("temp")
    <No Column Statistics>
    
    // Analyzes columns `v1` and `v2` on cached table `temp`
    scala> sql("ANALYZE TABLE temp COMPUTE STATISTICS FOR COLUMNS v1, v2")
    
    // Then, prints again
    scala> printColumnStats("temp")
    [v1#7084L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;49f7bb6f)),2)
    [v2#7086L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;12701677)),2)
    
    // Analyzes one left column and prints again
    scala> sql("ANALYZE TABLE temp COMPUTE STATISTICS FOR COLUMNS c0")
    scala> printColumnStats("temp")
    [v1#7084L]: ColumnStat(Some(2),Some(30),Some(31),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;49f7bb6f)),2)
    [v2#7086L]: ColumnStat(Some(1),Some(0),Some(0),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;12701677)),2)
    [c0#7073L]: ColumnStat(Some(33),Some(0),Some(32),Some(0),Some(8),Some(8),Some(Histogram(0.12992125984251968,[Lorg.apache.spark.sql.catalyst.plans.logical.HistogramBin;1f5c1b81)),2)
    ```
    
    ## How was this patch tested?
    Added tests in `CachedTableSuite` and `StatisticsCollectionSuite`.
    
    Closes #24047 from maropu/SPARK-25196-4.
    
    Authored-by: Takeshi Yamamuro <ya...@apache.org>
    Signed-off-by: Dongjoon Hyun <dh...@apple.com>
---
 .../apache/spark/sql/execution/CacheManager.scala  | 19 ++++-
 .../sql/execution/columnar/InMemoryRelation.scala  |  4 +-
 .../execution/command/AnalyzeColumnCommand.scala   | 86 +++++++++++++++++-----
 .../spark/sql/execution/command/CommandUtils.scala |  6 +-
 .../org/apache/spark/sql/CachedTableSuite.scala    | 29 ++++++++
 .../spark/sql/StatisticsCollectionSuite.scala      | 74 +++++++++++++++++++
 6 files changed, 192 insertions(+), 26 deletions(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
index 5a11a8f..612f693 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/CacheManager.scala
@@ -25,9 +25,10 @@ import org.apache.hadoop.fs.{FileSystem, Path}
 
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.{Dataset, SparkSession}
-import org.apache.spark.sql.catalyst.expressions.SubqueryExpression
+import org.apache.spark.sql.catalyst.expressions.{Attribute, AttributeMap, SubqueryExpression}
 import org.apache.spark.sql.catalyst.plans.logical.{IgnoreCachedData, LogicalPlan, ResolvedHint}
 import org.apache.spark.sql.execution.columnar.InMemoryRelation
+import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.datasources.{HadoopFsRelation, LogicalRelation}
 import org.apache.spark.storage.StorageLevel
 import org.apache.spark.storage.StorageLevel.MEMORY_AND_DISK
@@ -154,6 +155,22 @@ class CacheManager extends Logging {
     }
   }
 
+  // Analyzes column statistics in the given cache data
+  private[sql] def analyzeColumnCacheQuery(
+      sparkSession: SparkSession,
+      cachedData: CachedData,
+      column: Seq[Attribute]): Unit = {
+    val relation = cachedData.cachedRepresentation
+    val (rowCount, newColStats) =
+      CommandUtils.computeColumnStats(sparkSession, relation, column)
+    val oldStats = relation.statsOfPlanToCache
+    val newStats = oldStats.copy(
+      rowCount = Some(rowCount),
+      attributeStats = AttributeMap((oldStats.attributeStats ++ newColStats).toSeq)
+    )
+    relation.statsOfPlanToCache = newStats
+  }
+
   /**
    * Tries to re-cache all the cache entries that refer to the given plan.
    */
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
index 7180853..fcc3468 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/columnar/InMemoryRelation.scala
@@ -159,7 +159,7 @@ case class InMemoryRelation(
     output: Seq[Attribute],
     @transient cacheBuilder: CachedRDDBuilder,
     override val outputOrdering: Seq[SortOrder])(
-    statsOfPlanToCache: Statistics)
+    @volatile var statsOfPlanToCache: Statistics)
   extends logical.LeafNode with MultiInstanceRelation {
 
   override protected def innerChildren: Seq[SparkPlan] = Seq(cachedPlan)
@@ -181,7 +181,7 @@ case class InMemoryRelation(
       // Underlying columnar RDD hasn't been materialized, use the stats from the plan to cache.
       statsOfPlanToCache
     } else {
-      Statistics(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
+      statsOfPlanToCache.copy(sizeInBytes = cacheBuilder.sizeInBytesStats.value.longValue)
     }
   }
 
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 5d91f33..5017893 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.command
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions.Attribute
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
@@ -39,30 +40,41 @@ case class AnalyzeColumnCommand(
     require(columnNames.isDefined ^ allColumns, "Parameter `columnNames` or `allColumns` are " +
       "mutually exclusive. Only one of them should be specified.")
     val sessionState = sparkSession.sessionState
-    val db = tableIdent.database.getOrElse(sessionState.catalog.getCurrentDatabase)
-    val tableIdentWithDB = TableIdentifier(tableIdent.table, Some(db))
-    val tableMeta = sessionState.catalog.getTableMetadata(tableIdentWithDB)
-    if (tableMeta.tableType == CatalogTableType.VIEW) {
-      throw new AnalysisException("ANALYZE TABLE is not supported on views.")
-    }
-    val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
-    val relation = sparkSession.table(tableIdent).logicalPlan
-    val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
 
-    // Compute stats for the computed list of columns.
-    val (rowCount, newColStats) =
-      CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze)
+    tableIdent.database match {
+      case Some(db) if db == sparkSession.sharedState.globalTempViewManager.database =>
+        val plan = sessionState.catalog.getGlobalTempView(tableIdent.identifier).getOrElse {
+          throw new NoSuchTableException(db = db, table = tableIdent.identifier)
+        }
+        analyzeColumnInTempView(plan, sparkSession)
+      case Some(_) =>
+        analyzeColumnInCatalog(sparkSession)
+      case None =>
+        sessionState.catalog.getTempView(tableIdent.identifier) match {
+          case Some(tempView) => analyzeColumnInTempView(tempView, sparkSession)
+          case _ => analyzeColumnInCatalog(sparkSession)
+        }
+    }
 
-    // We also update table-level stats in order to keep them consistent with column-level stats.
-    val statistics = CatalogStatistics(
-      sizeInBytes = sizeInBytes,
-      rowCount = Some(rowCount),
-      // Newly computed column stats should override the existing ones.
-      colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColStats)
+    Seq.empty[Row]
+  }
 
-    sessionState.catalog.alterTableStats(tableIdentWithDB, Some(statistics))
+  private def analyzeColumnInCachedData(plan: LogicalPlan, sparkSession: SparkSession): Boolean = {
+    val cacheManager = sparkSession.sharedState.cacheManager
+    cacheManager.lookupCachedData(plan).map { cachedData =>
+      val columnsToAnalyze = getColumnsToAnalyze(
+        tableIdent, cachedData.plan, columnNames, allColumns)
+      cacheManager.analyzeColumnCacheQuery(sparkSession, cachedData, columnsToAnalyze)
+      cachedData
+    }.isDefined
+  }
 
-    Seq.empty[Row]
+  private def analyzeColumnInTempView(plan: LogicalPlan, sparkSession: SparkSession): Unit = {
+    if (!analyzeColumnInCachedData(plan, sparkSession)) {
+      val catalog = sparkSession.sessionState.catalog
+      val db = tableIdent.database.getOrElse(catalog.getCurrentDatabase)
+      throw new NoSuchTableException(db = db, table = tableIdent.identifier)
+    }
   }
 
   private def getColumnsToAnalyze(
@@ -89,6 +101,40 @@ case class AnalyzeColumnCommand(
     columnsToAnalyze
   }
 
+  private def analyzeColumnInCatalog(sparkSession: SparkSession): Unit = {
+    val sessionState = sparkSession.sessionState
+    val tableMeta = sessionState.catalog.getTableMetadata(tableIdent)
+    if (tableMeta.tableType == CatalogTableType.VIEW) {
+      // Analyzes a catalog view if the view is cached
+      val plan = sparkSession.table(tableIdent.quotedString).logicalPlan
+      if (!analyzeColumnInCachedData(plan, sparkSession)) {
+        throw new AnalysisException("ANALYZE TABLE is not supported on views.")
+      }
+    } else {
+      val sizeInBytes = CommandUtils.calculateTotalSize(sparkSession, tableMeta)
+      val relation = sparkSession.table(tableIdent).logicalPlan
+      val columnsToAnalyze = getColumnsToAnalyze(tableIdent, relation, columnNames, allColumns)
+
+      // Compute stats for the computed list of columns.
+      val (rowCount, newColStats) =
+        CommandUtils.computeColumnStats(sparkSession, relation, columnsToAnalyze)
+
+      val newColCatalogStats = newColStats.map {
+        case (attr, columnStat) =>
+          attr.name -> columnStat.toCatalogColumnStat(attr.name, attr.dataType)
+      }
+
+      // We also update table-level stats in order to keep them consistent with column-level stats.
+      val statistics = CatalogStatistics(
+        sizeInBytes = sizeInBytes,
+        rowCount = Some(rowCount),
+        // Newly computed column stats should override the existing ones.
+        colStats = tableMeta.stats.map(_.colStats).getOrElse(Map.empty) ++ newColCatalogStats)
+
+      sessionState.catalog.alterTableStats(tableIdent, Some(statistics))
+    }
+  }
+
   /** Returns true iff the we support gathering column statistics on column of the given type. */
   private def supportsType(dataType: DataType): Boolean = dataType match {
     case _: IntegralType => true
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
index 0ea928b..dea1e01 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/CommandUtils.scala
@@ -168,7 +168,7 @@ object CommandUtils extends Logging {
   private[sql] def computeColumnStats(
       sparkSession: SparkSession,
       relation: LogicalPlan,
-      columns: Seq[Attribute]): (Long, Map[String, CatalogColumnStat]) = {
+      columns: Seq[Attribute]): (Long, Map[Attribute, ColumnStat]) = {
     val conf = sparkSession.sessionState.conf
 
     // Collect statistics per column.
@@ -195,8 +195,8 @@ object CommandUtils extends Logging {
     val rowCount = statsRow.getLong(0)
     val columnStats = columns.zipWithIndex.map { case (attr, i) =>
       // according to `statExprs`, the stats struct always have 7 fields.
-      (attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
-        attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType))
+      (attr, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
+        attributePercentiles.get(attr)))
     }.toMap
     (rowCount, columnStats)
   }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
index 2141be4..47e745f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/CachedTableSuite.scala
@@ -946,4 +946,33 @@ class CachedTableSuite extends QueryTest with SQLTestUtils with SharedSQLContext
     // Clean-up
     df.unpersist()
   }
+
+  test("analyzes column statistics in cached query") {
+    def query(): DataFrame = {
+      spark.range(100)
+        .selectExpr("id % 3 AS c0", "id % 5 AS c1", "2 AS c2")
+        .groupBy("c0")
+        .agg(avg("c1").as("v1"), sum("c2").as("v2"))
+    }
+    // First, checks if there is no column statistic in cached query
+    val queryStats1 = query().cache.queryExecution.optimizedPlan.stats.attributeStats
+    assert(queryStats1.map(_._1.name).isEmpty)
+
+    val cacheManager = spark.sharedState.cacheManager
+    val cachedData = cacheManager.lookupCachedData(query().logicalPlan)
+    assert(cachedData.isDefined)
+    val queryAttrs = cachedData.get.plan.output
+    assert(queryAttrs.size === 3)
+    val (c0, v1, v2) = (queryAttrs(0), queryAttrs(1), queryAttrs(2))
+
+    // Analyzes one column in the query output
+    cacheManager.analyzeColumnCacheQuery(spark, cachedData.get, v1 :: Nil)
+    val queryStats2 = query().queryExecution.optimizedPlan.stats.attributeStats
+    assert(queryStats2.map(_._1.name).toSet === Set("v1"))
+
+    // Analyzes two more columns
+    cacheManager.analyzeColumnCacheQuery(spark, cachedData.get, c0 :: v2 :: Nil)
+    val queryStats3 = query().queryExecution.optimizedPlan.stats.attributeStats
+    assert(queryStats3.map(_._1.name).toSet === Set("c0", "v1", "v2"))
+  }
 }
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 7ba9f9f..b76678f 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -24,6 +24,7 @@ import java.util.concurrent.TimeUnit
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.analysis.NoSuchTableException
 import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.{DateTimeTestUtils, DateTimeUtils}
@@ -470,4 +471,77 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       }
     }
   }
+
+  def getStatAttrNames(tableName: String): Set[String] = {
+    val queryStats = spark.table(tableName).queryExecution.optimizedPlan.stats.attributeStats
+    queryStats.map(_._1.name).toSet
+  }
+
+  test("analyzes column statistics in cached query") {
+    withTempView("cachedQuery") {
+      sql(
+        """CACHE TABLE cachedQuery AS
+          |  SELECT c0, avg(c1) AS v1, avg(c2) AS v2
+          |  FROM (SELECT id % 3 AS c0, id % 5 AS c1, 2 AS c2 FROM range(1, 30))
+          |  GROUP BY c0
+        """.stripMargin)
+
+      // Analyzes one column in the cached logical plan
+      sql("ANALYZE TABLE cachedQuery COMPUTE STATISTICS FOR COLUMNS v1")
+      assert(getStatAttrNames("cachedQuery") === Set("v1"))
+
+      // Analyzes two more columns
+      sql("ANALYZE TABLE cachedQuery COMPUTE STATISTICS FOR COLUMNS c0, v2")
+      assert(getStatAttrNames("cachedQuery")  === Set("c0", "v1", "v2"))
+    }
+  }
+
+  test("analyzes column statistics in cached local temporary view") {
+    withTempView("tempView") {
+      // Analyzes in a temporary view
+      sql("CREATE TEMPORARY VIEW tempView AS SELECT * FROM range(1, 30)")
+      val errMsg = intercept[AnalysisException] {
+        sql("ANALYZE TABLE tempView COMPUTE STATISTICS FOR COLUMNS id")
+      }.getMessage
+      assert(errMsg.contains(s"Table or view 'tempView' not found in database 'default'"))
+
+      // Cache the view then analyze it
+      sql("CACHE TABLE tempView")
+      assert(getStatAttrNames("tempView") !== Set("id"))
+      sql("ANALYZE TABLE tempView COMPUTE STATISTICS FOR COLUMNS id")
+      assert(getStatAttrNames("tempView") === Set("id"))
+    }
+  }
+
+  test("analyzes column statistics in cached global temporary view") {
+    withGlobalTempView("gTempView") {
+      val globalTempDB = spark.sharedState.globalTempViewManager.database
+      val errMsg1 = intercept[NoSuchTableException] {
+        sql(s"ANALYZE TABLE $globalTempDB.gTempView COMPUTE STATISTICS FOR COLUMNS id")
+      }.getMessage
+      assert(errMsg1.contains(s"Table or view 'gTempView' not found in database '$globalTempDB'"))
+      // Analyzes in a global temporary view
+      sql("CREATE GLOBAL TEMP VIEW gTempView AS SELECT * FROM range(1, 30)")
+      val errMsg2 = intercept[AnalysisException] {
+        sql(s"ANALYZE TABLE $globalTempDB.gTempView COMPUTE STATISTICS FOR COLUMNS id")
+      }.getMessage
+      assert(errMsg2.contains(s"Table or view 'gTempView' not found in database '$globalTempDB'"))
+
+      // Cache the view then analyze it
+      sql(s"CACHE TABLE $globalTempDB.gTempView")
+      assert(getStatAttrNames(s"$globalTempDB.gTempView") !== Set("id"))
+      sql(s"ANALYZE TABLE $globalTempDB.gTempView COMPUTE STATISTICS FOR COLUMNS id")
+      assert(getStatAttrNames(s"$globalTempDB.gTempView") === Set("id"))
+    }
+  }
+
+  test("analyzes column statistics in cached catalog view") {
+    withTempDatabase { database =>
+      sql(s"CREATE VIEW $database.v AS SELECT 1 c")
+      sql(s"CACHE TABLE $database.v")
+      assert(getStatAttrNames(s"$database.v") !== Set("id"))
+      sql(s"ANALYZE TABLE $database.v COMPUTE STATISTICS FOR COLUMNS c")
+      assert(getStatAttrNames(s"$database.v") !== Set("id"))
+    }
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org