Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 08:00:36 UTC
spark git commit: [SPARK-21324][TEST] Improve statistics test suites
Repository: spark
Updated Branches:
refs/heads/master 6ff05a66f -> b8e4d567a
[SPARK-21324][TEST] Improve statistics test suites
## What changes were proposed in this pull request?
1. Move `StatisticsCollectionTestBase` into a separate file (resulting layout sketched after this list).
2. Move some test cases into `StatisticsCollectionSuite` so that `hive/StatisticsSuite` keeps only the tests that need Hive support.
3. Clean up some test cases.
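
The net layout after the change, roughly (a simplified sketch of the class hierarchy only; bodies, imports, and the individual test cases are elided):

```scala
// sql/core .../spark/sql/StatisticsCollectionTestBase.scala
// Shared fixtures (sample data, expected ColumnStats) and helpers such as
// checkTableStats/checkColStats, plus tests meant to run with and without Hive.
abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils

// sql/core .../spark/sql/StatisticsCollectionSuite.scala -- built-in (in-memory) catalog.
class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext

// sql/hive .../spark/sql/hive/StatisticsSuite.scala -- Hive external catalog, plus Hive-only tests.
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
```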
## How was this patch tested?
Existing tests.
Author: wangzhenhua <wa...@huawei.com>
Author: Zhenhua Wang <wz...@163.com>
Closes #18545 from wzhfy/cleanStatSuites.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8e4d567
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8e4d567
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8e4d567
Branch: refs/heads/master
Commit: b8e4d567a7d6c2ff277700d4e7707e57e87c7808
Parents: 6ff05a66
Author: wangzhenhua <wa...@huawei.com>
Authored: Thu Jul 6 16:00:31 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 16:00:31 2017 +0800
----------------------------------------------------------------------
.../spark/sql/StatisticsCollectionSuite.scala | 193 ++++---------------
.../sql/StatisticsCollectionTestBase.scala | 192 ++++++++++++++++++
.../apache/spark/sql/hive/StatisticsSuite.scala | 124 ++++--------
3 files changed, 258 insertions(+), 251 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index d9392de..843ced7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,19 +17,12 @@
package org.apache.spark.sql
-import java.{lang => jl}
-import java.sql.{Date, Timestamp}
-
import scala.collection.mutable
-import scala.util.Random
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
import org.apache.spark.sql.test.SQLTestData.ArrayData
import org.apache.spark.sql.types._
@@ -58,6 +51,37 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}
+ test("analyzing views is not supported") {
+ def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
+ val err = intercept[AnalysisException] {
+ sql(analyzeCommand)
+ }
+ assert(err.message.contains("ANALYZE TABLE is not supported"))
+ }
+
+ val tableName = "tbl"
+ withTable(tableName) {
+ spark.range(10).write.saveAsTable(tableName)
+ val viewName = "view"
+ withView(viewName) {
+ sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
+ assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+ assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
+ }
+ }
+ }
+
+ test("statistics collection of a table with zero column") {
+ val table_no_cols = "table_no_cols"
+ withTable(table_no_cols) {
+ val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+ val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
+ dfNoCols.write.format("json").saveAsTable(table_no_cols)
+ sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
+ checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
+ }
+ }
+
test("analyze column command - unsupported types and invalid columns") {
val tableName = "column_stats_test1"
withTable(tableName) {
@@ -239,154 +263,3 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}
-
-
-/**
- * The base for test cases that we want to include in both the hive module (for verifying behavior
- * when using the Hive external catalog) as well as in the sql/core module.
- */
-abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
- import testImplicits._
-
- private val dec1 = new java.math.BigDecimal("1.000000000000000000")
- private val dec2 = new java.math.BigDecimal("8.000000000000000000")
- private val d1 = Date.valueOf("2016-05-08")
- private val d2 = Date.valueOf("2016-05-09")
- private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
- private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
-
- /**
- * Define a very simple 3 row table used for testing column serialization.
- * Note: last column is seq[int] which doesn't support stats collection.
- */
- protected val data = Seq[
- (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
- jl.Double, jl.Float, java.math.BigDecimal,
- String, Array[Byte], Date, Timestamp,
- Seq[Int])](
- (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
- (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
- (null, null, null, null, null, null, null, null, null, null, null, null, null)
- )
-
- /** A mapping from column to the stats collected. */
- protected val stats = mutable.LinkedHashMap(
- "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
- "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
- "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
- "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
- "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
- "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
- "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
- "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
- "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
- "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
- "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
- Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
- "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
- Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
- )
-
- private val randomName = new Random(31)
-
- def checkTableStats(
- tableName: String,
- hasSizeInBytes: Boolean,
- expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
- val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
- if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
- assert(stats.isDefined)
- assert(stats.get.sizeInBytes >= 0)
- assert(stats.get.rowCount === expectedRowCounts)
- } else {
- assert(stats.isEmpty)
- }
-
- stats
- }
-
- /**
- * Compute column stats for the given DataFrame and compare it with colStats.
- */
- def checkColStats(
- df: DataFrame,
- colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
- val tableName = "column_stats_test_" + randomName.nextInt(1000)
- withTable(tableName) {
- df.write.saveAsTable(tableName)
-
- // Collect statistics
- sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
- colStats.keys.mkString(", "))
-
- // Validate statistics
- val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
- assert(table.stats.isDefined)
- assert(table.stats.get.colStats.size == colStats.size)
-
- colStats.foreach { case (k, v) =>
- withClue(s"column $k") {
- assert(table.stats.get.colStats(k) == v)
- }
- }
- }
- }
-
- // This test will be run twice: with and without Hive support
- test("SPARK-18856: non-empty partitioned table should not report zero size") {
- withTable("ds_tbl", "hive_tbl") {
- spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
- val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
- assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
-
- if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
- sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
- sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
- val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
- assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
- }
- }
- }
-
- // This test will be run twice: with and without Hive support
- test("conversion from CatalogStatistics to Statistics") {
- withTable("ds_tbl", "hive_tbl") {
- // Test data source table
- checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
- // Test hive serde table
- if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
- checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
- }
- }
- }
-
- private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
- // Create an empty table and run analyze command on it.
- val createTableSql = if (isDatasourceTable) {
- s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
- } else {
- s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
- }
- sql(createTableSql)
- // Analyze only one column.
- sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
- val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
- case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
- case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
- }.head
- val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
- // Check catalog statistics
- assert(catalogTable.stats.isDefined)
- assert(catalogTable.stats.get.sizeInBytes == 0)
- assert(catalogTable.stats.get.rowCount == Some(0))
- assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
-
- // Check relation statistics
- assert(relation.stats.sizeInBytes == 0)
- assert(relation.stats.rowCount == Some(0))
- assert(relation.stats.attributeStats.size == 1)
- val (attribute, colStat) = relation.stats.attributeStats.head
- assert(attribute.name == "c1")
- assert(colStat == emptyColStat)
- }
-}
http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
new file mode 100644
index 0000000..4156976
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.Decimal
+
+
+/**
+ * The base for statistics test cases that we want to include in both the hive module (for
+ * verifying behavior when using the Hive external catalog) as well as in the sql/core module.
+ */
+abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
+ import testImplicits._
+
+ private val dec1 = new java.math.BigDecimal("1.000000000000000000")
+ private val dec2 = new java.math.BigDecimal("8.000000000000000000")
+ private val d1 = Date.valueOf("2016-05-08")
+ private val d2 = Date.valueOf("2016-05-09")
+ private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
+ private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
+
+ /**
+ * Define a very simple 3 row table used for testing column serialization.
+ * Note: last column is seq[int] which doesn't support stats collection.
+ */
+ protected val data = Seq[
+ (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
+ jl.Double, jl.Float, java.math.BigDecimal,
+ String, Array[Byte], Date, Timestamp,
+ Seq[Int])](
+ (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
+ (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
+ (null, null, null, null, null, null, null, null, null, null, null, null, null)
+ )
+
+ /** A mapping from column to the stats collected. */
+ protected val stats = mutable.LinkedHashMap(
+ "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
+ "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
+ "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
+ "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
+ "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
+ "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
+ "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
+ "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
+ "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
+ "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
+ "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
+ Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
+ "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
+ Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
+ )
+
+ private val randomName = new Random(31)
+
+ def getCatalogTable(tableName: String): CatalogTable = {
+ spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+ }
+
+ def getCatalogStatistics(tableName: String): CatalogStatistics = {
+ getCatalogTable(tableName).stats.get
+ }
+
+ def checkTableStats(
+ tableName: String,
+ hasSizeInBytes: Boolean,
+ expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
+ val stats = getCatalogTable(tableName).stats
+ if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
+ assert(stats.isDefined)
+ assert(stats.get.sizeInBytes >= 0)
+ assert(stats.get.rowCount === expectedRowCounts)
+ } else {
+ assert(stats.isEmpty)
+ }
+
+ stats
+ }
+
+ /**
+ * Compute column stats for the given DataFrame and compare it with colStats.
+ */
+ def checkColStats(
+ df: DataFrame,
+ colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+ val tableName = "column_stats_test_" + randomName.nextInt(1000)
+ withTable(tableName) {
+ df.write.saveAsTable(tableName)
+
+ // Collect statistics
+ sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
+ colStats.keys.mkString(", "))
+
+ // Validate statistics
+ val table = getCatalogTable(tableName)
+ assert(table.stats.isDefined)
+ assert(table.stats.get.colStats.size == colStats.size)
+
+ colStats.foreach { case (k, v) =>
+ withClue(s"column $k") {
+ assert(table.stats.get.colStats(k) == v)
+ }
+ }
+ }
+ }
+
+ // This test will be run twice: with and without Hive support
+ test("SPARK-18856: non-empty partitioned table should not report zero size") {
+ withTable("ds_tbl", "hive_tbl") {
+ spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
+ val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
+ assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+
+ if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+ sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
+ sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
+ val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
+ assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+ }
+ }
+ }
+
+ // This test will be run twice: with and without Hive support
+ test("conversion from CatalogStatistics to Statistics") {
+ withTable("ds_tbl", "hive_tbl") {
+ // Test data source table
+ checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+ // Test hive serde table
+ if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+ checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+ }
+ }
+ }
+
+ private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+ // Create an empty table and run analyze command on it.
+ val createTableSql = if (isDatasourceTable) {
+ s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+ } else {
+ s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+ }
+ sql(createTableSql)
+ // Analyze only one column.
+ sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+ val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+ case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+ case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+ }.head
+ val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+ // Check catalog statistics
+ assert(catalogTable.stats.isDefined)
+ assert(catalogTable.stats.get.sizeInBytes == 0)
+ assert(catalogTable.stats.get.rowCount == Some(0))
+ assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+
+ // Check relation statistics
+ assert(relation.stats.sizeInBytes == 0)
+ assert(relation.stats.rowCount == Some(0))
+ assert(relation.stats.attributeStats.size == 1)
+ val (attribute, colStat) = relation.stats.attributeStats.head
+ assert(attribute.name == "c1")
+ assert(colStat == emptyColStat)
+ }
+}
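
For context (not part of this diff), a suite that mixes in this base class can exercise the shared fixtures roughly as in the sketch below; the suite name and the `carray` column name are illustrative assumptions, not taken from the patch.

```scala
import org.apache.spark.sql.StatisticsCollectionTestBase
import org.apache.spark.sql.test.SharedSQLContext

// Hypothetical suite, for illustration only.
class ExampleStatsSuite extends StatisticsCollectionTestBase with SharedSQLContext {
  import testImplicits._

  test("column stats round trip (illustrative)") {
    // Name the 12 supported columns after the keys of `stats`; "carray" is an
    // assumed name for the trailing Seq[Int] column, which does not support stats.
    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
    // checkColStats saves the DataFrame as a table, runs
    // ANALYZE TABLE ... COMPUTE STATISTICS FOR COLUMNS, and compares the
    // collected column statistics against the expected `stats` map.
    checkColStats(df, stats)
  }
}
```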
http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index c601038..e00fa64 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -25,7 +25,7 @@ import scala.util.matching.Regex
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.joins._
import org.apache.spark.sql.hive.HiveExternalCatalog._
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
@@ -82,58 +81,42 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
// Non-partitioned table
- sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
- sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
- sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+ val nonPartTable = "non_part_table"
+ withTable(nonPartTable) {
+ sql(s"CREATE TABLE $nonPartTable (key STRING, value STRING)")
+ sql(s"INSERT INTO TABLE $nonPartTable SELECT * FROM src")
+ sql(s"INSERT INTO TABLE $nonPartTable SELECT * FROM src")
- sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
+ sql(s"ANALYZE TABLE $nonPartTable COMPUTE STATISTICS noscan")
- assert(queryTotalSize("analyzeTable") === BigInt(11624))
-
- sql("DROP TABLE analyzeTable").collect()
+ assert(queryTotalSize(nonPartTable) === BigInt(11624))
+ }
// Partitioned table
- sql(
- """
- |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED BY (ds STRING)
- """.stripMargin).collect()
- sql(
- """
- |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01')
- |SELECT * FROM src
- """.stripMargin).collect()
- sql(
- """
- |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02')
- |SELECT * FROM src
- """.stripMargin).collect()
- sql(
- """
- |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03')
- |SELECT * FROM src
- """.stripMargin).collect()
+ val partTable = "part_table"
+ withTable(partTable) {
+ sql(s"CREATE TABLE $partTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+ sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-01') SELECT * FROM src")
+ sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-02') SELECT * FROM src")
+ sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-03') SELECT * FROM src")
- assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
+ assert(queryTotalSize(partTable) === spark.sessionState.conf.defaultSizeInBytes)
- sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
+ sql(s"ANALYZE TABLE $partTable COMPUTE STATISTICS noscan")
- assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
-
- sql("DROP TABLE analyzeTable_part").collect()
+ assert(queryTotalSize(partTable) === BigInt(17436))
+ }
// Try to analyze a temp table
- sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
- intercept[AnalysisException] {
- sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+ withView("tempTable") {
+ sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
+ intercept[AnalysisException] {
+ sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+ }
}
- spark.sessionState.catalog.dropTable(
- TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
}
test("SPARK-21079 - analyze table with location different than that of individual partitions") {
- def queryTotalSize(tableName: String): BigInt =
- spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
-
val tableName = "analyzeTable_part"
withTable(tableName) {
withTempPath { path =>
@@ -148,15 +131,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
- assert(queryTotalSize(tableName) === BigInt(17436))
+ assert(getCatalogStatistics(tableName).sizeInBytes === BigInt(17436))
}
}
}
test("SPARK-21079 - analyze partitioned table with only a subset of partitions visible") {
- def queryTotalSize(tableName: String): BigInt =
- spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
-
val sourceTableName = "analyzeTable_part"
val tableName = "analyzeTable_part_vis"
withTable(sourceTableName, tableName) {
@@ -188,39 +168,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// Register only one of the partitions found on disk
val ds = partitionDates.head
- sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')").collect()
+ sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')")
// Analyze original table - expect 3 partitions
sql(s"ANALYZE TABLE $sourceTableName COMPUTE STATISTICS noscan")
- assert(queryTotalSize(sourceTableName) === BigInt(3 * 5812))
+ assert(getCatalogStatistics(sourceTableName).sizeInBytes === BigInt(3 * 5812))
// Analyze partial-copy table - expect only 1 partition
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
- assert(queryTotalSize(tableName) === BigInt(5812))
+ assert(getCatalogStatistics(tableName).sizeInBytes === BigInt(5812))
}
}
}
- test("analyzing views is not supported") {
- def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
- val err = intercept[AnalysisException] {
- sql(analyzeCommand)
- }
- assert(err.message.contains("ANALYZE TABLE is not supported"))
- }
-
- val tableName = "tbl"
- withTable(tableName) {
- spark.range(10).write.saveAsTable(tableName)
- val viewName = "view"
- withView(viewName) {
- sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
- assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
- assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
- }
- }
- }
-
test("test table-level statistics for hive tables created in HiveExternalCatalog") {
val textTable = "textTable"
withTable(textTable) {
@@ -290,8 +250,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
- val tableMetadata =
- spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)).properties
+ val tableMetadata = getCatalogTable(tabName).properties
// statistics info is not contained in the metadata of the original table
assert(Seq(StatsSetupConst.COLUMN_STATS_ACCURATE,
StatsSetupConst.NUM_FILES,
@@ -327,8 +286,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val tabName = "tab1"
withTable(tabName) {
createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false)
- checkTableStats(
- tabName, hasSizeInBytes = true, expectedRowCounts = None)
+ checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None)
// ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics
// This is triggered by the Hive alterTable API
@@ -370,10 +328,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
test("alter table should not have the side effect to store statistics in Spark side") {
- def getCatalogTable(tableName: String): CatalogTable = {
- spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
- }
-
val table = "alter_table_side_effect"
withTable(table) {
sql(s"CREATE TABLE $table (i string, j string)")
@@ -637,12 +591,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
// the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
// for robustness
- withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
}
- withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
+ withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
// We still can get tableSize from Hive before Analyze
checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = None)
sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
@@ -759,8 +713,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val parquetTable = "parquetTable"
withTable(parquetTable) {
sql(createTableCmd)
- val catalogTable = spark.sessionState.catalog.getTableMetadata(
- TableIdentifier(parquetTable))
+ val catalogTable = getCatalogTable(parquetTable)
assert(DDLUtils.isDatasourceTable(catalogTable))
// Add a filter to avoid creating too many partitions
@@ -795,17 +748,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
"partitioned data source table",
"CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET PARTITIONED BY (key)")
- test("statistics collection of a table with zero column") {
- val table_no_cols = "table_no_cols"
- withTable(table_no_cols) {
- val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
- val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
- dfNoCols.write.format("json").saveAsTable(table_no_cols)
- sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
- checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
- }
- }
-
/** Used to test refreshing cached metadata once table stats are updated. */
private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
: (CatalogStatistics, CatalogStatistics) = {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org