You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/27 07:37:36 UTC
[1/3] spark git commit: [SPARK-23445] ColumnStat refactoring
Repository: spark
Updated Branches:
refs/heads/master 7ec83658f -> 8077bb04f
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1ee1d57..28c340a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -663,14 +663,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
requireTableExists(db, table)
val rawTable = getRawTable(db, table)
- // For datasource tables and hive serde tables created by spark 2.1 or higher,
- // the data schema is stored in the table properties.
- val schema = restoreTableMetadata(rawTable).schema
-
// convert table statistics to properties so that we can persist them through hive client
val statsProperties =
if (stats.isDefined) {
- statsToProperties(stats.get, schema)
+ statsToProperties(stats.get)
} else {
new mutable.HashMap[String, String]()
}
@@ -1028,9 +1024,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
currentFullPath
}
- private def statsToProperties(
- stats: CatalogStatistics,
- schema: StructType): Map[String, String] = {
+ private def statsToProperties(stats: CatalogStatistics): Map[String, String] = {
val statsProperties = new mutable.HashMap[String, String]()
statsProperties += STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()
@@ -1038,11 +1032,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
}
- val colNameTypeMap: Map[String, DataType] =
- schema.fields.map(f => (f.name, f.dataType)).toMap
stats.colStats.foreach { case (colName, colStat) =>
- colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
- statsProperties += (columnStatKeyPropName(colName, k) -> v)
+ colStat.toMap(colName).foreach { case (k, v) =>
+ // Fully qualified name used in table properties for a particular column stat.
+ // For example, for column "mycol", and "min" stat, this should return
+ // "spark.sql.statistics.colStats.mycol.min".
+ statsProperties += (STATISTICS_COL_STATS_PREFIX + k -> v)
}
}
@@ -1058,23 +1053,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
if (statsProps.isEmpty) {
None
} else {
+ val colStats = new mutable.HashMap[String, CatalogColumnStat]
+ val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+ case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
+ }
- val colStats = new mutable.HashMap[String, ColumnStat]
-
- // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
- // but given the number of columns it usually not enormous, this is probably OK as a start.
- // If we want to map this a linear operation, we'd need a stronger contract between the
- // naming convention used for serialization.
- schema.foreach { field =>
- if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
- // If "version" field is defined, then the column stat is defined.
- val keyPrefix = columnStatKeyPropName(field.name, "")
- val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
- (k.drop(keyPrefix.length), v)
- }
- ColumnStat.fromMap(table, field, colStatMap).foreach { cs =>
- colStats += field.name -> cs
- }
+ // Find all the column names by matching the KEY_VERSION properties for them.
+ colStatsProps.keys.filter {
+ k => k.endsWith(CatalogColumnStat.KEY_VERSION)
+ }.map { k =>
+ k.dropRight(CatalogColumnStat.KEY_VERSION.length + 1)
+ }.foreach { fieldName =>
+ // and for each, create a column stat.
+ CatalogColumnStat.fromMap(table, fieldName, colStatsProps).foreach { cs =>
+ colStats += fieldName -> cs
}
}
@@ -1093,14 +1085,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
val rawTable = getRawTable(db, table)
- // For datasource tables and hive serde tables created by spark 2.1 or higher,
- // the data schema is stored in the table properties.
- val schema = restoreTableMetadata(rawTable).schema
-
// convert partition statistics to properties so that we can persist them through hive api
val withStatsProps = lowerCasedParts.map { p =>
if (p.stats.isDefined) {
- val statsProperties = statsToProperties(p.stats.get, schema)
+ val statsProperties = statsToProperties(p.stats.get)
p.copy(parameters = p.parameters ++ statsProperties)
} else {
p
@@ -1310,15 +1298,6 @@ object HiveExternalCatalog {
val EMPTY_DATA_SCHEMA = new StructType()
.add("col", "array<string>", nullable = true, comment = "from deserializer")
- /**
- * Returns the fully qualified name used in table properties for a particular column stat.
- * For example, for column "mycol", and "min" stat, this should return
- * "spark.sql.statistics.colStats.mycol.min".
- */
- private def columnStatKeyPropName(columnName: String, statKey: String): String = {
- STATISTICS_COL_STATS_PREFIX + columnName + "." + statKey
- }
-
// A persisted data source table always store its schema in the catalog.
private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
val errorMessage = "Could not read schema from the hive metastore because it is corrupted."
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 3af8af0..61cec82 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
import org.apache.spark.sql._
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, HiveTableRelation}
import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer}
import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
import org.apache.spark.sql.execution.command.DDLUtils
@@ -177,8 +177,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val fetchedStats0 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
assert(fetchedStats0.get.colStats == Map(
- "a" -> ColumnStat(2, Some(1), Some(2), 0, 4, 4),
- "b" -> ColumnStat(1, Some(1), Some(1), 0, 4, 4)))
+ "a" -> CatalogColumnStat(Some(2), Some("1"), Some("2"), Some(0), Some(4), Some(4)),
+ "b" -> CatalogColumnStat(Some(1), Some("1"), Some("1"), Some(0), Some(4), Some(4))))
}
}
@@ -208,8 +208,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val fetchedStats1 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
assert(fetchedStats1.colStats == Map(
- "C1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
- avgLen = 4, maxLen = 4)))
+ "C1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))))
}
}
@@ -596,7 +596,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
val fetchedStats0 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
- assert(fetchedStats0.get.colStats == Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))
+ assert(fetchedStats0.get.colStats ==
+ Map("c1" -> CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))))
// Insert new data and analyze: have the latest column stats.
sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
@@ -604,18 +605,18 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val fetchedStats1 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
assert(fetchedStats1.colStats == Map(
- "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
- avgLen = 4, maxLen = 4)))
+ "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))))
// Analyze another column: since the table is not changed, the precious column stats are kept.
sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
val fetchedStats2 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
assert(fetchedStats2.colStats == Map(
- "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
- avgLen = 4, maxLen = 4),
- "c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0,
- avgLen = 1, maxLen = 1)))
+ "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ "c2" -> CatalogColumnStat(distinctCount = Some(1), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))))
// Insert new data and analyze: stale column stats are removed and newly collected column
// stats are added.
@@ -624,10 +625,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
val fetchedStats3 =
checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get
assert(fetchedStats3.colStats == Map(
- "c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4),
- "c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0,
- avgLen = 8, maxLen = 8)))
+ "c1" -> CatalogColumnStat(distinctCount = Some(2), min = Some("1"), max = Some("2"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ "c3" -> CatalogColumnStat(distinctCount = Some(2), min = Some("10.0"), max = Some("20.0"),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))))
}
}
@@ -999,115 +1000,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
test("verify serialized column stats after analyzing columns") {
import testImplicits._
- val tableName = "column_stats_test2"
+ val tableName = "column_stats_test_ser"
// (data.head.productArity - 1) because the last column does not support stats collection.
assert(stats.size == data.head.productArity - 1)
val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
- val expectedSerializedColStats = Map(
- "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
- "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
- "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
- "spark.sql.statistics.colStats.cbinary.version" -> "1",
- "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
- "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cbool.max" -> "true",
- "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
- "spark.sql.statistics.colStats.cbool.min" -> "false",
- "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
- "spark.sql.statistics.colStats.cbool.version" -> "1",
- "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
- "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cbyte.max" -> "2",
- "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
- "spark.sql.statistics.colStats.cbyte.min" -> "1",
- "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
- "spark.sql.statistics.colStats.cbyte.version" -> "1",
- "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
- "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
- "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
- "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
- "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
- "spark.sql.statistics.colStats.cdate.version" -> "1",
- "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
- "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
- "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
- "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
- "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
- "spark.sql.statistics.colStats.cdecimal.version" -> "1",
- "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
- "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
- "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
- "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
- "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
- "spark.sql.statistics.colStats.cdouble.version" -> "1",
- "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
- "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
- "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
- "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
- "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
- "spark.sql.statistics.colStats.cfloat.version" -> "1",
- "spark.sql.statistics.colStats.cint.avgLen" -> "4",
- "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cint.max" -> "4",
- "spark.sql.statistics.colStats.cint.maxLen" -> "4",
- "spark.sql.statistics.colStats.cint.min" -> "1",
- "spark.sql.statistics.colStats.cint.nullCount" -> "1",
- "spark.sql.statistics.colStats.cint.version" -> "1",
- "spark.sql.statistics.colStats.clong.avgLen" -> "8",
- "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
- "spark.sql.statistics.colStats.clong.max" -> "5",
- "spark.sql.statistics.colStats.clong.maxLen" -> "8",
- "spark.sql.statistics.colStats.clong.min" -> "1",
- "spark.sql.statistics.colStats.clong.nullCount" -> "1",
- "spark.sql.statistics.colStats.clong.version" -> "1",
- "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
- "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cshort.max" -> "3",
- "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
- "spark.sql.statistics.colStats.cshort.min" -> "1",
- "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
- "spark.sql.statistics.colStats.cshort.version" -> "1",
- "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
- "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
- "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
- "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
- "spark.sql.statistics.colStats.cstring.version" -> "1",
- "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
- "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
- "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
- "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
- "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
- "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
- "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
- )
-
- val expectedSerializedHistograms = Map(
- "spark.sql.statistics.colStats.cbyte.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cbyte").histogram.get),
- "spark.sql.statistics.colStats.cshort.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cshort").histogram.get),
- "spark.sql.statistics.colStats.cint.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cint").histogram.get),
- "spark.sql.statistics.colStats.clong.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("clong").histogram.get),
- "spark.sql.statistics.colStats.cdouble.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cdouble").histogram.get),
- "spark.sql.statistics.colStats.cfloat.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cfloat").histogram.get),
- "spark.sql.statistics.colStats.cdecimal.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cdecimal").histogram.get),
- "spark.sql.statistics.colStats.cdate.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
- "spark.sql.statistics.colStats.ctimestamp.histogram" ->
- HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
- )
-
def checkColStatsProps(expected: Map[String, String]): Unit = {
sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
val table = hiveClient.getTable("default", tableName)
@@ -1129,6 +1026,29 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
}
}
+ test("verify column stats can be deserialized from tblproperties") {
+ import testImplicits._
+
+ val tableName = "column_stats_test_de"
+ // (data.head.productArity - 1) because the last column does not support stats collection.
+ assert(stats.size == data.head.productArity - 1)
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+ withTable(tableName) {
+ df.write.saveAsTable(tableName)
+
+ // Put in stats properties manually.
+ val table = getCatalogTable(tableName)
+ val newTable = table.copy(
+ properties = table.properties ++
+ expectedSerializedColStats ++ expectedSerializedHistograms +
+ ("spark.sql.statistics.totalSize" -> "1") /* totalSize always required */)
+ hiveClient.alterTable(newTable)
+
+ validateColStats(tableName, statsWithHgms)
+ }
+ }
+
test("serialization and deserialization of histograms to/from hive metastore") {
import testImplicits._
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[3/3] spark git commit: [SPARK-23445] ColumnStat refactoring
Posted by li...@apache.org.
[SPARK-23445] ColumnStat refactoring
## What changes were proposed in this pull request?
Refactor ColumnStat to be more flexible.
* Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, making it not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate.
The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans.
## How was this patch tested?
Refactored existing tests to work with refactored `ColumnStat` and `CatalogColumnStat`.
New tests added in `StatisticsSuite` checking that backwards / forwards compatibility is not broken.
Author: Juliusz Sompolski <ju...@databricks.com>
Closes #20624 from juliuszsompolski/SPARK-23445.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8077bb04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8077bb04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8077bb04
Branch: refs/heads/master
Commit: 8077bb04f350fd35df83ef896135c0672dc3f7b0
Parents: 7ec8365
Author: Juliusz Sompolski <ju...@databricks.com>
Authored: Mon Feb 26 23:37:31 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Feb 26 23:37:31 2018 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/catalog/interface.scala | 146 +++++++++-
.../optimizer/StarSchemaDetection.scala | 6 +-
.../sql/catalyst/plans/logical/Statistics.scala | 256 ++---------------
.../statsEstimation/AggregateEstimation.scala | 6 +-
.../statsEstimation/EstimationUtils.scala | 20 +-
.../statsEstimation/FilterEstimation.scala | 98 ++++---
.../statsEstimation/JoinEstimation.scala | 55 ++--
.../catalyst/optimizer/JoinReorderSuite.scala | 25 +-
.../StarJoinCostBasedReorderSuite.scala | 96 +++----
.../optimizer/StarJoinReorderSuite.scala | 77 ++---
.../AggregateEstimationSuite.scala | 24 +-
.../BasicStatsEstimationSuite.scala | 12 +-
.../statsEstimation/FilterEstimationSuite.scala | 279 ++++++++++---------
.../statsEstimation/JoinEstimationSuite.scala | 138 +++++----
.../ProjectEstimationSuite.scala | 70 +++--
.../StatsEstimationTestBase.scala | 10 +-
.../command/AnalyzeColumnCommand.scala | 138 ++++++++-
.../spark/sql/execution/command/tables.scala | 9 +-
.../spark/sql/StatisticsCollectionSuite.scala | 9 +-
.../sql/StatisticsCollectionTestBase.scala | 168 +++++++++--
.../spark/sql/hive/HiveExternalCatalog.scala | 63 ++---
.../apache/spark/sql/hive/StatisticsSuite.scala | 162 +++--------
22 files changed, 995 insertions(+), 872 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 95b6fbb..f3e67dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,9 @@ import java.net.URI
import java.util.Date
import scala.collection.mutable
+import scala.util.control.NonFatal
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
/**
@@ -361,7 +363,7 @@ object CatalogTable {
case class CatalogStatistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
- colStats: Map[String, ColumnStat] = Map.empty) {
+ colStats: Map[String, CatalogColumnStat] = Map.empty) {
/**
* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
@@ -369,7 +371,8 @@ case class CatalogStatistics(
*/
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
if (cboEnabled && rowCount.isDefined) {
- val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+ val attrStats = AttributeMap(planOutput
+ .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
// Estimate size as number of rows * row size.
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
@@ -387,6 +390,143 @@ case class CatalogStatistics(
}
}
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+ distinctCount: Option[BigInt] = None,
+ min: Option[String] = None,
+ max: Option[String] = None,
+ nullCount: Option[BigInt] = None,
+ avgLen: Option[Long] = None,
+ maxLen: Option[Long] = None,
+ histogram: Option[Histogram] = None) {
+
+ /**
+ * Returns a map from string to string that can be used to serialize the column stats.
+ * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+ * and the value is the string representation for the value.
+ * min/max values are stored as Strings. They can be deserialized using
+ * [[CatalogColumnStat.fromExternalString]].
+ *
+ * As part of the protocol, the returned map always contains a key called "version".
+ * Any of the fields that are null (None) won't appear in the map.
+ */
+ def toMap(colName: String): Map[String, String] = {
+ val map = new scala.collection.mutable.HashMap[String, String]
+ map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+ distinctCount.foreach { v =>
+ map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+ }
+ nullCount.foreach { v =>
+ map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+ }
+ avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+ maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+ min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+ max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+ histogram.foreach { h =>
+ map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+ }
+ map.toMap
+ }
+
+ /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+ def toPlanStat(
+ colName: String,
+ dataType: DataType): ColumnStat =
+ ColumnStat(
+ distinctCount = distinctCount,
+ min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+ max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+ nullCount = nullCount,
+ avgLen = avgLen,
+ maxLen = maxLen,
+ histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+ // List of string keys used to serialize CatalogColumnStat
+ val KEY_VERSION = "version"
+ private val KEY_DISTINCT_COUNT = "distinctCount"
+ private val KEY_MIN_VALUE = "min"
+ private val KEY_MAX_VALUE = "max"
+ private val KEY_NULL_COUNT = "nullCount"
+ private val KEY_AVG_LEN = "avgLen"
+ private val KEY_MAX_LEN = "maxLen"
+ private val KEY_HISTOGRAM = "histogram"
+
+ /**
+ * Converts from string representation of data type to the corresponding Catalyst data type.
+ */
+ def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+ dataType match {
+ case BooleanType => s.toBoolean
+ case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+ case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+ case ByteType => s.toByte
+ case ShortType => s.toShort
+ case IntegerType => s.toInt
+ case LongType => s.toLong
+ case FloatType => s.toFloat
+ case DoubleType => s.toDouble
+ case _: DecimalType => Decimal(s)
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case BinaryType | StringType => null
+ case _ =>
+ throw new AnalysisException("Column statistics deserialization is not supported for " +
+ s"column $name of data type: $dataType.")
+ }
+ }
+
+ /**
+ * Converts the given value from Catalyst data type to string representation of external
+ * data type.
+ */
+ def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+ val externalValue = dataType match {
+ case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+ case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+ case BooleanType | _: IntegralType | FloatType | DoubleType => v
+ case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case _ =>
+ throw new AnalysisException("Column statistics serialization is not supported for " +
+ s"column $colName of data type: $dataType.")
+ }
+ externalValue.toString
+ }
+
+
+ /**
+ * Creates a [[CatalogColumnStat]] object from the given map.
+ * This is used to deserialize column stats from some external storage.
+ * The serialization side is defined in [[CatalogColumnStat.toMap]].
+ */
+ def fromMap(
+ table: String,
+ colName: String,
+ map: Map[String, String]): Option[CatalogColumnStat] = {
+
+ try {
+ Some(CatalogColumnStat(
+ distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
+ min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
+ max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
+ nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
+ avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
+ maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
+ histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+ ))
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
+ None
+ }
+ }
+}
+
case class CatalogTableType private(name: String)
object CatalogTableType {
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 1f20b76..2aa762e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
stats.rowCount match {
case Some(rowCount) if rowCount >= 0 =>
if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
- val colStats = stats.attributeStats.get(col)
- if (colStats.get.nullCount > 0) {
+ val colStats = stats.attributeStats.get(col).get
+ if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
false
} else {
- val distinctCount = colStats.get.distinctCount
+ val distinctCount = colStats.distinctCount.get
val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
// ndvMaxErr adjusted based on TPCDS 1TB data results
relDiff <= conf.ndvMaxError * 2
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 96b199d..b3a4886 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -27,6 +27,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils}
@@ -79,11 +80,10 @@ case class Statistics(
/**
* Statistics collected for a column.
*
- * 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ * 1. The JVM data type stored in min/max is the internal data type for the corresponding
* Catalyst data type. For example, the internal type of DateType is Int, and that the internal
* type of TimestampType is Long.
- * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* (sketches) might have been used, and the data collected can also be stale.
*
* @param distinctCount number of distinct values
@@ -95,240 +95,32 @@ case class Statistics(
* @param histogram histogram of the values
*/
case class ColumnStat(
- distinctCount: BigInt,
- min: Option[Any],
- max: Option[Any],
- nullCount: BigInt,
- avgLen: Long,
- maxLen: Long,
+ distinctCount: Option[BigInt] = None,
+ min: Option[Any] = None,
+ max: Option[Any] = None,
+ nullCount: Option[BigInt] = None,
+ avgLen: Option[Long] = None,
+ maxLen: Option[Long] = None,
histogram: Option[Histogram] = None) {
- // We currently don't store min/max for binary/string type. This can change in the future and
- // then we need to remove this require.
- require(min.isEmpty || (!min.get.isInstanceOf[Array[Byte]] && !min.get.isInstanceOf[String]))
- require(max.isEmpty || (!max.get.isInstanceOf[Array[Byte]] && !max.get.isInstanceOf[String]))
-
- /**
- * Returns a map from string to string that can be used to serialize the column stats.
- * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
- * representation for the value. min/max values are converted to the external data type. For
- * example, for DateType we store java.sql.Date, and for TimestampType we store
- * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
- *
- * As part of the protocol, the returned map always contains a key called "version".
- * In the case min/max values are null (None), they won't appear in the map.
- */
- def toMap(colName: String, dataType: DataType): Map[String, String] = {
- val map = new scala.collection.mutable.HashMap[String, String]
- map.put(ColumnStat.KEY_VERSION, "1")
- map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
- map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
- map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
- map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
- min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
- max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
- histogram.foreach { h => map.put(ColumnStat.KEY_HISTOGRAM, HistogramSerializer.serialize(h)) }
- map.toMap
- }
-
- /**
- * Converts the given value from Catalyst data type to string representation of external
- * data type.
- */
- private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
- val externalValue = dataType match {
- case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
- case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
- case BooleanType | _: IntegralType | FloatType | DoubleType => v
- case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
- // This version of Spark does not use min/max for binary/string types so we ignore it.
- case _ =>
- throw new AnalysisException("Column statistics deserialization is not supported for " +
- s"column $colName of data type: $dataType.")
- }
- externalValue.toString
- }
-
-}
+ // Are distinctCount and nullCount statistics defined?
+ val hasCountStats = distinctCount.isDefined && nullCount.isDefined
+ // Are min and max statistics defined?
+ val hasMinMaxStats = min.isDefined && max.isDefined
-object ColumnStat extends Logging {
-
- // List of string keys used to serialize ColumnStat
- val KEY_VERSION = "version"
- private val KEY_DISTINCT_COUNT = "distinctCount"
- private val KEY_MIN_VALUE = "min"
- private val KEY_MAX_VALUE = "max"
- private val KEY_NULL_COUNT = "nullCount"
- private val KEY_AVG_LEN = "avgLen"
- private val KEY_MAX_LEN = "maxLen"
- private val KEY_HISTOGRAM = "histogram"
-
- /** Returns true iff the we support gathering column statistics on column of the given type. */
- def supportsType(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case BooleanType => true
- case DateType => true
- case TimestampType => true
- case BinaryType | StringType => true
- case _ => false
- }
-
- /** Returns true iff the we support gathering histogram on column of the given type. */
- def supportsHistogram(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case DateType => true
- case TimestampType => true
- case _ => false
- }
-
- /**
- * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
- * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
- */
- def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
- try {
- Some(ColumnStat(
- distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
- // Note that flatMap(Option.apply) turns Option(null) into None.
- min = map.get(KEY_MIN_VALUE)
- .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
- max = map.get(KEY_MAX_VALUE)
- .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
- nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
- avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
- maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong,
- histogram = map.get(KEY_HISTOGRAM).map(HistogramSerializer.deserialize)
- ))
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to parse column statistics for column ${field.name} in table $table", e)
- None
- }
- }
-
- /**
- * Converts from string representation of external data type to the corresponding Catalyst data
- * type.
- */
- private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
- dataType match {
- case BooleanType => s.toBoolean
- case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
- case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
- case ByteType => s.toByte
- case ShortType => s.toShort
- case IntegerType => s.toInt
- case LongType => s.toLong
- case FloatType => s.toFloat
- case DoubleType => s.toDouble
- case _: DecimalType => Decimal(s)
- // This version of Spark does not use min/max for binary/string types so we ignore it.
- case BinaryType | StringType => null
- case _ =>
- throw new AnalysisException("Column statistics deserialization is not supported for " +
- s"column $name of data type: $dataType.")
- }
- }
-
- /**
- * Constructs an expression to compute column statistics for a given column.
- *
- * The expression should create a single struct column with the following schema:
- * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
- * distinctCountsForIntervals: Array[Long]
- *
- * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
- * as a result should stay in sync with it.
- */
- def statExprs(
- col: Attribute,
- conf: SQLConf,
- colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
- def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
- expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
- })
- val one = Literal(1, LongType)
-
- // the approximate ndv (num distinct value) should never be larger than the number of rows
- val numNonNulls = if (col.nullable) Count(col) else Count(one)
- val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
- val numNulls = Subtract(Count(one), numNonNulls)
- val defaultSize = Literal(col.dataType.defaultSize, LongType)
- val nullArray = Literal(null, ArrayType(LongType))
-
- def fixedLenTypeStruct: CreateNamedStruct = {
- val genHistogram =
- ColumnStat.supportsHistogram(col.dataType) && colPercentiles.contains(col)
- val intervalNdvsExpr = if (genHistogram) {
- ApproxCountDistinctForIntervals(col,
- Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
- } else {
- nullArray
- }
- // For fixed width types, avg size should be the same as max size.
- struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
- defaultSize, defaultSize, intervalNdvsExpr)
- }
-
- col.dataType match {
- case _: IntegralType => fixedLenTypeStruct
- case _: DecimalType => fixedLenTypeStruct
- case DoubleType | FloatType => fixedLenTypeStruct
- case BooleanType => fixedLenTypeStruct
- case DateType => fixedLenTypeStruct
- case TimestampType => fixedLenTypeStruct
- case BinaryType | StringType =>
- // For string and binary type, we don't compute min, max or histogram
- val nullLit = Literal(null, col.dataType)
- struct(
- ndv, nullLit, nullLit, numNulls,
- // Set avg/max size to default size if all the values are null or there is no value.
- Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
- Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
- nullArray)
- case _ =>
- throw new AnalysisException("Analyzing column statistics is not supported for column " +
- s"${col.name} of data type: ${col.dataType}.")
- }
- }
-
- /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
- def rowToColumnStat(
- row: InternalRow,
- attr: Attribute,
- rowCount: Long,
- percentiles: Option[ArrayData]): ColumnStat = {
- // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
- val cs = ColumnStat(
- distinctCount = BigInt(row.getLong(0)),
- // for string/binary min/max, get should return null
- min = Option(row.get(1, attr.dataType)),
- max = Option(row.get(2, attr.dataType)),
- nullCount = BigInt(row.getLong(3)),
- avgLen = row.getLong(4),
- maxLen = row.getLong(5)
- )
- if (row.isNullAt(6)) {
- cs
- } else {
- val ndvs = row.getArray(6).toLongArray()
- assert(percentiles.get.numElements() == ndvs.length + 1)
- val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
- // Construct equi-height histogram
- val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
- HistogramBin(endpoints(i), endpoints(i + 1), ndv)
- }
- val nonNullRows = rowCount - cs.nullCount
- val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
- cs.copy(histogram = Some(histogram))
- }
- }
+ // Are avgLen and maxLen statistics defined?
+ val hasLenStats = avgLen.isDefined && maxLen.isDefined
+ def toCatalogColumnStat(colName: String, dataType: DataType): CatalogColumnStat =
+ CatalogColumnStat(
+ distinctCount = distinctCount,
+ min = min.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+ max = max.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+ nullCount = nullCount,
+ avgLen = avgLen,
+ maxLen = maxLen,
+ histogram = histogram)
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index c41fac4..111c594 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -32,13 +32,15 @@ object AggregateEstimation {
val childStats = agg.child.stats
// Check if we have column stats for all group-by columns.
val colStatsExist = agg.groupingExpressions.forall { e =>
- e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
+ e.isInstanceOf[Attribute] &&
+ childStats.attributeStats.get(e.asInstanceOf[Attribute]).exists(_.hasCountStats)
}
if (rowCountsExist(agg.child) && colStatsExist) {
// Multiply distinct counts of group-by columns. This is an upper bound, which assumes
// the data contains all combinations of distinct values of group-by columns.
var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
- (res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount)
+ (res, expr) => res *
+ childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
outputRows = if (agg.groupingExpressions.isEmpty) {
// If there's no group-by columns, the output is a single row containing values of aggregate
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index d793f77..0f147f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.math.BigDecimal.RoundingMode
@@ -38,9 +39,18 @@ object EstimationUtils {
}
}
+ /** Check if each attribute has column stat containing distinct and null counts
+ * in the corresponding statistic. */
+ def columnStatsWithCountsExist(statsAndAttr: (Statistics, Attribute)*): Boolean = {
+ statsAndAttr.forall { case (stats, attr) =>
+ stats.attributeStats.get(attr).map(_.hasCountStats).getOrElse(false)
+ }
+ }
+
+ /** Statistics for a Column containing only NULLs. */
def nullColumnStat(dataType: DataType, rowCount: BigInt): ColumnStat = {
- ColumnStat(distinctCount = 0, min = None, max = None, nullCount = rowCount,
- avgLen = dataType.defaultSize, maxLen = dataType.defaultSize)
+ ColumnStat(distinctCount = Some(0), min = None, max = None, nullCount = Some(rowCount),
+ avgLen = Some(dataType.defaultSize), maxLen = Some(dataType.defaultSize))
}
/**
@@ -70,13 +80,13 @@ object EstimationUtils {
// We assign a generic overhead for a Row object, the actual overhead is different for different
// Row format.
val sizePerRow = 8 + attributes.map { attr =>
- if (attrStats.contains(attr)) {
+ if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {
attr.dataType match {
case StringType =>
// UTF8String: base + offset + numBytes
- attrStats(attr).avgLen + 8 + 4
+ attrStats(attr).avgLen.get + 8 + 4
case _ =>
- attrStats(attr).avgLen
+ attrStats(attr).avgLen.get
}
} else {
attr.dataType.defaultSize
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 4cc32de..0538c9d 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -225,7 +225,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
attr: Attribute,
isNull: Boolean,
update: Boolean): Option[Double] = {
- if (!colStatsMap.contains(attr)) {
+ if (!colStatsMap.contains(attr) || !colStatsMap(attr).hasCountStats) {
logDebug("[CBO] No statistics for " + attr)
return None
}
@@ -234,14 +234,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
val nullPercent: Double = if (rowCountValue == 0) {
0
} else {
- (BigDecimal(colStat.nullCount) / BigDecimal(rowCountValue)).toDouble
+ (BigDecimal(colStat.nullCount.get) / BigDecimal(rowCountValue)).toDouble
}
if (update) {
val newStats = if (isNull) {
- colStat.copy(distinctCount = 0, min = None, max = None)
+ colStat.copy(distinctCount = Some(0), min = None, max = None)
} else {
- colStat.copy(nullCount = 0)
+ colStat.copy(nullCount = Some(0))
}
colStatsMap.update(attr, newStats)
}
@@ -322,17 +322,21 @@ case class FilterEstimation(plan: Filter) extends Logging {
// value.
val newStats = attr.dataType match {
case StringType | BinaryType =>
- colStat.copy(distinctCount = 1, nullCount = 0)
+ colStat.copy(distinctCount = Some(1), nullCount = Some(0))
case _ =>
- colStat.copy(distinctCount = 1, min = Some(literal.value),
- max = Some(literal.value), nullCount = 0)
+ colStat.copy(distinctCount = Some(1), min = Some(literal.value),
+ max = Some(literal.value), nullCount = Some(0))
}
colStatsMap.update(attr, newStats)
}
if (colStat.histogram.isEmpty) {
- // returns 1/ndv if there is no histogram
- Some(1.0 / colStat.distinctCount.toDouble)
+ if (!colStat.distinctCount.isEmpty) {
+ // returns 1/ndv if there is no histogram
+ Some(1.0 / colStat.distinctCount.get.toDouble)
+ } else {
+ None
+ }
} else {
Some(computeEqualityPossibilityByHistogram(literal, colStat))
}
@@ -378,13 +382,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
attr: Attribute,
hSet: Set[Any],
update: Boolean): Option[Double] = {
- if (!colStatsMap.contains(attr)) {
+ if (!colStatsMap.hasDistinctCount(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
}
val colStat = colStatsMap(attr)
- val ndv = colStat.distinctCount
+ val ndv = colStat.distinctCount.get
val dataType = attr.dataType
var newNdv = ndv
@@ -407,8 +411,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
// 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
newNdv = ndv.min(BigInt(validQuerySet.size))
if (update) {
- val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
- max = Some(newMax), nullCount = 0)
+ val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+ max = Some(newMax), nullCount = Some(0))
colStatsMap.update(attr, newStats)
}
@@ -416,7 +420,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
case StringType | BinaryType =>
newNdv = ndv.min(BigInt(hSet.size))
if (update) {
- val newStats = colStat.copy(distinctCount = newNdv, nullCount = 0)
+ val newStats = colStat.copy(distinctCount = Some(newNdv), nullCount = Some(0))
colStatsMap.update(attr, newStats)
}
}
@@ -443,12 +447,17 @@ case class FilterEstimation(plan: Filter) extends Logging {
literal: Literal,
update: Boolean): Option[Double] = {
+ if (!colStatsMap.hasMinMaxStats(attr) || !colStatsMap.hasDistinctCount(attr)) {
+ logDebug("[CBO] No statistics for " + attr)
+ return None
+ }
+
val colStat = colStatsMap(attr)
val statsInterval =
ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
val max = statsInterval.max
val min = statsInterval.min
- val ndv = colStat.distinctCount.toDouble
+ val ndv = colStat.distinctCount.get.toDouble
// determine the overlapping degree between predicate interval and column's interval
val numericLiteral = EstimationUtils.toDouble(literal.value, literal.dataType)
@@ -520,8 +529,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
newMax = newValue
}
- val newStats = colStat.copy(distinctCount = ceil(ndv * percent),
- min = newMin, max = newMax, nullCount = 0)
+ val newStats = colStat.copy(distinctCount = Some(ceil(ndv * percent)),
+ min = newMin, max = newMax, nullCount = Some(0))
colStatsMap.update(attr, newStats)
}
@@ -637,11 +646,11 @@ case class FilterEstimation(plan: Filter) extends Logging {
attrRight: Attribute,
update: Boolean): Option[Double] = {
- if (!colStatsMap.contains(attrLeft)) {
+ if (!colStatsMap.hasCountStats(attrLeft)) {
logDebug("[CBO] No statistics for " + attrLeft)
return None
}
- if (!colStatsMap.contains(attrRight)) {
+ if (!colStatsMap.hasCountStats(attrRight)) {
logDebug("[CBO] No statistics for " + attrRight)
return None
}
@@ -668,7 +677,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
val minRight = statsIntervalRight.min
// determine the overlapping degree between predicate interval and column's interval
- val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
+ val allNotNull = (colStatLeft.nullCount.get == 0) && (colStatRight.nullCount.get == 0)
val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
// Left < Right or Left <= Right
// - no overlap:
@@ -707,14 +716,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
case _: EqualTo =>
((maxLeft < minRight) || (maxRight < minLeft),
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
- && (colStatLeft.distinctCount == colStatRight.distinctCount)
+ && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
)
case _: EqualNullSafe =>
// For null-safe equality, we use a very restrictive condition to evaluate its overlap.
// If null values exists, we set it to partial overlap.
(((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull,
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
- && (colStatLeft.distinctCount == colStatRight.distinctCount)
+ && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
)
}
@@ -731,9 +740,9 @@ case class FilterEstimation(plan: Filter) extends Logging {
if (update) {
// Need to adjust new min/max after the filter condition is applied
- val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+ val ndvLeft = BigDecimal(colStatLeft.distinctCount.get)
val newNdvLeft = ceil(ndvLeft * percent)
- val ndvRight = BigDecimal(colStatRight.distinctCount)
+ val ndvRight = BigDecimal(colStatRight.distinctCount.get)
val newNdvRight = ceil(ndvRight * percent)
var newMaxLeft = colStatLeft.max
@@ -817,10 +826,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
}
}
- val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft,
+ val newStatsLeft = colStatLeft.copy(distinctCount = Some(newNdvLeft), min = newMinLeft,
max = newMaxLeft)
colStatsMap(attrLeft) = newStatsLeft
- val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight,
+ val newStatsRight = colStatRight.copy(distinctCount = Some(newNdvRight), min = newMinRight,
max = newMaxRight)
colStatsMap(attrRight) = newStatsRight
}
@@ -849,17 +858,35 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
def contains(a: Attribute): Boolean = updatedMap.contains(a.exprId) || originalMap.contains(a)
/**
- * Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
- * originalMap, because updatedMap has the latest (updated) column stats.
+ * Gets an Option of column stat for the given attribute.
+ * Prefer the column stat in updatedMap than that in originalMap,
+ * because updatedMap has the latest (updated) column stats.
*/
- def apply(a: Attribute): ColumnStat = {
+ def get(a: Attribute): Option[ColumnStat] = {
if (updatedMap.contains(a.exprId)) {
- updatedMap(a.exprId)._2
+ updatedMap.get(a.exprId).map(_._2)
} else {
- originalMap(a)
+ originalMap.get(a)
}
}
+ def hasCountStats(a: Attribute): Boolean =
+ get(a).map(_.hasCountStats).getOrElse(false)
+
+ def hasDistinctCount(a: Attribute): Boolean =
+ get(a).map(_.distinctCount.isDefined).getOrElse(false)
+
+ def hasMinMaxStats(a: Attribute): Boolean =
+ get(a).map(_.hasCountStats).getOrElse(false)
+
+ /**
+ * Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
+ * originalMap, because updatedMap has the latest (updated) column stats.
+ */
+ def apply(a: Attribute): ColumnStat = {
+ get(a).get
+ }
+
/** Updates column stats in updatedMap. */
def update(a: Attribute, stats: ColumnStat): Unit = updatedMap.update(a.exprId, a -> stats)
@@ -871,11 +898,14 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
: AttributeMap[ColumnStat] = {
val newColumnStats = originalMap.map { case (attr, oriColStat) =>
val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
- val newNdv = if (colStat.distinctCount > 1) {
+ val newNdv = if (colStat.distinctCount.isEmpty) {
+ // No NDV in the original stats.
+ None
+ } else if (colStat.distinctCount.get > 1) {
// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
- EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
- newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
+ Some(EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
+ newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount.get))
} else {
// no need to scale down since it is already down to 1 (for skewed distribution case)
colStat.distinctCount
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f0294a4..2543e38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -85,7 +85,8 @@ case class JoinEstimation(join: Join) extends Logging {
// 3. Update statistics based on the output of join
val inputAttrStats = AttributeMap(
leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
- val attributesWithStat = join.output.filter(a => inputAttrStats.contains(a))
+ val attributesWithStat = join.output.filter(a =>
+ inputAttrStats.get(a).map(_.hasCountStats).getOrElse(false))
val (fromLeft, fromRight) = attributesWithStat.partition(join.left.outputSet.contains(_))
val outputStats: Seq[(Attribute, ColumnStat)] = if (outputRows == 0) {
@@ -106,10 +107,10 @@ case class JoinEstimation(join: Join) extends Logging {
case FullOuter =>
fromLeft.map { a =>
val oriColStat = inputAttrStats(a)
- (a, oriColStat.copy(nullCount = oriColStat.nullCount + rightRows))
+ (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + rightRows)))
} ++ fromRight.map { a =>
val oriColStat = inputAttrStats(a)
- (a, oriColStat.copy(nullCount = oriColStat.nullCount + leftRows))
+ (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + leftRows)))
}
case _ =>
assert(joinType == Inner || joinType == Cross)
@@ -219,19 +220,27 @@ case class JoinEstimation(join: Join) extends Logging {
private def computeByNdv(
leftKey: AttributeReference,
rightKey: AttributeReference,
- newMin: Option[Any],
- newMax: Option[Any]): (BigInt, ColumnStat) = {
+ min: Option[Any],
+ max: Option[Any]): (BigInt, ColumnStat) = {
val leftKeyStat = leftStats.attributeStats(leftKey)
val rightKeyStat = rightStats.attributeStats(rightKey)
- val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
+ val maxNdv = leftKeyStat.distinctCount.get.max(rightKeyStat.distinctCount.get)
// Compute cardinality by the basic formula.
val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
// Get the intersected column stat.
- val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
- val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
- val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
- val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
+ val newNdv = Some(leftKeyStat.distinctCount.get.min(rightKeyStat.distinctCount.get))
+ val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+ Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+ } else {
+ None
+ }
+ val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+ Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+ } else {
+ None
+ }
+ val newStats = ColumnStat(newNdv, min, max, Some(0), newAvgLen, newMaxLen)
(ceil(card), newStats)
}
@@ -267,9 +276,17 @@ case class JoinEstimation(join: Join) extends Logging {
val leftKeyStat = leftStats.attributeStats(leftKey)
val rightKeyStat = rightStats.attributeStats(rightKey)
- val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
- val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
- val newStats = ColumnStat(ceil(totalNdv), newMin, newMax, 0, newAvgLen, newMaxLen)
+ val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+ Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+ } else {
+ None
+ }
+ val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+ Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+ } else {
+ None
+ }
+ val newStats = ColumnStat(Some(ceil(totalNdv)), newMin, newMax, Some(0), newAvgLen, newMaxLen)
(ceil(card), newStats)
}
@@ -292,10 +309,14 @@ case class JoinEstimation(join: Join) extends Logging {
} else {
val oldColStat = oldAttrStats(a)
val oldNdv = oldColStat.distinctCount
- val newNdv = if (join.left.outputSet.contains(a)) {
- updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv)
+ val newNdv = if (oldNdv.isDefined) {
+ Some(if (join.left.outputSet.contains(a)) {
+ updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+ } else {
+ updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+ })
} else {
- updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv)
+ None
}
val newColStat = oldColStat.copy(distinctCount = newNdv)
// TODO: support nullCount updates for specific outer joins
@@ -313,7 +334,7 @@ case class JoinEstimation(join: Join) extends Logging {
// Note: join keys from EqualNullSafe also fall into this case (Coalesce), consider to
// support it in the future by using `nullCount` in column stats.
case (lk: AttributeReference, rk: AttributeReference)
- if columnStatsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
+ if columnStatsWithCountsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
}
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 2fb587d..565b0a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -62,24 +62,15 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
}
}
- /** Set up tables and columns for testing */
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
- attr("t1.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t1.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t2.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t3.v-1-100") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t4.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t4.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t5.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t5.v-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ attr("t1.k-1-2") -> rangeColumnStat(2, 0),
+ attr("t1.v-1-10") -> rangeColumnStat(10, 0),
+ attr("t2.k-1-5") -> rangeColumnStat(5, 0),
+ attr("t3.v-1-100") -> rangeColumnStat(100, 0),
+ attr("t4.k-1-2") -> rangeColumnStat(2, 0),
+ attr("t4.v-1-10") -> rangeColumnStat(10, 0),
+ attr("t5.k-1-5") -> rangeColumnStat(5, 0),
+ attr("t5.v-1-5") -> rangeColumnStat(5, 0)
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index ada6e2a..d4d23ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -68,88 +68,56 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
// F1 (fact table)
- attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("f1_fk1") -> rangeColumnStat(100, 0),
+ attr("f1_fk2") -> rangeColumnStat(100, 0),
+ attr("f1_fk3") -> rangeColumnStat(100, 0),
+ attr("f1_c1") -> rangeColumnStat(100, 0),
+ attr("f1_c2") -> rangeColumnStat(100, 0),
// D1 (dimension)
- attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d1_pk") -> rangeColumnStat(100, 0),
+ attr("d1_c2") -> rangeColumnStat(50, 0),
+ attr("d1_c3") -> rangeColumnStat(50, 0),
// D2 (dimension)
- attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d2_pk") -> rangeColumnStat(20, 0),
+ attr("d2_c2") -> rangeColumnStat(10, 0),
+ attr("d2_c3") -> rangeColumnStat(10, 0),
// D3 (dimension)
- attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d3_pk") -> rangeColumnStat(10, 0),
+ attr("d3_c2") -> rangeColumnStat(5, 0),
+ attr("d3_c3") -> rangeColumnStat(5, 0),
// T1 (regular table i.e. outside star)
- attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t1_c1") -> rangeColumnStat(20, 1),
+ attr("t1_c2") -> rangeColumnStat(10, 1),
+ attr("t1_c3") -> rangeColumnStat(10, 1),
// T2 (regular table)
- attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t2_c1") -> rangeColumnStat(5, 1),
+ attr("t2_c2") -> rangeColumnStat(5, 1),
+ attr("t2_c3") -> rangeColumnStat(5, 1),
// T3 (regular table)
- attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t3_c1") -> rangeColumnStat(5, 1),
+ attr("t3_c2") -> rangeColumnStat(5, 1),
+ attr("t3_c3") -> rangeColumnStat(5, 1),
// T4 (regular table)
- attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t4_c1") -> rangeColumnStat(5, 1),
+ attr("t4_c2") -> rangeColumnStat(5, 1),
+ attr("t4_c3") -> rangeColumnStat(5, 1),
// T5 (regular table)
- attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t5_c1") -> rangeColumnStat(5, 1),
+ attr("t5_c2") -> rangeColumnStat(5, 1),
+ attr("t5_c3") -> rangeColumnStat(5, 1),
// T6 (regular table)
- attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4)
+ attr("t6_c1") -> rangeColumnStat(5, 1),
+ attr("t6_c2") -> rangeColumnStat(5, 1),
+ attr("t6_c3") -> rangeColumnStat(5, 1)
))
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 777c563..4e0883e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -70,59 +70,40 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
// Tables' cardinality: f1 > d3 > d1 > d2 > s3
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
// F1
- attr("f1_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("f1_fk1") -> rangeColumnStat(3, 0),
+ attr("f1_fk2") -> rangeColumnStat(3, 0),
+ attr("f1_fk3") -> rangeColumnStat(4, 0),
+ attr("f1_c4") -> rangeColumnStat(4, 0),
// D1
- attr("d1_pk1") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d1_pk1") -> rangeColumnStat(4, 0),
+ attr("d1_c2") -> rangeColumnStat(3, 0),
+ attr("d1_c3") -> rangeColumnStat(4, 0),
+ attr("d1_c4") -> ColumnStat(distinctCount = Some(2), min = Some("2"), max = Some("3"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// D2
- attr("d2_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("d2_pk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c3") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d2_c2") -> ColumnStat(distinctCount = Some(3), min = Some("1"), max = Some("3"),
+ nullCount = Some(1), avgLen = Some(4), maxLen = Some(4)),
+ attr("d2_pk1") -> rangeColumnStat(3, 0),
+ attr("d2_c3") -> rangeColumnStat(3, 0),
+ attr("d2_c4") -> ColumnStat(distinctCount = Some(2), min = Some("3"), max = Some("4"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// D3
- attr("d3_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_pk1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d3_fk1") -> rangeColumnStat(3, 0),
+ attr("d3_c2") -> rangeColumnStat(3, 0),
+ attr("d3_pk1") -> rangeColumnStat(5, 0),
+ attr("d3_c4") -> ColumnStat(distinctCount = Some(2), min = Some("2"), max = Some("3"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// S3
- attr("s3_pk1") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("s3_c2") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("s3_c3") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("s3_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("s3_pk1") -> rangeColumnStat(2, 0),
+ attr("s3_c2") -> rangeColumnStat(1, 0),
+ attr("s3_c3") -> rangeColumnStat(1, 0),
+ attr("s3_c4") -> ColumnStat(distinctCount = Some(2), min = Some("3"), max = Some("4"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// F11
- attr("f11_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f11_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f11_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f11_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ attr("f11_fk1") -> rangeColumnStat(3, 0),
+ attr("f11_fk2") -> rangeColumnStat(3, 0),
+ attr("f11_fk3") -> rangeColumnStat(4, 0),
+ attr("f11_c4") -> rangeColumnStat(4, 0)
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 23f95a6..8213d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -29,16 +29,16 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
/** Columns for testing */
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
- attr("key11") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key12") -> ColumnStat(distinctCount = 4, min = Some(10), max = Some(40), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key21") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key22") -> ColumnStat(distinctCount = 2, min = Some(10), max = Some(20), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key31") -> ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 0,
- avgLen = 4, maxLen = 4)
+ attr("key11") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key12") -> ColumnStat(distinctCount = Some(4), min = Some(10), max = Some(40),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key21") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -63,8 +63,8 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
tableRowCount = 6,
groupByColumns = Seq("key21", "key22"),
// Row count = product of ndv
- expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount * nameToColInfo("key22")._2
- .distinctCount)
+ expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
+ nameToColInfo("key22")._2.distinctCount.get)
}
test("empty group-by column") {
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 7d532ff..953094c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.IntegerType
class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
val attribute = attr("key")
- val colStat = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStat = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
val plan = StatsTestPlan(
outputList = Seq(attribute),
@@ -116,13 +116,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
sizeInBytes = 40,
rowCount = Some(10),
attributeStats = AttributeMap(Seq(
- AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))))
+ AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+ min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
val expectedCboStats =
Statistics(
sizeInBytes = 4,
rowCount = Some(1),
attributeStats = AttributeMap(Seq(
- AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))))
+ AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+ min = Some(5), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
checkStats(
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org
[2/3] spark git commit: [SPARK-23445] ColumnStat refactoring
Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 2b1fe98..43440d5 100755
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -37,59 +37,61 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
// Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
val attrInt = AttributeReference("cint", IntegerType)()
- val colStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStatInt = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
// column cbool has only 2 distinct values
val attrBool = AttributeReference("cbool", BooleanType)()
- val colStatBool = ColumnStat(distinctCount = 2, min = Some(false), max = Some(true),
- nullCount = 0, avgLen = 1, maxLen = 1)
+ val colStatBool = ColumnStat(distinctCount = Some(2), min = Some(false), max = Some(true),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))
// column cdate has 10 values from 2017-01-01 through 2017-01-10.
val dMin = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))
val dMax = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-10"))
val attrDate = AttributeReference("cdate", DateType)()
- val colStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStatDate = ColumnStat(distinctCount = Some(10),
+ min = Some(dMin), max = Some(dMax),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
// column cdecimal has 4 values from 0.20 through 0.80 at increment of 0.20.
val decMin = Decimal("0.200000000000000000")
val decMax = Decimal("0.800000000000000000")
val attrDecimal = AttributeReference("cdecimal", DecimalType(18, 18))()
- val colStatDecimal = ColumnStat(distinctCount = 4, min = Some(decMin), max = Some(decMax),
- nullCount = 0, avgLen = 8, maxLen = 8)
+ val colStatDecimal = ColumnStat(distinctCount = Some(4),
+ min = Some(decMin), max = Some(decMax),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
// column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0
val attrDouble = AttributeReference("cdouble", DoubleType)()
- val colStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), max = Some(10.0),
- nullCount = 0, avgLen = 8, maxLen = 8)
+ val colStatDouble = ColumnStat(distinctCount = Some(10), min = Some(1.0), max = Some(10.0),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
// column cstring has 10 String values:
// "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9"
val attrString = AttributeReference("cstring", StringType)()
- val colStatString = ColumnStat(distinctCount = 10, min = None, max = None,
- nullCount = 0, avgLen = 2, maxLen = 2)
+ val colStatString = ColumnStat(distinctCount = Some(10), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))
// column cint2 has values: 7, 8, 9, 10, 11, 12, 13, 14, 15, 16
// Hence, distinctCount:10, min:7, max:16, nullCount:0, avgLen:4, maxLen:4
// This column is created to test "cint < cint2
val attrInt2 = AttributeReference("cint2", IntegerType)()
- val colStatInt2 = ColumnStat(distinctCount = 10, min = Some(7), max = Some(16),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStatInt2 = ColumnStat(distinctCount = Some(10), min = Some(7), max = Some(16),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
// column cint3 has values: 30, 31, 32, 33, 34, 35, 36, 37, 38, 39
// Hence, distinctCount:10, min:30, max:39, nullCount:0, avgLen:4, maxLen:4
// This column is created to test "cint = cint3 without overlap at all.
val attrInt3 = AttributeReference("cint3", IntegerType)()
- val colStatInt3 = ColumnStat(distinctCount = 10, min = Some(30), max = Some(39),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStatInt3 = ColumnStat(distinctCount = Some(10), min = Some(30), max = Some(39),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
// column cint4 has values in the range from 1 to 10
// distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
// This column is created to test complete overlap
val attrInt4 = AttributeReference("cint4", IntegerType)()
- val colStatInt4 = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStatInt4 = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
// column cintHgm has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 with histogram.
// Note that cintHgm has an even distribution with histogram information built.
@@ -98,8 +100,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val hgmInt = Histogram(2.0, Array(HistogramBin(1.0, 2.0, 2),
HistogramBin(2.0, 4.0, 2), HistogramBin(4.0, 6.0, 2),
HistogramBin(6.0, 8.0, 2), HistogramBin(8.0, 10.0, 2)))
- val colStatIntHgm = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))
+ val colStatIntHgm = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))
// column cintSkewHgm has values: 1, 4, 4, 5, 5, 5, 5, 6, 6, 10 with histogram.
// Note that cintSkewHgm has a skewed distribution with histogram information built.
@@ -108,8 +110,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val hgmIntSkew = Histogram(2.0, Array(HistogramBin(1.0, 4.0, 2),
HistogramBin(4.0, 5.0, 2), HistogramBin(5.0, 5.0, 1),
HistogramBin(5.0, 6.0, 2), HistogramBin(6.0, 10.0, 2)))
- val colStatIntSkewHgm = ColumnStat(distinctCount = 5, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))
+ val colStatIntSkewHgm = ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))
val attributeMap = AttributeMap(Seq(
attrInt -> colStatInt,
@@ -172,7 +174,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Or(LessThan(attrInt, Literal(3)), Literal(null, IntegerType))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 3)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(3))),
expectedRowCount = 3)
}
@@ -180,7 +182,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrInt, Literal(3)), Literal(null, IntegerType)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(8))),
expectedRowCount = 8)
}
@@ -196,23 +198,23 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrInt, Literal(3)), Not(Literal(null, IntegerType))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(8))),
expectedRowCount = 8)
}
test("cint = 2") {
validateEstimatedStats(
Filter(EqualTo(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(1), min = Some(2), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 1)
}
test("cint <=> 2") {
validateEstimatedStats(
Filter(EqualNullSafe(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(1), min = Some(2), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 1)
}
@@ -227,8 +229,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint < 3") {
validateEstimatedStats(
Filter(LessThan(attrInt, Literal(3)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 3)
}
@@ -243,16 +245,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint <= 3") {
validateEstimatedStats(
Filter(LessThanOrEqual(attrInt, Literal(3)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 3)
}
test("cint > 6") {
validateEstimatedStats(
Filter(GreaterThan(attrInt, Literal(6)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(5), min = Some(6), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 5)
}
@@ -267,8 +269,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint >= 6") {
validateEstimatedStats(
Filter(GreaterThanOrEqual(attrInt, Literal(6)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(5), min = Some(6), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 5)
}
@@ -282,8 +284,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cint IS NOT NULL") {
validateEstimatedStats(
Filter(IsNotNull(attrInt), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 10)
}
@@ -301,8 +303,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(3), max = Some(6),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 4)
}
@@ -310,7 +312,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Or(EqualTo(attrInt, Literal(3)), EqualTo(attrInt, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 2)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(2))),
expectedRowCount = 2)
}
@@ -318,7 +320,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 6)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(6))),
expectedRowCount = 6)
}
@@ -326,7 +328,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(Or(LessThanOrEqual(attrInt, Literal(3)), GreaterThan(attrInt, Literal(6))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 5)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(5))),
expectedRowCount = 5)
}
@@ -342,47 +344,47 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(Or(EqualTo(attrInt, Literal(3)), LessThan(attrString, Literal("A8"))))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt, attrString), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 9),
- attrString -> colStatString.copy(distinctCount = 9)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(9)),
+ attrString -> colStatString.copy(distinctCount = Some(9))),
expectedRowCount = 9)
}
test("cint IN (3, 4, 5)") {
validateEstimatedStats(
Filter(InSet(attrInt, Set(3, 4, 5)), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(3), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(3), min = Some(3), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 3)
}
test("cint NOT IN (3, 4, 5)") {
validateEstimatedStats(
Filter(Not(InSet(attrInt, Set(3, 4, 5))), childStatsTestPlan(Seq(attrInt), 10L)),
- Seq(attrInt -> colStatInt.copy(distinctCount = 7)),
+ Seq(attrInt -> colStatInt.copy(distinctCount = Some(7))),
expectedRowCount = 7)
}
test("cbool IN (true)") {
validateEstimatedStats(
Filter(InSet(attrBool, Set(true)), childStatsTestPlan(Seq(attrBool), 10L)),
- Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true),
- nullCount = 0, avgLen = 1, maxLen = 1)),
+ Seq(attrBool -> ColumnStat(distinctCount = Some(1), min = Some(true), max = Some(true),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))),
expectedRowCount = 5)
}
test("cbool = true") {
validateEstimatedStats(
Filter(EqualTo(attrBool, Literal(true)), childStatsTestPlan(Seq(attrBool), 10L)),
- Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true),
- nullCount = 0, avgLen = 1, maxLen = 1)),
+ Seq(attrBool -> ColumnStat(distinctCount = Some(1), min = Some(true), max = Some(true),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))),
expectedRowCount = 5)
}
test("cbool > false") {
validateEstimatedStats(
Filter(GreaterThan(attrBool, Literal(false)), childStatsTestPlan(Seq(attrBool), 10L)),
- Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = Some(true),
- nullCount = 0, avgLen = 1, maxLen = 1)),
+ Seq(attrBool -> ColumnStat(distinctCount = Some(1), min = Some(false), max = Some(true),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))),
expectedRowCount = 5)
}
@@ -391,18 +393,21 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(EqualTo(attrDate, Literal(d20170102, DateType)),
childStatsTestPlan(Seq(attrDate), 10L)),
- Seq(attrDate -> ColumnStat(distinctCount = 1, min = Some(d20170102), max = Some(d20170102),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrDate -> ColumnStat(distinctCount = Some(1),
+ min = Some(d20170102), max = Some(d20170102),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 1)
}
test("cdate < cast('2017-01-03' AS DATE)") {
+ val d20170101 = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))
val d20170103 = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-03"))
validateEstimatedStats(
Filter(LessThan(attrDate, Literal(d20170103, DateType)),
childStatsTestPlan(Seq(attrDate), 10L)),
- Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(dMin), max = Some(d20170103),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrDate -> ColumnStat(distinctCount = Some(3),
+ min = Some(d20170101), max = Some(d20170103),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 3)
}
@@ -414,8 +419,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(In(attrDate, Seq(Literal(d20170103, DateType), Literal(d20170104, DateType),
Literal(d20170105, DateType))), childStatsTestPlan(Seq(attrDate), 10L)),
- Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(d20170103), max = Some(d20170105),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrDate -> ColumnStat(distinctCount = Some(3),
+ min = Some(d20170103), max = Some(d20170105),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 3)
}
@@ -424,42 +430,45 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(EqualTo(attrDecimal, Literal(dec_0_40)),
childStatsTestPlan(Seq(attrDecimal), 4L)),
- Seq(attrDecimal -> ColumnStat(distinctCount = 1, min = Some(dec_0_40), max = Some(dec_0_40),
- nullCount = 0, avgLen = 8, maxLen = 8)),
+ Seq(attrDecimal -> ColumnStat(distinctCount = Some(1),
+ min = Some(dec_0_40), max = Some(dec_0_40),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))),
expectedRowCount = 1)
}
test("cdecimal < 0.60 ") {
+ val dec_0_20 = Decimal("0.200000000000000000")
val dec_0_60 = Decimal("0.600000000000000000")
validateEstimatedStats(
Filter(LessThan(attrDecimal, Literal(dec_0_60)),
childStatsTestPlan(Seq(attrDecimal), 4L)),
- Seq(attrDecimal -> ColumnStat(distinctCount = 3, min = Some(decMin), max = Some(dec_0_60),
- nullCount = 0, avgLen = 8, maxLen = 8)),
+ Seq(attrDecimal -> ColumnStat(distinctCount = Some(3),
+ min = Some(dec_0_20), max = Some(dec_0_60),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))),
expectedRowCount = 3)
}
test("cdouble < 3.0") {
validateEstimatedStats(
Filter(LessThan(attrDouble, Literal(3.0)), childStatsTestPlan(Seq(attrDouble), 10L)),
- Seq(attrDouble -> ColumnStat(distinctCount = 3, min = Some(1.0), max = Some(3.0),
- nullCount = 0, avgLen = 8, maxLen = 8)),
+ Seq(attrDouble -> ColumnStat(distinctCount = Some(3), min = Some(1.0), max = Some(3.0),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))),
expectedRowCount = 3)
}
test("cstring = 'A2'") {
validateEstimatedStats(
Filter(EqualTo(attrString, Literal("A2")), childStatsTestPlan(Seq(attrString), 10L)),
- Seq(attrString -> ColumnStat(distinctCount = 1, min = None, max = None,
- nullCount = 0, avgLen = 2, maxLen = 2)),
+ Seq(attrString -> ColumnStat(distinctCount = Some(1), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))),
expectedRowCount = 1)
}
test("cstring < 'A2' - unsupported condition") {
validateEstimatedStats(
Filter(LessThan(attrString, Literal("A2")), childStatsTestPlan(Seq(attrString), 10L)),
- Seq(attrString -> ColumnStat(distinctCount = 10, min = None, max = None,
- nullCount = 0, avgLen = 2, maxLen = 2)),
+ Seq(attrString -> ColumnStat(distinctCount = Some(10), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))),
expectedRowCount = 10)
}
@@ -468,8 +477,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// valid values in IN clause is greater than the number of distinct values for a given column.
// For example, column has only 2 distinct values 1 and 6.
// The predicate is: column IN (1, 2, 3, 4, 5).
- val cornerChildColStatInt = ColumnStat(distinctCount = 2, min = Some(1), max = Some(6),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val cornerChildColStatInt = ColumnStat(distinctCount = Some(2),
+ min = Some(1), max = Some(6),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
val cornerChildStatsTestplan = StatsTestPlan(
outputList = Seq(attrInt),
rowCount = 2L,
@@ -477,16 +487,17 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
)
validateEstimatedStats(
Filter(InSet(attrInt, Set(1, 2, 3, 4, 5)), cornerChildStatsTestplan),
- Seq(attrInt -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 2)
}
// This is a limitation test. We should remove it after the limitation is removed.
test("don't estimate IsNull or IsNotNull if the child is a non-leaf node") {
val attrIntLargerRange = AttributeReference("c1", IntegerType)()
- val colStatIntLargerRange = ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
- nullCount = 10, avgLen = 4, maxLen = 4)
+ val colStatIntLargerRange = ColumnStat(distinctCount = Some(20),
+ min = Some(1), max = Some(20),
+ nullCount = Some(10), avgLen = Some(4), maxLen = Some(4))
val smallerTable = childStatsTestPlan(Seq(attrInt), 10L)
val largerTable = StatsTestPlan(
outputList = Seq(attrIntLargerRange),
@@ -508,10 +519,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(EqualTo(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt2 -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 4)
}
@@ -519,10 +530,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(GreaterThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt2 -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 4)
}
@@ -530,10 +541,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(LessThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(16),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt2 -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(16),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 4)
}
@@ -541,10 +552,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// complete overlap case
validateEstimatedStats(
Filter(EqualTo(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt4 -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt4 -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 10)
}
@@ -552,10 +563,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// partial overlap case
validateEstimatedStats(
Filter(LessThan(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt4 -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt4 -> ColumnStat(distinctCount = Some(4), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 4)
}
@@ -571,10 +582,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
// all table records qualify.
validateEstimatedStats(
Filter(LessThan(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)),
- Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt3 -> ColumnStat(distinctCount = 10, min = Some(30), max = Some(39),
- nullCount = 0, avgLen = 4, maxLen = 4)),
+ Seq(attrInt -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt3 -> ColumnStat(distinctCount = Some(10), min = Some(30), max = Some(39),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
expectedRowCount = 10)
}
@@ -592,11 +603,11 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrInt, attrInt4, attrString), 10L)),
Seq(
- attrInt -> ColumnStat(distinctCount = 5, min = Some(3), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrInt4 -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(6),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attrString -> colStatString.copy(distinctCount = 5)),
+ attrInt -> ColumnStat(distinctCount = Some(5), min = Some(3), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrInt4 -> ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(6),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attrString -> colStatString.copy(distinctCount = Some(5))),
expectedRowCount = 5)
}
@@ -606,15 +617,15 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrIntHgm, Literal(3)), Literal(null, IntegerType)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 7)),
+ Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = Some(7))),
expectedRowCount = 7)
}
test("cintHgm = 5") {
validateEstimatedStats(
Filter(EqualTo(attrIntHgm, Literal(5)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+ Seq(attrIntHgm -> ColumnStat(distinctCount = Some(1), min = Some(5), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
expectedRowCount = 1)
}
@@ -629,8 +640,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cintHgm < 3") {
validateEstimatedStats(
Filter(LessThan(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+ Seq(attrIntHgm -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
expectedRowCount = 3)
}
@@ -645,16 +656,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cintHgm <= 3") {
validateEstimatedStats(
Filter(LessThanOrEqual(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+ Seq(attrIntHgm -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
expectedRowCount = 3)
}
test("cintHgm > 6") {
validateEstimatedStats(
Filter(GreaterThan(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(6), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+ Seq(attrIntHgm -> ColumnStat(distinctCount = Some(4), min = Some(6), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
expectedRowCount = 4)
}
@@ -669,8 +680,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cintHgm >= 6") {
validateEstimatedStats(
Filter(GreaterThanOrEqual(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+ Seq(attrIntHgm -> ColumnStat(distinctCount = Some(5), min = Some(6), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
expectedRowCount = 5)
}
@@ -679,8 +690,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
Literal(3)), LessThanOrEqual(attrIntHgm, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+ Seq(attrIntHgm -> ColumnStat(distinctCount = Some(4), min = Some(3), max = Some(6),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
expectedRowCount = 4)
}
@@ -688,7 +699,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Or(EqualTo(attrIntHgm, Literal(3)), EqualTo(attrIntHgm, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
- Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 3)),
+ Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = Some(3))),
expectedRowCount = 3)
}
@@ -698,15 +709,15 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Not(And(LessThan(attrIntSkewHgm, Literal(3)), Literal(null, IntegerType)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 5)),
+ Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = Some(5))),
expectedRowCount = 9)
}
test("cintSkewHgm = 5") {
validateEstimatedStats(
Filter(EqualTo(attrIntSkewHgm, Literal(5)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+ Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(5), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
expectedRowCount = 4)
}
@@ -721,8 +732,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
test("cintSkewHgm < 3") {
validateEstimatedStats(
Filter(LessThan(attrIntSkewHgm, Literal(3)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+ Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
expectedRowCount = 2)
}
@@ -738,16 +749,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(LessThanOrEqual(attrIntSkewHgm, Literal(3)),
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+ Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
expectedRowCount = 2)
}
test("cintSkewHgm > 6") {
validateEstimatedStats(
Filter(GreaterThan(attrIntSkewHgm, Literal(6)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(6), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+ Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(6), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
expectedRowCount = 2)
}
@@ -764,8 +775,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
validateEstimatedStats(
Filter(GreaterThanOrEqual(attrIntSkewHgm, Literal(6)),
childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 2, min = Some(6), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+ Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(2), min = Some(6), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
expectedRowCount = 3)
}
@@ -774,8 +785,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
Literal(3)), LessThanOrEqual(attrIntSkewHgm, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
- nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+ Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(4), min = Some(3), max = Some(6),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
expectedRowCount = 8)
}
@@ -783,7 +794,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
val condition = Or(EqualTo(attrIntSkewHgm, Literal(3)), EqualTo(attrIntSkewHgm, Literal(6)))
validateEstimatedStats(
Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
- Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 2)),
+ Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = Some(2))),
expectedRowCount = 3)
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
index 26139d8..12c0a7b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
@@ -33,16 +33,16 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
/** Set up tables and its columns for testing */
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
- attr("key-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key-5-9") -> ColumnStat(distinctCount = 5, min = Some(5), max = Some(9), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key-2-4") -> ColumnStat(distinctCount = 3, min = Some(2), max = Some(4), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key-2-3") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
- avgLen = 4, maxLen = 4)
+ attr("key-1-5") -> ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key-5-9") -> ColumnStat(distinctCount = Some(5), min = Some(5), max = Some(9),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key-1-2") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key-2-4") -> ColumnStat(distinctCount = Some(3), min = Some(2), max = Some(4),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key-2-3") -> ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -70,8 +70,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
private def estimateByHistogram(
leftHistogram: Histogram,
rightHistogram: Histogram,
- expectedMin: Double,
- expectedMax: Double,
+ expectedMin: Any,
+ expectedMax: Any,
expectedNdv: Long,
expectedRows: Long): Unit = {
val col1 = attr("key1")
@@ -86,9 +86,11 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
rowCount = Some(expectedRows),
attributeStats = AttributeMap(Seq(
col1 -> c1.stats.attributeStats(col1).copy(
- distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
+ distinctCount = Some(expectedNdv),
+ min = Some(expectedMin), max = Some(expectedMax)),
col2 -> c2.stats.attributeStats(col2).copy(
- distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
+ distinctCount = Some(expectedNdv),
+ min = Some(expectedMin), max = Some(expectedMax))))
)
// Join order should not affect estimation result.
@@ -100,9 +102,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
private def generateJoinChild(
col: Attribute,
histogram: Histogram,
- expectedMin: Double,
- expectedMax: Double): LogicalPlan = {
- val colStat = inferColumnStat(histogram)
+ expectedMin: Any,
+ expectedMax: Any): LogicalPlan = {
+ val colStat = inferColumnStat(histogram, expectedMin, expectedMax)
StatsTestPlan(
outputList = Seq(col),
rowCount = (histogram.height * histogram.bins.length).toLong,
@@ -110,7 +112,11 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
}
/** Column statistics should be consistent with histograms in tests. */
- private def inferColumnStat(histogram: Histogram): ColumnStat = {
+ private def inferColumnStat(
+ histogram: Histogram,
+ expectedMin: Any,
+ expectedMax: Any): ColumnStat = {
+
var ndv = 0L
for (i <- histogram.bins.indices) {
val bin = histogram.bins(i)
@@ -118,8 +124,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
ndv += bin.ndv
}
}
- ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
- max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
+ ColumnStat(distinctCount = Some(ndv),
+ min = Some(expectedMin), max = Some(expectedMax),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4),
histogram = Some(histogram))
}
@@ -343,10 +350,10 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
rowCount = Some(5 + 3),
attributeStats = AttributeMap(
// Update null count in column stats.
- Seq(nameToAttr("key-1-5") -> columnInfo(nameToAttr("key-1-5")).copy(nullCount = 3),
- nameToAttr("key-5-9") -> columnInfo(nameToAttr("key-5-9")).copy(nullCount = 3),
- nameToAttr("key-1-2") -> columnInfo(nameToAttr("key-1-2")).copy(nullCount = 5),
- nameToAttr("key-2-4") -> columnInfo(nameToAttr("key-2-4")).copy(nullCount = 5))))
+ Seq(nameToAttr("key-1-5") -> columnInfo(nameToAttr("key-1-5")).copy(nullCount = Some(3)),
+ nameToAttr("key-5-9") -> columnInfo(nameToAttr("key-5-9")).copy(nullCount = Some(3)),
+ nameToAttr("key-1-2") -> columnInfo(nameToAttr("key-1-2")).copy(nullCount = Some(5)),
+ nameToAttr("key-2-4") -> columnInfo(nameToAttr("key-2-4")).copy(nullCount = Some(5)))))
assert(join.stats == expectedStats)
}
@@ -356,11 +363,11 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
val join = Join(table1, table2, Inner,
Some(EqualTo(nameToAttr("key-1-5"), nameToAttr("key-1-2"))))
// Update column stats for equi-join keys (key-1-5 and key-1-2).
- val joinedColStat = ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4)
+ val joinedColStat = ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
// Update column stat for other column if #outputRow / #sideRow < 1 (key-5-9), or keep it
// unchanged (key-2-4).
- val colStatForkey59 = nameToColInfo("key-5-9")._2.copy(distinctCount = 5 * 3 / 5)
+ val colStatForkey59 = nameToColInfo("key-5-9")._2.copy(distinctCount = Some(5 * 3 / 5))
val expectedStats = Statistics(
sizeInBytes = 3 * (8 + 4 * 4),
@@ -379,10 +386,10 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3")))))
// Update column stats for join keys.
- val joinedColStat1 = ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4)
- val joinedColStat2 = ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
- avgLen = 4, maxLen = 4)
+ val joinedColStat1 = ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
+ val joinedColStat2 = ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
val expectedStats = Statistics(
sizeInBytes = 2 * (8 + 4 * 4),
@@ -398,8 +405,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
// table3 (key-1-2 int, key-2-3 int): (1, 2), (2, 3)
val join = Join(table3, table2, LeftOuter,
Some(EqualTo(nameToAttr("key-2-3"), nameToAttr("key-2-4"))))
- val joinedColStat = ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
- avgLen = 4, maxLen = 4)
+ val joinedColStat = ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
val expectedStats = Statistics(
sizeInBytes = 2 * (8 + 4 * 4),
@@ -416,8 +423,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
// table3 (key-1-2 int, key-2-3 int): (1, 2), (2, 3)
val join = Join(table2, table3, RightOuter,
Some(EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3"))))
- val joinedColStat = ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
- avgLen = 4, maxLen = 4)
+ val joinedColStat = ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
val expectedStats = Statistics(
sizeInBytes = 2 * (8 + 4 * 4),
@@ -466,30 +473,40 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
val date = DateTimeUtils.fromJavaDate(Date.valueOf("2016-05-08"))
val timestamp = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-05-08 00:00:01"))
mutable.LinkedHashMap[Attribute, ColumnStat](
- AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = 1,
- min = Some(false), max = Some(false), nullCount = 0, avgLen = 1, maxLen = 1),
- AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = 1,
- min = Some(1.toByte), max = Some(1.toByte), nullCount = 0, avgLen = 1, maxLen = 1),
- AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = 1,
- min = Some(1.toShort), max = Some(1.toShort), nullCount = 0, avgLen = 2, maxLen = 2),
- AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = 1,
- min = Some(1), max = Some(1), nullCount = 0, avgLen = 4, maxLen = 4),
- AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = 1,
- min = Some(1L), max = Some(1L), nullCount = 0, avgLen = 8, maxLen = 8),
- AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = 1,
- min = Some(1.0), max = Some(1.0), nullCount = 0, avgLen = 8, maxLen = 8),
- AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = 1,
- min = Some(1.0f), max = Some(1.0f), nullCount = 0, avgLen = 4, maxLen = 4),
- AttributeReference("cdec", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(distinctCount = 1,
- min = Some(dec), max = Some(dec), nullCount = 0, avgLen = 16, maxLen = 16),
- AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = 1,
- min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
- AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = 1,
- min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
- AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = 1,
- min = Some(date), max = Some(date), nullCount = 0, avgLen = 4, maxLen = 4),
- AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = 1,
- min = Some(timestamp), max = Some(timestamp), nullCount = 0, avgLen = 8, maxLen = 8)
+ AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(false), max = Some(false),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+ AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(1.toByte), max = Some(1.toByte),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+ AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(1.toShort), max = Some(1.toShort),
+ nullCount = Some(0), avgLen = Some(2), maxLen = Some(2)),
+ AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(1), max = Some(1),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(1L), max = Some(1L),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+ AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(1.0), max = Some(1.0),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+ AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(1.0f), max = Some(1.0f),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ AttributeReference("cdec", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(
+ distinctCount = Some(1), min = Some(dec), max = Some(dec),
+ nullCount = Some(0), avgLen = Some(16), maxLen = Some(16)),
+ AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = Some(1),
+ min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+ AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = Some(1),
+ min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+ AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(date), max = Some(date),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(timestamp), max = Some(timestamp),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
)
}
@@ -520,7 +537,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
test("join with null column") {
val (nullColumn, nullColStat) = (attr("cnull"),
- ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 1, avgLen = 4, maxLen = 4))
+ ColumnStat(distinctCount = Some(0), min = None, max = None,
+ nullCount = Some(1), avgLen = Some(4), maxLen = Some(4)))
val nullTable = StatsTestPlan(
outputList = Seq(nullColumn),
rowCount = 1,
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
index cda54fa..dcb3701 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
@@ -28,10 +28,10 @@ import org.apache.spark.sql.types._
class ProjectEstimationSuite extends StatsEstimationTestBase {
test("project with alias") {
- val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = 2, min = Some(1),
- max = Some(2), nullCount = 0, avgLen = 4, maxLen = 4))
- val (ar2, colStat2) = (attr("key2"), ColumnStat(distinctCount = 1, min = Some(10),
- max = Some(10), nullCount = 0, avgLen = 4, maxLen = 4))
+ val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = Some(2), min = Some(1),
+ max = Some(2), nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))
+ val (ar2, colStat2) = (attr("key2"), ColumnStat(distinctCount = Some(1), min = Some(10),
+ max = Some(10), nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))
val child = StatsTestPlan(
outputList = Seq(ar1, ar2),
@@ -49,8 +49,8 @@ class ProjectEstimationSuite extends StatsEstimationTestBase {
}
test("project on empty table") {
- val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = 0, min = None, max = None,
- nullCount = 0, avgLen = 4, maxLen = 4))
+ val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = Some(0), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))
val child = StatsTestPlan(
outputList = Seq(ar1),
rowCount = 0,
@@ -71,30 +71,40 @@ class ProjectEstimationSuite extends StatsEstimationTestBase {
val t2 = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-05-09 00:00:02"))
val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
- AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = 2,
- min = Some(false), max = Some(true), nullCount = 0, avgLen = 1, maxLen = 1),
- AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = 2,
- min = Some(1.toByte), max = Some(2.toByte), nullCount = 0, avgLen = 1, maxLen = 1),
- AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = 2,
- min = Some(1.toShort), max = Some(3.toShort), nullCount = 0, avgLen = 2, maxLen = 2),
- AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = 2,
- min = Some(1), max = Some(4), nullCount = 0, avgLen = 4, maxLen = 4),
- AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = 2,
- min = Some(1L), max = Some(5L), nullCount = 0, avgLen = 8, maxLen = 8),
- AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = 2,
- min = Some(1.0), max = Some(6.0), nullCount = 0, avgLen = 8, maxLen = 8),
- AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = 2,
- min = Some(1.0f), max = Some(7.0f), nullCount = 0, avgLen = 4, maxLen = 4),
- AttributeReference("cdecimal", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(distinctCount = 2,
- min = Some(dec1), max = Some(dec2), nullCount = 0, avgLen = 16, maxLen = 16),
- AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = 2,
- min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
- AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = 2,
- min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
- AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = 2,
- min = Some(d1), max = Some(d2), nullCount = 0, avgLen = 4, maxLen = 4),
- AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = 2,
- min = Some(t1), max = Some(t2), nullCount = 0, avgLen = 8, maxLen = 8)
+ AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(false), max = Some(true),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+ AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+ AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(1), max = Some(3),
+ nullCount = Some(0), avgLen = Some(2), maxLen = Some(2)),
+ AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(1), max = Some(4),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(1), max = Some(5),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+ AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(1.0), max = Some(6.0),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+ AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(1.0), max = Some(7.0),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ AttributeReference("cdecimal", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(
+ distinctCount = Some(2), min = Some(dec1), max = Some(dec2),
+ nullCount = Some(0), avgLen = Some(16), maxLen = Some(16)),
+ AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = Some(2),
+ min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+ AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = Some(2),
+ min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+ AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(d1), max = Some(d2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = Some(2),
+ min = Some(t1), max = Some(t2),
+ nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
))
val columnSizes: Map[Attribute, Long] = columnInfo.map(kv => (kv._1, getColSize(kv._1, kv._2)))
val child = StatsTestPlan(
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 31dea2e..9dceca5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -42,8 +42,8 @@ trait StatsEstimationTestBase extends SparkFunSuite {
def getColSize(attribute: Attribute, colStat: ColumnStat): Long = attribute.dataType match {
// For UTF8String: base + offset + numBytes
- case StringType => colStat.avgLen + 8 + 4
- case _ => colStat.avgLen
+ case StringType => colStat.avgLen.getOrElse(attribute.dataType.defaultSize.toLong) + 8 + 4
+ case _ => colStat.avgLen.getOrElse(attribute.dataType.defaultSize)
}
def attr(colName: String): AttributeReference = AttributeReference(colName, IntegerType)()
@@ -54,6 +54,12 @@ trait StatsEstimationTestBase extends SparkFunSuite {
val nameToAttr: Map[String, Attribute] = plan.output.map(a => (a.name, a)).toMap
AttributeMap(colStats.map(kv => nameToAttr(kv._1) -> kv._2))
}
+
+ /** Get a test ColumnStat with given distinctCount and nullCount */
+ def rangeColumnStat(distinctCount: Int, nullCount: Int): ColumnStat =
+ ColumnStat(distinctCount = Some(distinctCount),
+ min = Some(1), max = Some(distinctCount),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
}
/**
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 1122522..640e013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.command
import scala.collection.mutable
import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTableType}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.util.ArrayData
import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
/**
@@ -64,12 +66,12 @@ case class AnalyzeColumnCommand(
/**
* Compute stats for the given columns.
- * @return (row count, map from column name to ColumnStats)
+ * @return (row count, map from column name to CatalogColumnStats)
*/
private def computeColumnStats(
sparkSession: SparkSession,
tableIdent: TableIdentifier,
- columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
+ columnNames: Seq[String]): (Long, Map[String, CatalogColumnStat]) = {
val conf = sparkSession.sessionState.conf
val relation = sparkSession.table(tableIdent).logicalPlan
@@ -81,7 +83,7 @@ case class AnalyzeColumnCommand(
// Make sure the column types are supported for stats gathering.
attributesToAnalyze.foreach { attr =>
- if (!ColumnStat.supportsType(attr.dataType)) {
+ if (!supportsType(attr.dataType)) {
throw new AnalysisException(
s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
"and Spark does not support statistics collection on this column type.")
@@ -103,7 +105,7 @@ case class AnalyzeColumnCommand(
// will be structs containing all column stats.
// The layout of each struct follows the layout of the ColumnStats.
val expressions = Count(Literal(1)).toAggregateExpression() +:
- attributesToAnalyze.map(ColumnStat.statExprs(_, conf, attributePercentiles))
+ attributesToAnalyze.map(statExprs(_, conf, attributePercentiles))
val namedExpressions = expressions.map(e => Alias(e, e.toString)())
val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation))
@@ -111,9 +113,9 @@ case class AnalyzeColumnCommand(
val rowCount = statsRow.getLong(0)
val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
- // according to `ColumnStat.statExprs`, the stats struct always have 7 fields.
- (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
- attributePercentiles.get(attr)))
+ // according to `statExprs`, the stats struct always have 7 fields.
+ (attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
+ attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType))
}.toMap
(rowCount, columnStats)
}
@@ -124,7 +126,7 @@ case class AnalyzeColumnCommand(
sparkSession: SparkSession,
relation: LogicalPlan): AttributeMap[ArrayData] = {
val attrsToGenHistogram = if (conf.histogramEnabled) {
- attributesToAnalyze.filter(a => ColumnStat.supportsHistogram(a.dataType))
+ attributesToAnalyze.filter(a => supportsHistogram(a.dataType))
} else {
Nil
}
@@ -154,4 +156,120 @@ case class AnalyzeColumnCommand(
AttributeMap(attributePercentiles.toSeq)
}
+ /** Returns true iff the we support gathering column statistics on column of the given type. */
+ private def supportsType(dataType: DataType): Boolean = dataType match {
+ case _: IntegralType => true
+ case _: DecimalType => true
+ case DoubleType | FloatType => true
+ case BooleanType => true
+ case DateType => true
+ case TimestampType => true
+ case BinaryType | StringType => true
+ case _ => false
+ }
+
+ /** Returns true iff the we support gathering histogram on column of the given type. */
+ private def supportsHistogram(dataType: DataType): Boolean = dataType match {
+ case _: IntegralType => true
+ case _: DecimalType => true
+ case DoubleType | FloatType => true
+ case DateType => true
+ case TimestampType => true
+ case _ => false
+ }
+
+ /**
+ * Constructs an expression to compute column statistics for a given column.
+ *
+ * The expression should create a single struct column with the following schema:
+ * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
+ * distinctCountsForIntervals: Array[Long]
+ *
+ * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
+ * as a result should stay in sync with it.
+ */
+ private def statExprs(
+ col: Attribute,
+ conf: SQLConf,
+ colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
+ def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
+ expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
+ })
+ val one = Literal(1, LongType)
+
+ // the approximate ndv (num distinct value) should never be larger than the number of rows
+ val numNonNulls = if (col.nullable) Count(col) else Count(one)
+ val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
+ val numNulls = Subtract(Count(one), numNonNulls)
+ val defaultSize = Literal(col.dataType.defaultSize, LongType)
+ val nullArray = Literal(null, ArrayType(LongType))
+
+ def fixedLenTypeStruct: CreateNamedStruct = {
+ val genHistogram =
+ supportsHistogram(col.dataType) && colPercentiles.contains(col)
+ val intervalNdvsExpr = if (genHistogram) {
+ ApproxCountDistinctForIntervals(col,
+ Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
+ } else {
+ nullArray
+ }
+ // For fixed width types, avg size should be the same as max size.
+ struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
+ defaultSize, defaultSize, intervalNdvsExpr)
+ }
+
+ col.dataType match {
+ case _: IntegralType => fixedLenTypeStruct
+ case _: DecimalType => fixedLenTypeStruct
+ case DoubleType | FloatType => fixedLenTypeStruct
+ case BooleanType => fixedLenTypeStruct
+ case DateType => fixedLenTypeStruct
+ case TimestampType => fixedLenTypeStruct
+ case BinaryType | StringType =>
+ // For string and binary type, we don't compute min, max or histogram
+ val nullLit = Literal(null, col.dataType)
+ struct(
+ ndv, nullLit, nullLit, numNulls,
+ // Set avg/max size to default size if all the values are null or there is no value.
+ Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
+ Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
+ nullArray)
+ case _ =>
+ throw new AnalysisException("Analyzing column statistics is not supported for column " +
+ s"${col.name} of data type: ${col.dataType}.")
+ }
+ }
+
+ /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
+ private def rowToColumnStat(
+ row: InternalRow,
+ attr: Attribute,
+ rowCount: Long,
+ percentiles: Option[ArrayData]): ColumnStat = {
+ // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
+ val cs = ColumnStat(
+ distinctCount = Option(BigInt(row.getLong(0))),
+ // for string/binary min/max, get should return null
+ min = Option(row.get(1, attr.dataType)),
+ max = Option(row.get(2, attr.dataType)),
+ nullCount = Option(BigInt(row.getLong(3))),
+ avgLen = Option(row.getLong(4)),
+ maxLen = Option(row.getLong(5))
+ )
+ if (row.isNullAt(6) || cs.nullCount.isEmpty) {
+ cs
+ } else {
+ val ndvs = row.getArray(6).toLongArray()
+ assert(percentiles.get.numElements() == ndvs.length + 1)
+ val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
+ // Construct equi-height histogram
+ val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
+ HistogramBin(endpoints(i), endpoints(i + 1), ndv)
+ }
+ val nonNullRows = rowCount - cs.nullCount.get
+ val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
+ cs.copy(histogram = Some(histogram))
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e400975..4474919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -695,10 +695,11 @@ case class DescribeColumnCommand(
// Show column stats when EXTENDED or FORMATTED is specified.
buffer += Row("min", cs.flatMap(_.min.map(_.toString)).getOrElse("NULL"))
buffer += Row("max", cs.flatMap(_.max.map(_.toString)).getOrElse("NULL"))
- buffer += Row("num_nulls", cs.map(_.nullCount.toString).getOrElse("NULL"))
- buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL"))
- buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL"))
- buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL"))
+ buffer += Row("num_nulls", cs.flatMap(_.nullCount.map(_.toString)).getOrElse("NULL"))
+ buffer += Row("distinct_count",
+ cs.flatMap(_.distinctCount.map(_.toString)).getOrElse("NULL"))
+ buffer += Row("avg_col_len", cs.flatMap(_.avgLen.map(_.toString)).getOrElse("NULL"))
+ buffer += Row("max_col_len", cs.flatMap(_.maxLen.map(_.toString)).getOrElse("NULL"))
val histDesc = for {
c <- cs
hist <- c.histogram
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b11e798..ed4ea02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
import scala.collection.mutable
import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSQLContext
@@ -95,7 +96,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
assert(fetchedStats2.get.sizeInBytes == 0)
val expectedColStat =
- "key" -> ColumnStat(0, None, None, 0, IntegerType.defaultSize, IntegerType.defaultSize)
+ "key" -> CatalogColumnStat(Some(0), None, None, Some(0),
+ Some(IntegerType.defaultSize), Some(IntegerType.defaultSize))
// There won't be histogram for empty column.
Seq("true", "false").foreach { histogramEnabled =>
@@ -156,7 +158,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
Seq(stats, statsWithHgms).foreach { s =>
s.zip(df.schema).foreach { case ((k, v), field) =>
withClue(s"column $k with type ${field.dataType}") {
- val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap(k, field.dataType))
+ val roundtrip = CatalogColumnStat.fromMap("table_is_foo", field.name, v.toMap(k))
assert(roundtrip == Some(v))
}
}
@@ -187,7 +189,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}.mkString(", "))
val expectedColStats = dataTypes.map { case (tpe, idx) =>
- (s"col$idx", ColumnStat(0, None, None, 1, tpe.defaultSize.toLong, tpe.defaultSize.toLong))
+ (s"col$idx", CatalogColumnStat(Some(0), None, None, Some(1),
+ Some(tpe.defaultSize.toLong), Some(tpe.defaultSize.toLong)))
}
// There won't be histograms for null columns.
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 65ccc19..bf4abb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -24,8 +24,8 @@ import scala.collection.mutable
import scala.util.Random
import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, HistogramBin, LogicalPlan}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, HistogramBin, HistogramSerializer, LogicalPlan}
import org.apache.spark.sql.catalyst.util.DateTimeUtils
import org.apache.spark.sql.execution.datasources.LogicalRelation
import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -67,18 +67,21 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
/** A mapping from column to the stats collected. */
protected val stats = mutable.LinkedHashMap(
- "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
- "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
- "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
- "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
- "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
- "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
- "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
- "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
- "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
- "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
- "cdate" -> ColumnStat(2, Some(d1Internal), Some(d2Internal), 1, 4, 4),
- "ctimestamp" -> ColumnStat(2, Some(t1Internal), Some(t2Internal), 1, 8, 8)
+ "cbool" -> CatalogColumnStat(Some(2), Some("false"), Some("true"), Some(1), Some(1), Some(1)),
+ "cbyte" -> CatalogColumnStat(Some(2), Some("1"), Some("2"), Some(1), Some(1), Some(1)),
+ "cshort" -> CatalogColumnStat(Some(2), Some("1"), Some("3"), Some(1), Some(2), Some(2)),
+ "cint" -> CatalogColumnStat(Some(2), Some("1"), Some("4"), Some(1), Some(4), Some(4)),
+ "clong" -> CatalogColumnStat(Some(2), Some("1"), Some("5"), Some(1), Some(8), Some(8)),
+ "cdouble" -> CatalogColumnStat(Some(2), Some("1.0"), Some("6.0"), Some(1), Some(8), Some(8)),
+ "cfloat" -> CatalogColumnStat(Some(2), Some("1.0"), Some("7.0"), Some(1), Some(4), Some(4)),
+ "cdecimal" -> CatalogColumnStat(Some(2), Some(dec1.toString), Some(dec2.toString), Some(1),
+ Some(16), Some(16)),
+ "cstring" -> CatalogColumnStat(Some(2), None, None, Some(1), Some(3), Some(3)),
+ "cbinary" -> CatalogColumnStat(Some(2), None, None, Some(1), Some(3), Some(3)),
+ "cdate" -> CatalogColumnStat(Some(2), Some(d1.toString), Some(d2.toString), Some(1), Some(4),
+ Some(4)),
+ "ctimestamp" -> CatalogColumnStat(Some(2), Some(t1.toString), Some(t2.toString), Some(1),
+ Some(8), Some(8))
)
/**
@@ -110,6 +113,110 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
colStats
}
+ val expectedSerializedColStats = Map(
+ "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
+ "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
+ "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cbinary.version" -> "1",
+ "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
+ "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cbool.max" -> "true",
+ "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
+ "spark.sql.statistics.colStats.cbool.min" -> "false",
+ "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cbool.version" -> "1",
+ "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
+ "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cbyte.max" -> "2",
+ "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
+ "spark.sql.statistics.colStats.cbyte.min" -> "1",
+ "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cbyte.version" -> "1",
+ "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
+ "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
+ "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
+ "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
+ "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cdate.version" -> "1",
+ "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
+ "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
+ "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
+ "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
+ "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cdecimal.version" -> "1",
+ "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
+ "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
+ "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
+ "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
+ "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cdouble.version" -> "1",
+ "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
+ "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
+ "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
+ "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
+ "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cfloat.version" -> "1",
+ "spark.sql.statistics.colStats.cint.avgLen" -> "4",
+ "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cint.max" -> "4",
+ "spark.sql.statistics.colStats.cint.maxLen" -> "4",
+ "spark.sql.statistics.colStats.cint.min" -> "1",
+ "spark.sql.statistics.colStats.cint.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cint.version" -> "1",
+ "spark.sql.statistics.colStats.clong.avgLen" -> "8",
+ "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.clong.max" -> "5",
+ "spark.sql.statistics.colStats.clong.maxLen" -> "8",
+ "spark.sql.statistics.colStats.clong.min" -> "1",
+ "spark.sql.statistics.colStats.clong.nullCount" -> "1",
+ "spark.sql.statistics.colStats.clong.version" -> "1",
+ "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
+ "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cshort.max" -> "3",
+ "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
+ "spark.sql.statistics.colStats.cshort.min" -> "1",
+ "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cshort.version" -> "1",
+ "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
+ "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
+ "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
+ "spark.sql.statistics.colStats.cstring.version" -> "1",
+ "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
+ "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
+ "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
+ "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
+ "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
+ "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
+ "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
+ )
+
+ val expectedSerializedHistograms = Map(
+ "spark.sql.statistics.colStats.cbyte.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cbyte").histogram.get),
+ "spark.sql.statistics.colStats.cshort.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cshort").histogram.get),
+ "spark.sql.statistics.colStats.cint.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cint").histogram.get),
+ "spark.sql.statistics.colStats.clong.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("clong").histogram.get),
+ "spark.sql.statistics.colStats.cdouble.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cdouble").histogram.get),
+ "spark.sql.statistics.colStats.cfloat.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cfloat").histogram.get),
+ "spark.sql.statistics.colStats.cdecimal.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cdecimal").histogram.get),
+ "spark.sql.statistics.colStats.cdate.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
+ "spark.sql.statistics.colStats.ctimestamp.histogram" ->
+ HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
+ )
+
private val randomName = new Random(31)
def getCatalogTable(tableName: String): CatalogTable = {
@@ -151,7 +258,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
*/
def checkColStats(
df: DataFrame,
- colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+ colStats: mutable.LinkedHashMap[String, CatalogColumnStat]): Unit = {
val tableName = "column_stats_test_" + randomName.nextInt(1000)
withTable(tableName) {
df.write.saveAsTable(tableName)
@@ -161,14 +268,24 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
colStats.keys.mkString(", "))
// Validate statistics
- val table = getCatalogTable(tableName)
- assert(table.stats.isDefined)
- assert(table.stats.get.colStats.size == colStats.size)
-
- colStats.foreach { case (k, v) =>
- withClue(s"column $k") {
- assert(table.stats.get.colStats(k) == v)
- }
+ validateColStats(tableName, colStats)
+ }
+ }
+
+ /**
+ * Validate if the given catalog table has the provided statistics.
+ */
+ def validateColStats(
+ tableName: String,
+ colStats: mutable.LinkedHashMap[String, CatalogColumnStat]): Unit = {
+
+ val table = getCatalogTable(tableName)
+ assert(table.stats.isDefined)
+ assert(table.stats.get.colStats.size == colStats.size)
+
+ colStats.foreach { case (k, v) =>
+ withClue(s"column $k") {
+ assert(table.stats.get.colStats(k) == v)
}
}
}
@@ -215,12 +332,13 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
}.head
- val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+ val emptyColStat = ColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))
+ val emptyCatalogColStat = CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))
// Check catalog statistics
assert(catalogTable.stats.isDefined)
assert(catalogTable.stats.get.sizeInBytes == 0)
assert(catalogTable.stats.get.rowCount == Some(0))
- assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+ assert(catalogTable.stats.get.colStats == Map("c1" -> emptyCatalogColStat))
// Check relation statistics
withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org