Posted to commits@spark.apache.org by we...@apache.org on 2017/07/06 08:00:36 UTC

spark git commit: [SPARK-21324][TEST] Improve statistics test suites

Repository: spark
Updated Branches:
  refs/heads/master 6ff05a66f -> b8e4d567a


[SPARK-21324][TEST] Improve statistics test suites

## What changes were proposed in this pull request?

1. Move `StatisticsCollectionTestBase` into its own file.
2. Move test cases that do not need Hive into `StatisticsCollectionSuite`, so that `hive/StatisticsSuite` keeps only tests that require Hive support (a sketch of the resulting layout follows this list).
3. Clean up some test cases.
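
For reference, a minimal sketch of how the suites relate after this change. It is a simplified view, not the full sources: class and mixin names are taken from the diff below, bodies are elided, and it only compiles against Spark's sql/hive test classpath, which sees both modules' test classes.

```scala
// Simplified sketch of the layout after this refactoring; the real classes
// live in the three files shown in the diff below, spread across two modules.
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.hive.test.TestHiveSingleton
import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}

// sql/core test base: shared helpers (checkTableStats, checkColStats, ...)
// and the tests that must run both with and without Hive support.
abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils

// sql/core suite: runs the shared tests against the in-memory catalog.
class StatisticsCollectionSuite extends StatisticsCollectionTestBase with SharedSQLContext

// sql/hive suite: runs the shared tests against the Hive external catalog.
class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton
```

The practical effect is that any test added to `StatisticsCollectionTestBase` runs twice, once with and once without Hive support, while Hive-only tests stay in `hive/StatisticsSuite`.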

## How was this patch tested?

Existing tests.

Author: wangzhenhua <wa...@huawei.com>
Author: Zhenhua Wang <wz...@163.com>

Closes #18545 from wzhfy/cleanStatSuites.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/b8e4d567
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/b8e4d567
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/b8e4d567

Branch: refs/heads/master
Commit: b8e4d567a7d6c2ff277700d4e7707e57e87c7808
Parents: 6ff05a66
Author: wangzhenhua <wa...@huawei.com>
Authored: Thu Jul 6 16:00:31 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Jul 6 16:00:31 2017 +0800

----------------------------------------------------------------------
 .../spark/sql/StatisticsCollectionSuite.scala   | 193 ++++---------------
 .../sql/StatisticsCollectionTestBase.scala      | 192 ++++++++++++++++++
 .../apache/spark/sql/hive/StatisticsSuite.scala | 124 ++++--------
 3 files changed, 258 insertions(+), 251 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index d9392de..843ced7 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -17,19 +17,12 @@
 
 package org.apache.spark.sql
 
-import java.{lang => jl}
-import java.sql.{Date, Timestamp}
-
 import scala.collection.mutable
-import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.plans.logical._
-import org.apache.spark.sql.catalyst.util.DateTimeUtils
-import org.apache.spark.sql.execution.datasources.LogicalRelation
-import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
-import org.apache.spark.sql.test.{SharedSQLContext, SQLTestUtils}
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.test.SharedSQLContext
 import org.apache.spark.sql.test.SQLTestData.ArrayData
 import org.apache.spark.sql.types._
 
@@ -58,6 +51,37 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("analyzing views is not supported") {
+    def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
+      val err = intercept[AnalysisException] {
+        sql(analyzeCommand)
+      }
+      assert(err.message.contains("ANALYZE TABLE is not supported"))
+    }
+
+    val tableName = "tbl"
+    withTable(tableName) {
+      spark.range(10).write.saveAsTable(tableName)
+      val viewName = "view"
+      withView(viewName) {
+        sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
+        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
+        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
+      }
+    }
+  }
+
+  test("statistics collection of a table with zero column") {
+    val table_no_cols = "table_no_cols"
+    withTable(table_no_cols) {
+      val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
+      val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
+      dfNoCols.write.format("json").saveAsTable(table_no_cols)
+      sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
+      checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
+    }
+  }
+
   test("analyze column command - unsupported types and invalid columns") {
     val tableName = "column_stats_test1"
     withTable(tableName) {
@@ -239,154 +263,3 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
   }
 
 }
-
-
-/**
- * The base for test cases that we want to include in both the hive module (for verifying behavior
- * when using the Hive external catalog) as well as in the sql/core module.
- */
-abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
-  import testImplicits._
-
-  private val dec1 = new java.math.BigDecimal("1.000000000000000000")
-  private val dec2 = new java.math.BigDecimal("8.000000000000000000")
-  private val d1 = Date.valueOf("2016-05-08")
-  private val d2 = Date.valueOf("2016-05-09")
-  private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
-  private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
-
-  /**
-   * Define a very simple 3 row table used for testing column serialization.
-   * Note: last column is seq[int] which doesn't support stats collection.
-   */
-  protected val data = Seq[
-    (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
-      jl.Double, jl.Float, java.math.BigDecimal,
-      String, Array[Byte], Date, Timestamp,
-      Seq[Int])](
-    (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
-    (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
-    (null, null, null, null, null, null, null, null, null, null, null, null, null)
-  )
-
-  /** A mapping from column to the stats collected. */
-  protected val stats = mutable.LinkedHashMap(
-    "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
-    "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
-    "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
-    "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
-    "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
-    "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
-    "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
-    "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
-    "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
-      Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
-    "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
-      Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
-  )
-
-  private val randomName = new Random(31)
-
-  def checkTableStats(
-      tableName: String,
-      hasSizeInBytes: Boolean,
-      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
-    val stats = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName)).stats
-    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
-      assert(stats.isDefined)
-      assert(stats.get.sizeInBytes >= 0)
-      assert(stats.get.rowCount === expectedRowCounts)
-    } else {
-      assert(stats.isEmpty)
-    }
-
-    stats
-  }
-
-  /**
-   * Compute column stats for the given DataFrame and compare it with colStats.
-   */
-  def checkColStats(
-      df: DataFrame,
-      colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
-    val tableName = "column_stats_test_" + randomName.nextInt(1000)
-    withTable(tableName) {
-      df.write.saveAsTable(tableName)
-
-      // Collect statistics
-      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
-        colStats.keys.mkString(", "))
-
-      // Validate statistics
-      val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-      assert(table.stats.isDefined)
-      assert(table.stats.get.colStats.size == colStats.size)
-
-      colStats.foreach { case (k, v) =>
-        withClue(s"column $k") {
-          assert(table.stats.get.colStats(k) == v)
-        }
-      }
-    }
-  }
-
-  // This test will be run twice: with and without Hive support
-  test("SPARK-18856: non-empty partitioned table should not report zero size") {
-    withTable("ds_tbl", "hive_tbl") {
-      spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
-      val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
-      assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
-
-      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
-        sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
-        sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
-        val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
-        assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
-      }
-    }
-  }
-
-  // This test will be run twice: with and without Hive support
-  test("conversion from CatalogStatistics to Statistics") {
-    withTable("ds_tbl", "hive_tbl") {
-      // Test data source table
-      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
-      // Test hive serde table
-      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
-        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
-      }
-    }
-  }
-
-  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
-    // Create an empty table and run analyze command on it.
-    val createTableSql = if (isDatasourceTable) {
-      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
-    } else {
-      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
-    }
-    sql(createTableSql)
-    // Analyze only one column.
-    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
-    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
-      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
-      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
-    }.head
-    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
-    // Check catalog statistics
-    assert(catalogTable.stats.isDefined)
-    assert(catalogTable.stats.get.sizeInBytes == 0)
-    assert(catalogTable.stats.get.rowCount == Some(0))
-    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
-
-    // Check relation statistics
-    assert(relation.stats.sizeInBytes == 0)
-    assert(relation.stats.rowCount == Some(0))
-    assert(relation.stats.attributeStats.size == 1)
-    val (attribute, colStat) = relation.stats.attributeStats.head
-    assert(attribute.name == "c1")
-    assert(colStat == emptyColStat)
-  }
-}

http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
new file mode 100644
index 0000000..4156976
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -0,0 +1,192 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+
+import java.{lang => jl}
+import java.sql.{Date, Timestamp}
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
+import org.apache.spark.sql.catalyst.util.DateTimeUtils
+import org.apache.spark.sql.execution.datasources.LogicalRelation
+import org.apache.spark.sql.internal.StaticSQLConf
+import org.apache.spark.sql.test.SQLTestUtils
+import org.apache.spark.sql.types.Decimal
+
+
+/**
+ * The base for statistics test cases that we want to include in both the hive module (for
+ * verifying behavior when using the Hive external catalog) as well as in the sql/core module.
+ */
+abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
+  import testImplicits._
+
+  private val dec1 = new java.math.BigDecimal("1.000000000000000000")
+  private val dec2 = new java.math.BigDecimal("8.000000000000000000")
+  private val d1 = Date.valueOf("2016-05-08")
+  private val d2 = Date.valueOf("2016-05-09")
+  private val t1 = Timestamp.valueOf("2016-05-08 00:00:01")
+  private val t2 = Timestamp.valueOf("2016-05-09 00:00:02")
+
+  /**
+   * Define a very simple 3 row table used for testing column serialization.
+   * Note: last column is seq[int] which doesn't support stats collection.
+   */
+  protected val data = Seq[
+    (jl.Boolean, jl.Byte, jl.Short, jl.Integer, jl.Long,
+      jl.Double, jl.Float, java.math.BigDecimal,
+      String, Array[Byte], Date, Timestamp,
+      Seq[Int])](
+    (false, 1.toByte, 1.toShort, 1, 1L, 1.0, 1.0f, dec1, "s1", "b1".getBytes, d1, t1, null),
+    (true, 2.toByte, 3.toShort, 4, 5L, 6.0, 7.0f, dec2, "ss9", "bb0".getBytes, d2, t2, null),
+    (null, null, null, null, null, null, null, null, null, null, null, null, null)
+  )
+
+  /** A mapping from column to the stats collected. */
+  protected val stats = mutable.LinkedHashMap(
+    "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
+    "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
+    "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
+    "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
+    "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
+    "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
+    "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
+    "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
+    "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
+    "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
+    "cdate" -> ColumnStat(2, Some(DateTimeUtils.fromJavaDate(d1)),
+      Some(DateTimeUtils.fromJavaDate(d2)), 1, 4, 4),
+    "ctimestamp" -> ColumnStat(2, Some(DateTimeUtils.fromJavaTimestamp(t1)),
+      Some(DateTimeUtils.fromJavaTimestamp(t2)), 1, 8, 8)
+  )
+
+  private val randomName = new Random(31)
+
+  def getCatalogTable(tableName: String): CatalogTable = {
+    spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
+  }
+
+  def getCatalogStatistics(tableName: String): CatalogStatistics = {
+    getCatalogTable(tableName).stats.get
+  }
+
+  def checkTableStats(
+      tableName: String,
+      hasSizeInBytes: Boolean,
+      expectedRowCounts: Option[Int]): Option[CatalogStatistics] = {
+    val stats = getCatalogTable(tableName).stats
+    if (hasSizeInBytes || expectedRowCounts.nonEmpty) {
+      assert(stats.isDefined)
+      assert(stats.get.sizeInBytes >= 0)
+      assert(stats.get.rowCount === expectedRowCounts)
+    } else {
+      assert(stats.isEmpty)
+    }
+
+    stats
+  }
+
+  /**
+   * Compute column stats for the given DataFrame and compare it with colStats.
+   */
+  def checkColStats(
+      df: DataFrame,
+      colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+    val tableName = "column_stats_test_" + randomName.nextInt(1000)
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      // Collect statistics
+      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
+        colStats.keys.mkString(", "))
+
+      // Validate statistics
+      val table = getCatalogTable(tableName)
+      assert(table.stats.isDefined)
+      assert(table.stats.get.colStats.size == colStats.size)
+
+      colStats.foreach { case (k, v) =>
+        withClue(s"column $k") {
+          assert(table.stats.get.colStats(k) == v)
+        }
+      }
+    }
+  }
+
+  // This test will be run twice: with and without Hive support
+  test("SPARK-18856: non-empty partitioned table should not report zero size") {
+    withTable("ds_tbl", "hive_tbl") {
+      spark.range(100).select($"id", $"id" % 5 as "p").write.partitionBy("p").saveAsTable("ds_tbl")
+      val stats = spark.table("ds_tbl").queryExecution.optimizedPlan.stats
+      assert(stats.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        sql("CREATE TABLE hive_tbl(i int) PARTITIONED BY (j int)")
+        sql("INSERT INTO hive_tbl PARTITION(j=1) SELECT 1")
+        val stats2 = spark.table("hive_tbl").queryExecution.optimizedPlan.stats
+        assert(stats2.sizeInBytes > 0, "non-empty partitioned table should not report zero size.")
+      }
+    }
+  }
+
+  // This test will be run twice: with and without Hive support
+  test("conversion from CatalogStatistics to Statistics") {
+    withTable("ds_tbl", "hive_tbl") {
+      // Test data source table
+      checkStatsConversion(tableName = "ds_tbl", isDatasourceTable = true)
+      // Test hive serde table
+      if (spark.conf.get(StaticSQLConf.CATALOG_IMPLEMENTATION) == "hive") {
+        checkStatsConversion(tableName = "hive_tbl", isDatasourceTable = false)
+      }
+    }
+  }
+
+  private def checkStatsConversion(tableName: String, isDatasourceTable: Boolean): Unit = {
+    // Create an empty table and run analyze command on it.
+    val createTableSql = if (isDatasourceTable) {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING) USING PARQUET"
+    } else {
+      s"CREATE TABLE $tableName (c1 INT, c2 STRING)"
+    }
+    sql(createTableSql)
+    // Analyze only one column.
+    sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS c1")
+    val (relation, catalogTable) = spark.table(tableName).queryExecution.analyzed.collect {
+      case catalogRel: CatalogRelation => (catalogRel, catalogRel.tableMeta)
+      case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
+    }.head
+    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    // Check catalog statistics
+    assert(catalogTable.stats.isDefined)
+    assert(catalogTable.stats.get.sizeInBytes == 0)
+    assert(catalogTable.stats.get.rowCount == Some(0))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+
+    // Check relation statistics
+    assert(relation.stats.sizeInBytes == 0)
+    assert(relation.stats.rowCount == Some(0))
+    assert(relation.stats.attributeStats.size == 1)
+    val (attribute, colStat) = relation.stats.attributeStats.head
+    assert(attribute.name == "c1")
+    assert(colStat == emptyColStat)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/b8e4d567/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index c601038..e00fa64 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -25,7 +25,7 @@ import scala.util.matching.Regex
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics, CatalogTable}
+import org.apache.spark.sql.catalyst.catalog.{CatalogRelation, CatalogStatistics}
 import org.apache.spark.sql.catalyst.util.StringUtils
 import org.apache.spark.sql.execution.command.DDLUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
@@ -33,7 +33,6 @@ import org.apache.spark.sql.execution.joins._
 import org.apache.spark.sql.hive.HiveExternalCatalog._
 import org.apache.spark.sql.hive.test.TestHiveSingleton
 import org.apache.spark.sql.internal.SQLConf
-import org.apache.spark.sql.types._
 
 
 class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleton {
@@ -82,58 +81,42 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
 
     // Non-partitioned table
-    sql("CREATE TABLE analyzeTable (key STRING, value STRING)").collect()
-    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
-    sql("INSERT INTO TABLE analyzeTable SELECT * FROM src").collect()
+    val nonPartTable = "non_part_table"
+    withTable(nonPartTable) {
+      sql(s"CREATE TABLE $nonPartTable (key STRING, value STRING)")
+      sql(s"INSERT INTO TABLE $nonPartTable SELECT * FROM src")
+      sql(s"INSERT INTO TABLE $nonPartTable SELECT * FROM src")
 
-    sql("ANALYZE TABLE analyzeTable COMPUTE STATISTICS noscan")
+      sql(s"ANALYZE TABLE $nonPartTable COMPUTE STATISTICS noscan")
 
-    assert(queryTotalSize("analyzeTable") === BigInt(11624))
-
-    sql("DROP TABLE analyzeTable").collect()
+      assert(queryTotalSize(nonPartTable) === BigInt(11624))
+    }
 
     // Partitioned table
-    sql(
-      """
-        |CREATE TABLE analyzeTable_part (key STRING, value STRING) PARTITIONED BY (ds STRING)
-      """.stripMargin).collect()
-    sql(
-      """
-        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-01')
-        |SELECT * FROM src
-      """.stripMargin).collect()
-    sql(
-      """
-        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-02')
-        |SELECT * FROM src
-      """.stripMargin).collect()
-    sql(
-      """
-        |INSERT INTO TABLE analyzeTable_part PARTITION (ds='2010-01-03')
-        |SELECT * FROM src
-      """.stripMargin).collect()
+    val partTable = "part_table"
+    withTable(partTable) {
+      sql(s"CREATE TABLE $partTable (key STRING, value STRING) PARTITIONED BY (ds STRING)")
+      sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-01') SELECT * FROM src")
+      sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-02') SELECT * FROM src")
+      sql(s"INSERT INTO TABLE $partTable PARTITION (ds='2010-01-03') SELECT * FROM src")
 
-    assert(queryTotalSize("analyzeTable_part") === spark.sessionState.conf.defaultSizeInBytes)
+      assert(queryTotalSize(partTable) === spark.sessionState.conf.defaultSizeInBytes)
 
-    sql("ANALYZE TABLE analyzeTable_part COMPUTE STATISTICS noscan")
+      sql(s"ANALYZE TABLE $partTable COMPUTE STATISTICS noscan")
 
-    assert(queryTotalSize("analyzeTable_part") === BigInt(17436))
-
-    sql("DROP TABLE analyzeTable_part").collect()
+      assert(queryTotalSize(partTable) === BigInt(17436))
+    }
 
     // Try to analyze a temp table
-    sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
-    intercept[AnalysisException] {
-      sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+    withView("tempTable") {
+      sql("""SELECT * FROM src""").createOrReplaceTempView("tempTable")
+      intercept[AnalysisException] {
+        sql("ANALYZE TABLE tempTable COMPUTE STATISTICS")
+      }
     }
-    spark.sessionState.catalog.dropTable(
-      TableIdentifier("tempTable"), ignoreIfNotExists = true, purge = false)
   }
 
   test("SPARK-21079 - analyze table with location different than that of individual partitions") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
-
     val tableName = "analyzeTable_part"
     withTable(tableName) {
       withTempPath { path =>
@@ -148,15 +131,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
         sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
 
-        assert(queryTotalSize(tableName) === BigInt(17436))
+        assert(getCatalogStatistics(tableName).sizeInBytes === BigInt(17436))
       }
     }
   }
 
   test("SPARK-21079 - analyze partitioned table with only a subset of partitions visible") {
-    def queryTotalSize(tableName: String): BigInt =
-      spark.table(tableName).queryExecution.analyzed.stats.sizeInBytes
-
     val sourceTableName = "analyzeTable_part"
     val tableName = "analyzeTable_part_vis"
     withTable(sourceTableName, tableName) {
@@ -188,39 +168,19 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
           // Register only one of the partitions found on disk
           val ds = partitionDates.head
-          sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')").collect()
+          sql(s"ALTER TABLE $tableName ADD PARTITION (ds='$ds')")
 
           // Analyze original table - expect 3 partitions
           sql(s"ANALYZE TABLE $sourceTableName COMPUTE STATISTICS noscan")
-          assert(queryTotalSize(sourceTableName) === BigInt(3 * 5812))
+          assert(getCatalogStatistics(sourceTableName).sizeInBytes === BigInt(3 * 5812))
 
           // Analyze partial-copy table - expect only 1 partition
           sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS noscan")
-          assert(queryTotalSize(tableName) === BigInt(5812))
+          assert(getCatalogStatistics(tableName).sizeInBytes === BigInt(5812))
         }
     }
   }
 
-  test("analyzing views is not supported") {
-    def assertAnalyzeUnsupported(analyzeCommand: String): Unit = {
-      val err = intercept[AnalysisException] {
-        sql(analyzeCommand)
-      }
-      assert(err.message.contains("ANALYZE TABLE is not supported"))
-    }
-
-    val tableName = "tbl"
-    withTable(tableName) {
-      spark.range(10).write.saveAsTable(tableName)
-      val viewName = "view"
-      withView(viewName) {
-        sql(s"CREATE VIEW $viewName AS SELECT * FROM $tableName")
-        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS")
-        assertAnalyzeUnsupported(s"ANALYZE TABLE $viewName COMPUTE STATISTICS FOR COLUMNS id")
-      }
-    }
-  }
-
   test("test table-level statistics for hive tables created in HiveExternalCatalog") {
     val textTable = "textTable"
     withTable(textTable) {
@@ -290,8 +250,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     if (analyzedByHive) hiveClient.runSqlHive(s"ANALYZE TABLE $tabName COMPUTE STATISTICS")
     val describeResult1 = hiveClient.runSqlHive(s"DESCRIBE FORMATTED $tabName")
 
-    val tableMetadata =
-      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tabName)).properties
+    val tableMetadata = getCatalogTable(tabName).properties
     // statistics info is not contained in the metadata of the original table
     assert(Seq(StatsSetupConst.COLUMN_STATS_ACCURATE,
       StatsSetupConst.NUM_FILES,
@@ -327,8 +286,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     val tabName = "tab1"
     withTable(tabName) {
       createNonPartitionedTable(tabName, analyzedByHive = false, analyzedBySpark = false)
-      checkTableStats(
-        tabName, hasSizeInBytes = true, expectedRowCounts = None)
+      checkTableStats(tabName, hasSizeInBytes = true, expectedRowCounts = None)
 
       // ALTER TABLE SET TBLPROPERTIES invalidates some contents of Hive specific statistics
       // This is triggered by the Hive alterTable API
@@ -370,10 +328,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   }
 
   test("alter table should not have the side effect to store statistics in Spark side") {
-    def getCatalogTable(tableName: String): CatalogTable = {
-      spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
-    }
-
     val table = "alter_table_side_effect"
     withTable(table) {
       sql(s"CREATE TABLE $table (i string, j string)")
@@ -637,12 +591,12 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
 
       // the default value for `spark.sql.hive.convertMetastoreParquet` is true, here we just set it
       // for robustness
-      withSQLConf("spark.sql.hive.convertMetastoreParquet" -> "true") {
+      withSQLConf(HiveUtils.CONVERT_METASTORE_PARQUET.key -> "true") {
         checkTableStats(parquetTable, hasSizeInBytes = false, expectedRowCounts = None)
         sql(s"ANALYZE TABLE $parquetTable COMPUTE STATISTICS")
         checkTableStats(parquetTable, hasSizeInBytes = true, expectedRowCounts = Some(500))
       }
-      withSQLConf("spark.sql.hive.convertMetastoreOrc" -> "true") {
+      withSQLConf(HiveUtils.CONVERT_METASTORE_ORC.key -> "true") {
         // We still can get tableSize from Hive before Analyze
         checkTableStats(orcTable, hasSizeInBytes = true, expectedRowCounts = None)
         sql(s"ANALYZE TABLE $orcTable COMPUTE STATISTICS")
@@ -759,8 +713,7 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val parquetTable = "parquetTable"
       withTable(parquetTable) {
         sql(createTableCmd)
-        val catalogTable = spark.sessionState.catalog.getTableMetadata(
-          TableIdentifier(parquetTable))
+        val catalogTable = getCatalogTable(parquetTable)
         assert(DDLUtils.isDatasourceTable(catalogTable))
 
         // Add a filter to avoid creating too many partitions
@@ -795,17 +748,6 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     "partitioned data source table",
     "CREATE TABLE parquetTable (key STRING, value STRING) USING PARQUET PARTITIONED BY (key)")
 
-  test("statistics collection of a table with zero column") {
-    val table_no_cols = "table_no_cols"
-    withTable(table_no_cols) {
-      val rddNoCols = sparkContext.parallelize(1 to 10).map(_ => Row.empty)
-      val dfNoCols = spark.createDataFrame(rddNoCols, StructType(Seq.empty))
-      dfNoCols.write.format("json").saveAsTable(table_no_cols)
-      sql(s"ANALYZE TABLE $table_no_cols COMPUTE STATISTICS")
-      checkTableStats(table_no_cols, hasSizeInBytes = true, expectedRowCounts = Some(10))
-    }
-  }
-
   /** Used to test refreshing cached metadata once table stats are updated. */
   private def getStatsBeforeAfterUpdate(isAnalyzeColumns: Boolean)
     : (CatalogStatistics, CatalogStatistics) = {

