You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/08 12:44:17 UTC

spark git commit: [SPARK-21083][SQL] Store zero size and row count when analyzing empty table

Repository: spark
Updated Branches:
  refs/heads/master 0b8dd2d08 -> 9fccc3627


[SPARK-21083][SQL] Store zero size and row count when analyzing empty table

## What changes were proposed in this pull request?

We should be able to store zero size and row count after analyzing empty table.

This pr also enhances the test cases for re-analyzing tables.

## How was this patch tested?

Added a new test case and enhanced some test cases.

Author: Zhenhua Wang <wa...@huawei.com>

Closes #18292 from wzhfy/analyzeNewColumn.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fccc362
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fccc362
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fccc362

Branch: refs/heads/master
Commit: 9fccc3627fa41d32fbae6dbbb9bd1521e43eb4f0
Parents: 0b8dd2d
Author: Zhenhua Wang <wa...@huawei.com>
Authored: Sat Jul 8 20:44:12 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Jul 8 20:44:12 2017 +0800

----------------------------------------------------------------------
 .../execution/command/AnalyzeTableCommand.scala |  5 +-
 .../spark/sql/StatisticsCollectionSuite.scala   | 13 +++++
 .../apache/spark/sql/hive/StatisticsSuite.scala | 52 +++++++++++++++++---
 3 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9fccc362/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 42e2a9c..cba147c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command
 import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
-import org.apache.spark.sql.execution.SQLExecution
 
 
 /**
@@ -40,10 +39,10 @@ case class AnalyzeTableCommand(
     }
     val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
 
-    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+    val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
     val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
     var newStats: Option[CatalogStatistics] = None
-    if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+    if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
       newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
     }
     // We only set rowCount when noscan is false, because otherwise:

http://git-wip-us.apache.org/repos/asf/spark/blob/9fccc362/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 843ced7..b80bd80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -82,6 +82,19 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("analyze empty table") {
+    val table = "emptyTable"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (key STRING, value STRING) USING PARQUET")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan")
+      val fetchedStats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+      assert(fetchedStats1.get.sizeInBytes == 0)
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+      val fetchedStats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetchedStats2.get.sizeInBytes == 0)
+    }
+  }
+
   test("analyze column command - unsupported types and invalid columns") {
     val tableName = "column_stats_test1"
     withTable(tableName) {

http://git-wip-us.apache.org/repos/asf/spark/blob/9fccc362/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e00fa64..84bcea3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,7 @@ import scala.util.matching.Regex
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -210,27 +211,62 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
-  test("test elimination of the influences of the old stats") {
+  test("keep existing row count in stats with noscan if table is not changed") {
     val textTable = "textTable"
     withTable(textTable) {
-      sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
+      sql(s"CREATE TABLE $textTable (key STRING, value STRING)")
       sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
       sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
       val fetchedStats1 =
         checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
 
       sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
-      // when the total size is not changed, the old row count is kept
+      // when the table is not changed, total size is the same, and the old row count is kept
       val fetchedStats2 =
         checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
       assert(fetchedStats1 == fetchedStats2)
+    }
+  }
 
-      sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
-      sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
-      // update total size and remove the old and invalid row count
+  test("keep existing column stats if table is not changed") {
+    val table = "update_col_stats_table"
+    withTable(table) {
+      sql(s"CREATE TABLE $table (c1 INT, c2 STRING, c3 DOUBLE)")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
+      val fetchedStats0 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+      assert(fetchedStats0.get.colStats == Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))
+
+      // Insert new data and analyze: have the latest column stats.
+      sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
+      val fetchedStats1 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
+      assert(fetchedStats1.colStats == Map(
+        "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
+          avgLen = 4, maxLen = 4)))
+
+      // Analyze another column: since the table is not changed, the precious column stats are kept.
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
+      val fetchedStats2 =
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
+      assert(fetchedStats2.colStats == Map(
+        "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
+          avgLen = 4, maxLen = 4),
+        "c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0,
+          avgLen = 1, maxLen = 1)))
+
+      // Insert new data and analyze: stale column stats are removed and newly collected column
+      // stats are added.
+      sql(s"INSERT INTO TABLE $table SELECT 2, 'b', 20.0")
+      sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1, c3")
       val fetchedStats3 =
-        checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None)
-      assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes)
+        checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get
+      assert(fetchedStats3.colStats == Map(
+        "c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
+          avgLen = 4, maxLen = 4),
+        "c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0,
+          avgLen = 8, maxLen = 8)))
     }
   }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org