You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/07/08 12:44:17 UTC
spark git commit: [SPARK-21083][SQL] Store zero size and row count
when analyzing empty table
Repository: spark
Updated Branches:
refs/heads/master 0b8dd2d08 -> 9fccc3627
[SPARK-21083][SQL] Store zero size and row count when analyzing empty table
## What changes were proposed in this pull request?
We should be able to store a zero size and row count after analyzing an empty table.
This PR also enhances the test cases for re-analyzing tables.
## How was this patch tested?
Added a new test case and enhanced some test cases.
Author: Zhenhua Wang <wa...@huawei.com>
Closes #18292 from wzhfy/analyzeNewColumn.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9fccc362
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9fccc362
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9fccc362
Branch: refs/heads/master
Commit: 9fccc3627fa41d32fbae6dbbb9bd1521e43eb4f0
Parents: 0b8dd2d
Author: Zhenhua Wang <wa...@huawei.com>
Authored: Sat Jul 8 20:44:12 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Sat Jul 8 20:44:12 2017 +0800
----------------------------------------------------------------------
.../execution/command/AnalyzeTableCommand.scala | 5 +-
.../spark/sql/StatisticsCollectionSuite.scala | 13 +++++
.../apache/spark/sql/hive/StatisticsSuite.scala | 52 +++++++++++++++++---
3 files changed, 59 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccc362/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
index 42e2a9c..cba147c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeTableCommand.scala
@@ -20,7 +20,6 @@ package org.apache.spark.sql.execution.command
import org.apache.spark.sql.{AnalysisException, Row, SparkSession}
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
-import org.apache.spark.sql.execution.SQLExecution
/**
@@ -40,10 +39,10 @@ case class AnalyzeTableCommand(
}
val newTotalSize = CommandUtils.calculateTotalSize(sessionState, tableMeta)
- val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(0L)
+ val oldTotalSize = tableMeta.stats.map(_.sizeInBytes.toLong).getOrElse(-1L)
val oldRowCount = tableMeta.stats.flatMap(_.rowCount.map(_.toLong)).getOrElse(-1L)
var newStats: Option[CatalogStatistics] = None
- if (newTotalSize > 0 && newTotalSize != oldTotalSize) {
+ if (newTotalSize >= 0 && newTotalSize != oldTotalSize) {
newStats = Some(CatalogStatistics(sizeInBytes = newTotalSize))
}
// We only set rowCount when noscan is false, because otherwise:
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccc362/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 843ced7..b80bd80 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -82,6 +82,19 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}
+ test("analyze empty table") {
+ val table = "emptyTable"
+ withTable(table) {
+ sql(s"CREATE TABLE $table (key STRING, value STRING) USING PARQUET")
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS noscan")
+ val fetchedStats1 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = None)
+ assert(fetchedStats1.get.sizeInBytes == 0)
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS")
+ val fetchedStats2 = checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+ assert(fetchedStats2.get.sizeInBytes == 0)
+ }
+ }
+
test("analyze column command - unsupported types and invalid columns") {
val tableName = "column_stats_test1"
withTable(tableName) {
http://git-wip-us.apache.org/repos/asf/spark/blob/9fccc362/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index e00fa64..84bcea3 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -26,6 +26,7 @@ import scala.util.matching.Regex
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -210,27 +211,62 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
}
- test("test elimination of the influences of the old stats") {
+ test("keep existing row count in stats with noscan if table is not changed") {
val textTable = "textTable"
withTable(textTable) {
- sql(s"CREATE TABLE $textTable (key STRING, value STRING) STORED AS TEXTFILE")
+ sql(s"CREATE TABLE $textTable (key STRING, value STRING)")
sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS")
val fetchedStats1 =
checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
- // when the total size is not changed, the old row count is kept
+ // when the table is not changed, total size is the same, and the old row count is kept
val fetchedStats2 =
checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
assert(fetchedStats1 == fetchedStats2)
+ }
+ }
- sql(s"INSERT INTO TABLE $textTable SELECT * FROM src")
- sql(s"ANALYZE TABLE $textTable COMPUTE STATISTICS noscan")
- // update total size and remove the old and invalid row count
+ test("keep existing column stats if table is not changed") {
+ val table = "update_col_stats_table"
+ withTable(table) {
+ sql(s"CREATE TABLE $table (c1 INT, c2 STRING, c3 DOUBLE)")
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
+ val fetchedStats0 =
+ checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
+ assert(fetchedStats0.get.colStats == Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))
+
+ // Insert new data and analyze: have the latest column stats.
+ sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
+ val fetchedStats1 =
+ checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
+ assert(fetchedStats1.colStats == Map(
+ "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
+ avgLen = 4, maxLen = 4)))
+
+ // Analyze another column: since the table is not changed, the previous column stats are kept.
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
+ val fetchedStats2 =
+ checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
+ assert(fetchedStats2.colStats == Map(
+ "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
+ avgLen = 4, maxLen = 4),
+ "c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0,
+ avgLen = 1, maxLen = 1)))
+
+ // Insert new data and analyze: stale column stats are removed and newly collected column
+ // stats are added.
+ sql(s"INSERT INTO TABLE $table SELECT 2, 'b', 20.0")
+ sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1, c3")
val fetchedStats3 =
- checkTableStats(textTable, hasSizeInBytes = true, expectedRowCounts = None)
- assert(fetchedStats3.get.sizeInBytes > fetchedStats2.get.sizeInBytes)
+ checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get
+ assert(fetchedStats3.colStats == Map(
+ "c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
+ avgLen = 4, maxLen = 4),
+ "c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0,
+ avgLen = 8, maxLen = 8)))
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org