Posted to commits@spark.apache.org by li...@apache.org on 2018/02/27 07:37:38 UTC
[3/3] spark git commit: [SPARK-23445] ColumnStat refactoring
[SPARK-23445] ColumnStat refactoring
## What changes were proposed in this pull request?
Refactor ColumnStat to be more flexible.
* Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, so it does not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` were already optional; making every field optional is more consistent, and gives the flexibility to, e.g., drop some fields through transformations when they are difficult or impossible to calculate.
The added flexibility makes alternative stats implementations possible, and separates stats collection from stats processing and estimation in query plans. A minimal sketch of the resulting round trip follows below.
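Illustration only (not code from this commit), assuming the classes as added in the diff below:

    import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
    import org.apache.spark.sql.types.IntegerType

    // Catalog-side stats keep min/max as raw Strings, independent of dataType.
    val catalogStat = CatalogColumnStat(
      distinctCount = Some(BigInt(10)),
      min = Some("1"),
      max = Some("10"),
      nullCount = Some(BigInt(0)),
      avgLen = Some(4L),
      maxLen = Some(4L))

    // Serialization uses per-column property keys, e.g. "age.version" -> "1",
    // "age.min" -> "1"; parsing these keys back is what lets stats be recovered
    // without consulting the metastore schema.
    val props: Map[String, String] = catalogStat.toMap("age")

    // Converting to the plan-side ColumnStat parses min/max with the dataType.
    val planStat = catalogStat.toPlanStat("age", IntegerType) // min/max become Ints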
## How was this patch tested?
Refactored existing tests to work with refactored `ColumnStat` and `CatalogColumnStat`.
New tests added in `StatisticsSuite` checking that backwards / forwards compatibility is not broken.
Author: Juliusz Sompolski <ju...@databricks.com>
Closes #20624 from juliuszsompolski/SPARK-23445.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8077bb04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8077bb04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8077bb04
Branch: refs/heads/master
Commit: 8077bb04f350fd35df83ef896135c0672dc3f7b0
Parents: 7ec8365
Author: Juliusz Sompolski <ju...@databricks.com>
Authored: Mon Feb 26 23:37:31 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Feb 26 23:37:31 2018 -0800
----------------------------------------------------------------------
.../spark/sql/catalyst/catalog/interface.scala | 146 +++++++++-
.../optimizer/StarSchemaDetection.scala | 6 +-
.../sql/catalyst/plans/logical/Statistics.scala | 256 ++---------------
.../statsEstimation/AggregateEstimation.scala | 6 +-
.../statsEstimation/EstimationUtils.scala | 20 +-
.../statsEstimation/FilterEstimation.scala | 98 ++++---
.../statsEstimation/JoinEstimation.scala | 55 ++--
.../catalyst/optimizer/JoinReorderSuite.scala | 25 +-
.../StarJoinCostBasedReorderSuite.scala | 96 +++----
.../optimizer/StarJoinReorderSuite.scala | 77 ++---
.../AggregateEstimationSuite.scala | 24 +-
.../BasicStatsEstimationSuite.scala | 12 +-
.../statsEstimation/FilterEstimationSuite.scala | 279 ++++++++++---------
.../statsEstimation/JoinEstimationSuite.scala | 138 +++++----
.../ProjectEstimationSuite.scala | 70 +++--
.../StatsEstimationTestBase.scala | 10 +-
.../command/AnalyzeColumnCommand.scala | 138 ++++++++-
.../spark/sql/execution/command/tables.scala | 9 +-
.../spark/sql/StatisticsCollectionSuite.scala | 9 +-
.../sql/StatisticsCollectionTestBase.scala | 168 +++++++++--
.../spark/sql/hive/HiveExternalCatalog.scala | 63 ++---
.../apache/spark/sql/hive/StatisticsSuite.scala | 162 +++--------
22 files changed, 995 insertions(+), 872 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 95b6fbb..f3e67dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,9 @@ import java.net.URI
import java.util.Date
import scala.collection.mutable
+import scala.util.control.NonFatal
+import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
/**
@@ -361,7 +363,7 @@ object CatalogTable {
case class CatalogStatistics(
sizeInBytes: BigInt,
rowCount: Option[BigInt] = None,
- colStats: Map[String, ColumnStat] = Map.empty) {
+ colStats: Map[String, CatalogColumnStat] = Map.empty) {
/**
* Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
@@ -369,7 +371,8 @@ case class CatalogStatistics(
*/
def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
if (cboEnabled && rowCount.isDefined) {
- val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+ val attrStats = AttributeMap(planOutput
+ .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
// Estimate size as number of rows * row size.
val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
@@ -387,6 +390,143 @@ case class CatalogStatistics(
}
}
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with the metastore.
+ */
+case class CatalogColumnStat(
+ distinctCount: Option[BigInt] = None,
+ min: Option[String] = None,
+ max: Option[String] = None,
+ nullCount: Option[BigInt] = None,
+ avgLen: Option[Long] = None,
+ maxLen: Option[Long] = None,
+ histogram: Option[Histogram] = None) {
+
+ /**
+ * Returns a map from string to string that can be used to serialize the column stats.
+ * The key is the name of the column and the name of the field (e.g. "colName.distinctCount"),
+ * and the value is the string representation for the value.
+ * min/max values are stored as Strings. They can be deserialized using
+ * [[CatalogColumnStat.fromExternalString]].
+ *
+ * As part of the protocol, the returned map always contains a key called "version".
+ * Any of the fields that are null (None) won't appear in the map.
+ */
+ def toMap(colName: String): Map[String, String] = {
+ val map = new scala.collection.mutable.HashMap[String, String]
+ map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+ distinctCount.foreach { v =>
+ map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+ }
+ nullCount.foreach { v =>
+ map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+ }
+ avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+ maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+ min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+ max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+ histogram.foreach { h =>
+ map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+ }
+ map.toMap
+ }
+
+ /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+ def toPlanStat(
+ colName: String,
+ dataType: DataType): ColumnStat =
+ ColumnStat(
+ distinctCount = distinctCount,
+ min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+ max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+ nullCount = nullCount,
+ avgLen = avgLen,
+ maxLen = maxLen,
+ histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+ // List of string keys used to serialize CatalogColumnStat
+ val KEY_VERSION = "version"
+ private val KEY_DISTINCT_COUNT = "distinctCount"
+ private val KEY_MIN_VALUE = "min"
+ private val KEY_MAX_VALUE = "max"
+ private val KEY_NULL_COUNT = "nullCount"
+ private val KEY_AVG_LEN = "avgLen"
+ private val KEY_MAX_LEN = "maxLen"
+ private val KEY_HISTOGRAM = "histogram"
+
+ /**
+ * Converts the string representation of an external value to the corresponding Catalyst value.
+ */
+ def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+ dataType match {
+ case BooleanType => s.toBoolean
+ case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+ case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+ case ByteType => s.toByte
+ case ShortType => s.toShort
+ case IntegerType => s.toInt
+ case LongType => s.toLong
+ case FloatType => s.toFloat
+ case DoubleType => s.toDouble
+ case _: DecimalType => Decimal(s)
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case BinaryType | StringType => null
+ case _ =>
+ throw new AnalysisException("Column statistics deserialization is not supported for " +
+ s"column $name of data type: $dataType.")
+ }
+ }
+
+ /**
+ * Converts the given value from Catalyst data type to string representation of external
+ * data type.
+ */
+ def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+ val externalValue = dataType match {
+ case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+ case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+ case BooleanType | _: IntegralType | FloatType | DoubleType => v
+ case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+ // This version of Spark does not use min/max for binary/string types so we ignore it.
+ case _ =>
+ throw new AnalysisException("Column statistics serialization is not supported for " +
+ s"column $colName of data type: $dataType.")
+ }
+ externalValue.toString
+ }
+
+
+ /**
+ * Creates a [[CatalogColumnStat]] object from the given map.
+ * This is used to deserialize column stats from some external storage.
+ * The serialization side is defined in [[CatalogColumnStat.toMap]].
+ */
+ def fromMap(
+ table: String,
+ colName: String,
+ map: Map[String, String]): Option[CatalogColumnStat] = {
+
+ try {
+ Some(CatalogColumnStat(
+ distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
+ min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
+ max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
+ nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
+ avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
+ maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
+ histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+ ))
+ } catch {
+ case NonFatal(e) =>
+ logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
+ None
+ }
+ }
+}
+
case class CatalogTableType private(name: String)
object CatalogTableType {
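For illustration, a sketch of the deserialization path defined above, assuming the properties were written by CatalogColumnStat.toMap (hypothetical table and column names):

    val props = Map(
      "price.version" -> "1",
      "price.distinctCount" -> "42",
      "price.nullCount" -> "0",
      "price.min" -> "0.5",
      "price.max" -> "99.9")

    // fromMap reads only keys prefixed with the column name, so the column need
    // not appear in the metastore schema; a malformed entry yields None (with a
    // logged warning) instead of failing the whole read.
    val restored: Option[CatalogColumnStat] =
      CatalogColumnStat.fromMap("sales", "price", props)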
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 1f20b76..2aa762e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
stats.rowCount match {
case Some(rowCount) if rowCount >= 0 =>
if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
- val colStats = stats.attributeStats.get(col)
- if (colStats.get.nullCount > 0) {
+ val colStats = stats.attributeStats.get(col).get
+ if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
false
} else {
- val distinctCount = colStats.get.distinctCount
+ val distinctCount = colStats.distinctCount.get
val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
// ndvMaxErr adjusted based on TPCDS 1TB data results
relDiff <= conf.ndvMaxError * 2
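To illustrate the guarded check above with hypothetical numbers (and assuming the default ndvMaxError of 0.05):

    val rowCount = BigInt(1000)
    val distinctCount = BigInt(980)  // from colStats.distinctCount.get
    val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d) // 0.02
    // 0.02 <= 0.05 * 2, so the column is treated as (nearly) unique; columns
    // lacking count stats now fail the check instead of throwing on .get.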
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 96b199d..b3a4886 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -27,6 +27,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.catalyst.expressions.aggregate._
import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils}
@@ -79,11 +80,10 @@ case class Statistics(
/**
* Statistics collected for a column.
*
- * 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ * 1. The JVM data type stored in min/max is the internal data type for the corresponding
* Catalyst data type. For example, the internal type of DateType is Int, and that the internal
* type of TimestampType is Long.
- * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms
* (sketches) might have been used, and the data collected can also be stale.
*
* @param distinctCount number of distinct values
@@ -95,240 +95,32 @@ case class Statistics(
* @param histogram histogram of the values
*/
case class ColumnStat(
- distinctCount: BigInt,
- min: Option[Any],
- max: Option[Any],
- nullCount: BigInt,
- avgLen: Long,
- maxLen: Long,
+ distinctCount: Option[BigInt] = None,
+ min: Option[Any] = None,
+ max: Option[Any] = None,
+ nullCount: Option[BigInt] = None,
+ avgLen: Option[Long] = None,
+ maxLen: Option[Long] = None,
histogram: Option[Histogram] = None) {
- // We currently don't store min/max for binary/string type. This can change in the future and
- // then we need to remove this require.
- require(min.isEmpty || (!min.get.isInstanceOf[Array[Byte]] && !min.get.isInstanceOf[String]))
- require(max.isEmpty || (!max.get.isInstanceOf[Array[Byte]] && !max.get.isInstanceOf[String]))
-
- /**
- * Returns a map from string to string that can be used to serialize the column stats.
- * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
- * representation for the value. min/max values are converted to the external data type. For
- * example, for DateType we store java.sql.Date, and for TimestampType we store
- * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
- *
- * As part of the protocol, the returned map always contains a key called "version".
- * In the case min/max values are null (None), they won't appear in the map.
- */
- def toMap(colName: String, dataType: DataType): Map[String, String] = {
- val map = new scala.collection.mutable.HashMap[String, String]
- map.put(ColumnStat.KEY_VERSION, "1")
- map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
- map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
- map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
- map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
- min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
- max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
- histogram.foreach { h => map.put(ColumnStat.KEY_HISTOGRAM, HistogramSerializer.serialize(h)) }
- map.toMap
- }
-
- /**
- * Converts the given value from Catalyst data type to string representation of external
- * data type.
- */
- private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
- val externalValue = dataType match {
- case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
- case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
- case BooleanType | _: IntegralType | FloatType | DoubleType => v
- case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
- // This version of Spark does not use min/max for binary/string types so we ignore it.
- case _ =>
- throw new AnalysisException("Column statistics deserialization is not supported for " +
- s"column $colName of data type: $dataType.")
- }
- externalValue.toString
- }
-
-}
+ // Are distinctCount and nullCount statistics defined?
+ val hasCountStats = distinctCount.isDefined && nullCount.isDefined
+ // Are min and max statistics defined?
+ val hasMinMaxStats = min.isDefined && max.isDefined
-object ColumnStat extends Logging {
-
- // List of string keys used to serialize ColumnStat
- val KEY_VERSION = "version"
- private val KEY_DISTINCT_COUNT = "distinctCount"
- private val KEY_MIN_VALUE = "min"
- private val KEY_MAX_VALUE = "max"
- private val KEY_NULL_COUNT = "nullCount"
- private val KEY_AVG_LEN = "avgLen"
- private val KEY_MAX_LEN = "maxLen"
- private val KEY_HISTOGRAM = "histogram"
-
- /** Returns true iff the we support gathering column statistics on column of the given type. */
- def supportsType(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case BooleanType => true
- case DateType => true
- case TimestampType => true
- case BinaryType | StringType => true
- case _ => false
- }
-
- /** Returns true iff the we support gathering histogram on column of the given type. */
- def supportsHistogram(dataType: DataType): Boolean = dataType match {
- case _: IntegralType => true
- case _: DecimalType => true
- case DoubleType | FloatType => true
- case DateType => true
- case TimestampType => true
- case _ => false
- }
-
- /**
- * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
- * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
- */
- def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
- try {
- Some(ColumnStat(
- distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
- // Note that flatMap(Option.apply) turns Option(null) into None.
- min = map.get(KEY_MIN_VALUE)
- .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
- max = map.get(KEY_MAX_VALUE)
- .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
- nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
- avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
- maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong,
- histogram = map.get(KEY_HISTOGRAM).map(HistogramSerializer.deserialize)
- ))
- } catch {
- case NonFatal(e) =>
- logWarning(s"Failed to parse column statistics for column ${field.name} in table $table", e)
- None
- }
- }
-
- /**
- * Converts from string representation of external data type to the corresponding Catalyst data
- * type.
- */
- private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
- dataType match {
- case BooleanType => s.toBoolean
- case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
- case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
- case ByteType => s.toByte
- case ShortType => s.toShort
- case IntegerType => s.toInt
- case LongType => s.toLong
- case FloatType => s.toFloat
- case DoubleType => s.toDouble
- case _: DecimalType => Decimal(s)
- // This version of Spark does not use min/max for binary/string types so we ignore it.
- case BinaryType | StringType => null
- case _ =>
- throw new AnalysisException("Column statistics deserialization is not supported for " +
- s"column $name of data type: $dataType.")
- }
- }
-
- /**
- * Constructs an expression to compute column statistics for a given column.
- *
- * The expression should create a single struct column with the following schema:
- * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
- * distinctCountsForIntervals: Array[Long]
- *
- * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
- * as a result should stay in sync with it.
- */
- def statExprs(
- col: Attribute,
- conf: SQLConf,
- colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
- def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
- expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
- })
- val one = Literal(1, LongType)
-
- // the approximate ndv (num distinct value) should never be larger than the number of rows
- val numNonNulls = if (col.nullable) Count(col) else Count(one)
- val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
- val numNulls = Subtract(Count(one), numNonNulls)
- val defaultSize = Literal(col.dataType.defaultSize, LongType)
- val nullArray = Literal(null, ArrayType(LongType))
-
- def fixedLenTypeStruct: CreateNamedStruct = {
- val genHistogram =
- ColumnStat.supportsHistogram(col.dataType) && colPercentiles.contains(col)
- val intervalNdvsExpr = if (genHistogram) {
- ApproxCountDistinctForIntervals(col,
- Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
- } else {
- nullArray
- }
- // For fixed width types, avg size should be the same as max size.
- struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
- defaultSize, defaultSize, intervalNdvsExpr)
- }
-
- col.dataType match {
- case _: IntegralType => fixedLenTypeStruct
- case _: DecimalType => fixedLenTypeStruct
- case DoubleType | FloatType => fixedLenTypeStruct
- case BooleanType => fixedLenTypeStruct
- case DateType => fixedLenTypeStruct
- case TimestampType => fixedLenTypeStruct
- case BinaryType | StringType =>
- // For string and binary type, we don't compute min, max or histogram
- val nullLit = Literal(null, col.dataType)
- struct(
- ndv, nullLit, nullLit, numNulls,
- // Set avg/max size to default size if all the values are null or there is no value.
- Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
- Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
- nullArray)
- case _ =>
- throw new AnalysisException("Analyzing column statistics is not supported for column " +
- s"${col.name} of data type: ${col.dataType}.")
- }
- }
-
- /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
- def rowToColumnStat(
- row: InternalRow,
- attr: Attribute,
- rowCount: Long,
- percentiles: Option[ArrayData]): ColumnStat = {
- // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
- val cs = ColumnStat(
- distinctCount = BigInt(row.getLong(0)),
- // for string/binary min/max, get should return null
- min = Option(row.get(1, attr.dataType)),
- max = Option(row.get(2, attr.dataType)),
- nullCount = BigInt(row.getLong(3)),
- avgLen = row.getLong(4),
- maxLen = row.getLong(5)
- )
- if (row.isNullAt(6)) {
- cs
- } else {
- val ndvs = row.getArray(6).toLongArray()
- assert(percentiles.get.numElements() == ndvs.length + 1)
- val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
- // Construct equi-height histogram
- val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
- HistogramBin(endpoints(i), endpoints(i + 1), ndv)
- }
- val nonNullRows = rowCount - cs.nullCount
- val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
- cs.copy(histogram = Some(histogram))
- }
- }
+ // Are avgLen and maxLen statistics defined?
+ val hasLenStats = avgLen.isDefined && maxLen.isDefined
+ def toCatalogColumnStat(colName: String, dataType: DataType): CatalogColumnStat =
+ CatalogColumnStat(
+ distinctCount = distinctCount,
+ min = min.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+ max = max.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+ nullCount = nullCount,
+ avgLen = avgLen,
+ maxLen = maxLen,
+ histogram = histogram)
}
/**
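A small sketch of what the all-optional ColumnStat allows (illustration only, not code from the commit):

    // Partially collected stats are now representable; the new flags report
    // which groups of fields are defined.
    val ndvOnly = ColumnStat(distinctCount = Some(100))
    ndvOnly.hasCountStats   // false: nullCount is not defined
    ndvOnly.hasMinMaxStats  // false

    val full = ColumnStat(distinctCount = Some(100), min = Some(1), max = Some(100),
      nullCount = Some(0), avgLen = Some(4L), maxLen = Some(4L))
    full.hasCountStats && full.hasMinMaxStats && full.hasLenStats // true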
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index c41fac4..111c594 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -32,13 +32,15 @@ object AggregateEstimation {
val childStats = agg.child.stats
// Check if we have column stats for all group-by columns.
val colStatsExist = agg.groupingExpressions.forall { e =>
- e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
+ e.isInstanceOf[Attribute] &&
+ childStats.attributeStats.get(e.asInstanceOf[Attribute]).exists(_.hasCountStats)
}
if (rowCountsExist(agg.child) && colStatsExist) {
// Multiply distinct counts of group-by columns. This is an upper bound, which assumes
// the data contains all combinations of distinct values of group-by columns.
var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
- (res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount)
+ (res, expr) => res *
+ childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
outputRows = if (agg.groupingExpressions.isEmpty) {
// If there's no group-by columns, the output is a single row containing values of aggregate
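A worked example of the upper-bound formula above, with hypothetical stats: grouping by two columns with distinct counts 2 and 4 yields at most 2 * 4 = 8 output rows; the surrounding code (not shown in this hunk) further bounds the result by the child's row count.

    val groupByNdvs = Seq(BigInt(2), BigInt(4))
    val outputRows = groupByNdvs.foldLeft(BigInt(1))(_ * _) // 8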
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index d793f77..0f147f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -17,6 +17,7 @@
package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
+import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
import scala.math.BigDecimal.RoundingMode
@@ -38,9 +39,18 @@ object EstimationUtils {
}
}
+ /** Checks whether, in the corresponding Statistics, each attribute's column stat
+ * contains both distinct and null counts. */
+ def columnStatsWithCountsExist(statsAndAttr: (Statistics, Attribute)*): Boolean = {
+ statsAndAttr.forall { case (stats, attr) =>
+ stats.attributeStats.get(attr).map(_.hasCountStats).getOrElse(false)
+ }
+ }
+
+ /** Statistics for a column containing only NULLs. */
def nullColumnStat(dataType: DataType, rowCount: BigInt): ColumnStat = {
- ColumnStat(distinctCount = 0, min = None, max = None, nullCount = rowCount,
- avgLen = dataType.defaultSize, maxLen = dataType.defaultSize)
+ ColumnStat(distinctCount = Some(0), min = None, max = None, nullCount = Some(rowCount),
+ avgLen = Some(dataType.defaultSize), maxLen = Some(dataType.defaultSize))
}
/**
@@ -70,13 +80,13 @@ object EstimationUtils {
// We assign a generic overhead for a Row object, the actual overhead is different for different
// Row format.
val sizePerRow = 8 + attributes.map { attr =>
- if (attrStats.contains(attr)) {
+ if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {
attr.dataType match {
case StringType =>
// UTF8String: base + offset + numBytes
- attrStats(attr).avgLen + 8 + 4
+ attrStats(attr).avgLen.get + 8 + 4
case _ =>
- attrStats(attr).avgLen
+ attrStats(attr).avgLen.get
}
} else {
attr.dataType.defaultSize
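Illustrating the sizing above with hypothetical stats, for an Int column with avgLen = Some(4) and a String column with avgLen = Some(10):

    val intPart = 4L
    val stringPart = 10L + 8 + 4               // UTF8String: base + offset + numBytes
    val sizePerRow = 8L + intPart + stringPart // 8-byte row overhead => 34
    // A column whose avgLen is absent now falls back to dataType.defaultSize.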
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 4cc32de..0538c9d 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -225,7 +225,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
attr: Attribute,
isNull: Boolean,
update: Boolean): Option[Double] = {
- if (!colStatsMap.contains(attr)) {
+ if (!colStatsMap.contains(attr) || !colStatsMap(attr).hasCountStats) {
logDebug("[CBO] No statistics for " + attr)
return None
}
@@ -234,14 +234,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
val nullPercent: Double = if (rowCountValue == 0) {
0
} else {
- (BigDecimal(colStat.nullCount) / BigDecimal(rowCountValue)).toDouble
+ (BigDecimal(colStat.nullCount.get) / BigDecimal(rowCountValue)).toDouble
}
if (update) {
val newStats = if (isNull) {
- colStat.copy(distinctCount = 0, min = None, max = None)
+ colStat.copy(distinctCount = Some(0), min = None, max = None)
} else {
- colStat.copy(nullCount = 0)
+ colStat.copy(nullCount = Some(0))
}
colStatsMap.update(attr, newStats)
}
@@ -322,17 +322,21 @@ case class FilterEstimation(plan: Filter) extends Logging {
// value.
val newStats = attr.dataType match {
case StringType | BinaryType =>
- colStat.copy(distinctCount = 1, nullCount = 0)
+ colStat.copy(distinctCount = Some(1), nullCount = Some(0))
case _ =>
- colStat.copy(distinctCount = 1, min = Some(literal.value),
- max = Some(literal.value), nullCount = 0)
+ colStat.copy(distinctCount = Some(1), min = Some(literal.value),
+ max = Some(literal.value), nullCount = Some(0))
}
colStatsMap.update(attr, newStats)
}
if (colStat.histogram.isEmpty) {
- // returns 1/ndv if there is no histogram
- Some(1.0 / colStat.distinctCount.toDouble)
+ if (!colStat.distinctCount.isEmpty) {
+ // returns 1/ndv if there is no histogram
+ Some(1.0 / colStat.distinctCount.get.toDouble)
+ } else {
+ None
+ }
} else {
Some(computeEqualityPossibilityByHistogram(literal, colStat))
}
@@ -378,13 +382,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
attr: Attribute,
hSet: Set[Any],
update: Boolean): Option[Double] = {
- if (!colStatsMap.contains(attr)) {
+ if (!colStatsMap.hasDistinctCount(attr)) {
logDebug("[CBO] No statistics for " + attr)
return None
}
val colStat = colStatsMap(attr)
- val ndv = colStat.distinctCount
+ val ndv = colStat.distinctCount.get
val dataType = attr.dataType
var newNdv = ndv
@@ -407,8 +411,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
// 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
newNdv = ndv.min(BigInt(validQuerySet.size))
if (update) {
- val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
- max = Some(newMax), nullCount = 0)
+ val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+ max = Some(newMax), nullCount = Some(0))
colStatsMap.update(attr, newStats)
}
@@ -416,7 +420,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
case StringType | BinaryType =>
newNdv = ndv.min(BigInt(hSet.size))
if (update) {
- val newStats = colStat.copy(distinctCount = newNdv, nullCount = 0)
+ val newStats = colStat.copy(distinctCount = Some(newNdv), nullCount = Some(0))
colStatsMap.update(attr, newStats)
}
}
@@ -443,12 +447,17 @@ case class FilterEstimation(plan: Filter) extends Logging {
literal: Literal,
update: Boolean): Option[Double] = {
+ if (!colStatsMap.hasMinMaxStats(attr) || !colStatsMap.hasDistinctCount(attr)) {
+ logDebug("[CBO] No statistics for " + attr)
+ return None
+ }
+
val colStat = colStatsMap(attr)
val statsInterval =
ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
val max = statsInterval.max
val min = statsInterval.min
- val ndv = colStat.distinctCount.toDouble
+ val ndv = colStat.distinctCount.get.toDouble
// determine the overlapping degree between predicate interval and column's interval
val numericLiteral = EstimationUtils.toDouble(literal.value, literal.dataType)
@@ -520,8 +529,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
newMax = newValue
}
- val newStats = colStat.copy(distinctCount = ceil(ndv * percent),
- min = newMin, max = newMax, nullCount = 0)
+ val newStats = colStat.copy(distinctCount = Some(ceil(ndv * percent)),
+ min = newMin, max = newMax, nullCount = Some(0))
colStatsMap.update(attr, newStats)
}
@@ -637,11 +646,11 @@ case class FilterEstimation(plan: Filter) extends Logging {
attrRight: Attribute,
update: Boolean): Option[Double] = {
- if (!colStatsMap.contains(attrLeft)) {
+ if (!colStatsMap.hasCountStats(attrLeft)) {
logDebug("[CBO] No statistics for " + attrLeft)
return None
}
- if (!colStatsMap.contains(attrRight)) {
+ if (!colStatsMap.hasCountStats(attrRight)) {
logDebug("[CBO] No statistics for " + attrRight)
return None
}
@@ -668,7 +677,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
val minRight = statsIntervalRight.min
// determine the overlapping degree between predicate interval and column's interval
- val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
+ val allNotNull = (colStatLeft.nullCount.get == 0) && (colStatRight.nullCount.get == 0)
val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
// Left < Right or Left <= Right
// - no overlap:
@@ -707,14 +716,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
case _: EqualTo =>
((maxLeft < minRight) || (maxRight < minLeft),
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
- && (colStatLeft.distinctCount == colStatRight.distinctCount)
+ && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
)
case _: EqualNullSafe =>
// For null-safe equality, we use a very restrictive condition to evaluate its overlap.
// If null values exists, we set it to partial overlap.
(((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull,
(minLeft == minRight) && (maxLeft == maxRight) && allNotNull
- && (colStatLeft.distinctCount == colStatRight.distinctCount)
+ && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
)
}
@@ -731,9 +740,9 @@ case class FilterEstimation(plan: Filter) extends Logging {
if (update) {
// Need to adjust new min/max after the filter condition is applied
- val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+ val ndvLeft = BigDecimal(colStatLeft.distinctCount.get)
val newNdvLeft = ceil(ndvLeft * percent)
- val ndvRight = BigDecimal(colStatRight.distinctCount)
+ val ndvRight = BigDecimal(colStatRight.distinctCount.get)
val newNdvRight = ceil(ndvRight * percent)
var newMaxLeft = colStatLeft.max
@@ -817,10 +826,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
}
}
- val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft,
+ val newStatsLeft = colStatLeft.copy(distinctCount = Some(newNdvLeft), min = newMinLeft,
max = newMaxLeft)
colStatsMap(attrLeft) = newStatsLeft
- val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight,
+ val newStatsRight = colStatRight.copy(distinctCount = Some(newNdvRight), min = newMinRight,
max = newMaxRight)
colStatsMap(attrRight) = newStatsRight
}
@@ -849,17 +858,35 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
def contains(a: Attribute): Boolean = updatedMap.contains(a.exprId) || originalMap.contains(a)
/**
- * Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
- * originalMap, because updatedMap has the latest (updated) column stats.
+ * Gets an Option of column stat for the given attribute.
+ * Prefer the column stat in updatedMap over the one in originalMap,
+ * because updatedMap has the latest (updated) column stats.
*/
- def apply(a: Attribute): ColumnStat = {
+ def get(a: Attribute): Option[ColumnStat] = {
if (updatedMap.contains(a.exprId)) {
- updatedMap(a.exprId)._2
+ updatedMap.get(a.exprId).map(_._2)
} else {
- originalMap(a)
+ originalMap.get(a)
}
}
+ def hasCountStats(a: Attribute): Boolean =
+ get(a).map(_.hasCountStats).getOrElse(false)
+
+ def hasDistinctCount(a: Attribute): Boolean =
+ get(a).map(_.distinctCount.isDefined).getOrElse(false)
+
+ def hasMinMaxStats(a: Attribute): Boolean =
+ get(a).map(_.hasMinMaxStats).getOrElse(false)
+
+ /**
+ * Gets the column stat for the given attribute. Prefer the column stat in updatedMap over
+ * the one in originalMap, because updatedMap has the latest (updated) column stats.
+ */
+ def apply(a: Attribute): ColumnStat = {
+ get(a).get
+ }
+
/** Updates column stats in updatedMap. */
def update(a: Attribute, stats: ColumnStat): Unit = updatedMap.update(a.exprId, a -> stats)
@@ -871,11 +898,14 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
: AttributeMap[ColumnStat] = {
val newColumnStats = originalMap.map { case (attr, oriColStat) =>
val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
- val newNdv = if (colStat.distinctCount > 1) {
+ val newNdv = if (colStat.distinctCount.isEmpty) {
+ // No NDV in the original stats.
+ None
+ } else if (colStat.distinctCount.get > 1) {
// Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
// decreases; otherwise keep it unchanged.
- EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
- newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
+ Some(EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
+ newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount.get))
} else {
// no need to scale down since it is already down to 1 (for skewed distribution case)
colStat.distinctCount
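For example, the no-histogram equality estimate above reduces to 1/ndv when the distinct count is known, and to no estimate at all when it is not (hypothetical value):

    val distinctCount: Option[BigInt] = Some(BigInt(50))
    val selectivity: Option[Double] =
      distinctCount.map(ndv => 1.0 / ndv.toDouble) // Some(0.02); None when NDV is absent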
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f0294a4..2543e38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -85,7 +85,8 @@ case class JoinEstimation(join: Join) extends Logging {
// 3. Update statistics based on the output of join
val inputAttrStats = AttributeMap(
leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
- val attributesWithStat = join.output.filter(a => inputAttrStats.contains(a))
+ val attributesWithStat = join.output.filter(a =>
+ inputAttrStats.get(a).map(_.hasCountStats).getOrElse(false))
val (fromLeft, fromRight) = attributesWithStat.partition(join.left.outputSet.contains(_))
val outputStats: Seq[(Attribute, ColumnStat)] = if (outputRows == 0) {
@@ -106,10 +107,10 @@ case class JoinEstimation(join: Join) extends Logging {
case FullOuter =>
fromLeft.map { a =>
val oriColStat = inputAttrStats(a)
- (a, oriColStat.copy(nullCount = oriColStat.nullCount + rightRows))
+ (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + rightRows)))
} ++ fromRight.map { a =>
val oriColStat = inputAttrStats(a)
- (a, oriColStat.copy(nullCount = oriColStat.nullCount + leftRows))
+ (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + leftRows)))
}
case _ =>
assert(joinType == Inner || joinType == Cross)
@@ -219,19 +220,27 @@ case class JoinEstimation(join: Join) extends Logging {
private def computeByNdv(
leftKey: AttributeReference,
rightKey: AttributeReference,
- newMin: Option[Any],
- newMax: Option[Any]): (BigInt, ColumnStat) = {
+ min: Option[Any],
+ max: Option[Any]): (BigInt, ColumnStat) = {
val leftKeyStat = leftStats.attributeStats(leftKey)
val rightKeyStat = rightStats.attributeStats(rightKey)
- val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
+ val maxNdv = leftKeyStat.distinctCount.get.max(rightKeyStat.distinctCount.get)
// Compute cardinality by the basic formula.
val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
// Get the intersected column stat.
- val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
- val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
- val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
- val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
+ val newNdv = Some(leftKeyStat.distinctCount.get.min(rightKeyStat.distinctCount.get))
+ val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+ Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+ } else {
+ None
+ }
+ val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+ Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+ } else {
+ None
+ }
+ val newStats = ColumnStat(newNdv, min, max, Some(0), newAvgLen, newMaxLen)
(ceil(card), newStats)
}
@@ -267,9 +276,17 @@ case class JoinEstimation(join: Join) extends Logging {
val leftKeyStat = leftStats.attributeStats(leftKey)
val rightKeyStat = rightStats.attributeStats(rightKey)
- val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
- val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
- val newStats = ColumnStat(ceil(totalNdv), newMin, newMax, 0, newAvgLen, newMaxLen)
+ val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+ Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+ } else {
+ None
+ }
+ val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+ Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+ } else {
+ None
+ }
+ val newStats = ColumnStat(Some(ceil(totalNdv)), newMin, newMax, Some(0), newAvgLen, newMaxLen)
(ceil(card), newStats)
}
@@ -292,10 +309,14 @@ case class JoinEstimation(join: Join) extends Logging {
} else {
val oldColStat = oldAttrStats(a)
val oldNdv = oldColStat.distinctCount
- val newNdv = if (join.left.outputSet.contains(a)) {
- updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv)
+ val newNdv = if (oldNdv.isDefined) {
+ Some(if (join.left.outputSet.contains(a)) {
+ updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+ } else {
+ updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+ })
} else {
- updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv)
+ None
}
val newColStat = oldColStat.copy(distinctCount = newNdv)
// TODO: support nullCount updates for specific outer joins
@@ -313,7 +334,7 @@ case class JoinEstimation(join: Join) extends Logging {
// Note: join keys from EqualNullSafe also fall into this case (Coalesce), consider to
// support it in the future by using `nullCount` in column stats.
case (lk: AttributeReference, rk: AttributeReference)
- if columnStatsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
+ if columnStatsWithCountsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
}
}
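A worked example of computeByNdv above, with hypothetical inputs: joining 1000 x 500 rows on keys with NDVs 100 and 80:

    val card = BigDecimal(1000L * 500L) / BigDecimal(100) // divide by max NDV => 5000
    val newNdv = Some(BigInt(80))                         // joined key keeps min NDV
    // avg/max lengths are combined only when both sides define them; otherwise
    // they stay None rather than being guessed.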
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 2fb587d..565b0a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -62,24 +62,15 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
}
}
- /** Set up tables and columns for testing */
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
- attr("t1.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t1.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t2.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t3.v-1-100") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t4.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t4.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t5.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("t5.v-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ attr("t1.k-1-2") -> rangeColumnStat(2, 0),
+ attr("t1.v-1-10") -> rangeColumnStat(10, 0),
+ attr("t2.k-1-5") -> rangeColumnStat(5, 0),
+ attr("t3.v-1-100") -> rangeColumnStat(100, 0),
+ attr("t4.k-1-2") -> rangeColumnStat(2, 0),
+ attr("t4.v-1-10") -> rangeColumnStat(10, 0),
+ attr("t5.k-1-5") -> rangeColumnStat(5, 0),
+ attr("t5.v-1-5") -> rangeColumnStat(5, 0)
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
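The rangeColumnStat helper lives in StatsEstimationTestBase (touched by this commit but not shown in this excerpt). Judging from the call sites it replaces, it presumably expands to something like this hypothetical reconstruction:

    // Hypothetical: models an Int column with values 1..distinctCount.
    def rangeColumnStat(distinctCount: Int, nullCount: Int): ColumnStat =
      ColumnStat(distinctCount = Some(distinctCount),
        min = Some(1), max = Some(distinctCount),
        nullCount = Some(nullCount), avgLen = Some(4), maxLen = Some(4))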
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index ada6e2a..d4d23ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -68,88 +68,56 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBase
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
// F1 (fact table)
- attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("f1_fk1") -> rangeColumnStat(100, 0),
+ attr("f1_fk2") -> rangeColumnStat(100, 0),
+ attr("f1_fk3") -> rangeColumnStat(100, 0),
+ attr("f1_c1") -> rangeColumnStat(100, 0),
+ attr("f1_c2") -> rangeColumnStat(100, 0),
// D1 (dimension)
- attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d1_pk") -> rangeColumnStat(100, 0),
+ attr("d1_c2") -> rangeColumnStat(50, 0),
+ attr("d1_c3") -> rangeColumnStat(50, 0),
// D2 (dimension)
- attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d2_pk") -> rangeColumnStat(20, 0),
+ attr("d2_c2") -> rangeColumnStat(10, 0),
+ attr("d2_c3") -> rangeColumnStat(10, 0),
// D3 (dimension)
- attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d3_pk") -> rangeColumnStat(10, 0),
+ attr("d3_c2") -> rangeColumnStat(5, 0),
+ attr("d3_c3") -> rangeColumnStat(5, 0),
// T1 (regular table i.e. outside star)
- attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t1_c1") -> rangeColumnStat(20, 1),
+ attr("t1_c2") -> rangeColumnStat(10, 1),
+ attr("t1_c3") -> rangeColumnStat(10, 1),
// T2 (regular table)
- attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t2_c1") -> rangeColumnStat(5, 1),
+ attr("t2_c2") -> rangeColumnStat(5, 1),
+ attr("t2_c3") -> rangeColumnStat(5, 1),
// T3 (regular table)
- attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t3_c1") -> rangeColumnStat(5, 1),
+ attr("t3_c2") -> rangeColumnStat(5, 1),
+ attr("t3_c3") -> rangeColumnStat(5, 1),
// T4 (regular table)
- attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t4_c1") -> rangeColumnStat(5, 1),
+ attr("t4_c2") -> rangeColumnStat(5, 1),
+ attr("t4_c3") -> rangeColumnStat(5, 1),
// T5 (regular table)
- attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
+ attr("t5_c1") -> rangeColumnStat(5, 1),
+ attr("t5_c2") -> rangeColumnStat(5, 1),
+ attr("t5_c3") -> rangeColumnStat(5, 1),
// T6 (regular table)
- attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 1, avgLen = 4, maxLen = 4)
+ attr("t6_c1") -> rangeColumnStat(5, 1),
+ attr("t6_c2") -> rangeColumnStat(5, 1),
+ attr("t6_c3") -> rangeColumnStat(5, 1)
))
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 777c563..4e0883e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -70,59 +70,40 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
// Tables' cardinality: f1 > d3 > d1 > d2 > s3
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
// F1
- attr("f1_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f1_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("f1_fk1") -> rangeColumnStat(3, 0),
+ attr("f1_fk2") -> rangeColumnStat(3, 0),
+ attr("f1_fk3") -> rangeColumnStat(4, 0),
+ attr("f1_c4") -> rangeColumnStat(4, 0),
// D1
- attr("d1_pk1") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d1_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d1_pk1") -> rangeColumnStat(4, 0),
+ attr("d1_c2") -> rangeColumnStat(3, 0),
+ attr("d1_c3") -> rangeColumnStat(4, 0),
+ attr("d1_c4") -> ColumnStat(distinctCount = Some(2), min = Some("2"), max = Some("3"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// D2
- attr("d2_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 1, avgLen = 4, maxLen = 4),
- attr("d2_pk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c3") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d2_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d2_c2") -> ColumnStat(distinctCount = Some(3), min = Some("1"), max = Some("3"),
+ nullCount = Some(1), avgLen = Some(4), maxLen = Some(4)),
+ attr("d2_pk1") -> rangeColumnStat(3, 0),
+ attr("d2_c3") -> rangeColumnStat(3, 0),
+ attr("d2_c4") -> ColumnStat(distinctCount = Some(2), min = Some("3"), max = Some("4"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// D3
- attr("d3_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_pk1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("d3_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("d3_fk1") -> rangeColumnStat(3, 0),
+ attr("d3_c2") -> rangeColumnStat(3, 0),
+ attr("d3_pk1") -> rangeColumnStat(5, 0),
+ attr("d3_c4") -> ColumnStat(distinctCount = Some(2), min = Some("2"), max = Some("3"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// S3
- attr("s3_pk1") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("s3_c2") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("s3_c3") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("s3_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
+ attr("s3_pk1") -> rangeColumnStat(2, 0),
+ attr("s3_c2") -> rangeColumnStat(1, 0),
+ attr("s3_c3") -> rangeColumnStat(1, 0),
+ attr("s3_c4") -> ColumnStat(distinctCount = Some(2), min = Some("3"), max = Some("4"),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
// F11
- attr("f11_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f11_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f11_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4),
- attr("f11_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ attr("f11_fk1") -> rangeColumnStat(3, 0),
+ attr("f11_fk2") -> rangeColumnStat(3, 0),
+ attr("f11_fk3") -> rangeColumnStat(4, 0),
+ attr("f11_c4") -> rangeColumnStat(4, 0)
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
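
Note that the helper only covers columns whose values run from 1 up to the
distinct count; columns with other ranges, such as `d1_c4` (min 2, max 3),
keep the explicit constructor with every field wrapped in `Some`. A
hypothetical generalization (not part of this commit) would simply
parameterize the bounds:

    // Hypothetical helper, not in the commit: Option-wrapped stats for an
    // integer column over an arbitrary range [lo, hi].
    def boundedColumnStat(ndv: Int, lo: Int, hi: Int, nulls: Int): ColumnStat =
      ColumnStat(distinctCount = Some(ndv), min = Some(lo), max = Some(hi),
        nullCount = Some(nulls), avgLen = Some(4), maxLen = Some(4))
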
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 23f95a6..8213d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -29,16 +29,16 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
/** Columns for testing */
private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
- attr("key11") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key12") -> ColumnStat(distinctCount = 4, min = Some(10), max = Some(40), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key21") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key22") -> ColumnStat(distinctCount = 2, min = Some(10), max = Some(20), nullCount = 0,
- avgLen = 4, maxLen = 4),
- attr("key31") -> ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 0,
- avgLen = 4, maxLen = 4)
+ attr("key11") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key12") -> ColumnStat(distinctCount = Some(4), min = Some(10), max = Some(40),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key21") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+ attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
))
private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -63,8 +63,8 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
tableRowCount = 6,
groupByColumns = Seq("key21", "key22"),
// Row count = product of ndv
- expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount * nameToColInfo("key22")._2
- .distinctCount)
+ expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
+ nameToColInfo("key22")._2.distinctCount.get)
}
test("empty group-by column") {
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 7d532ff..953094c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.IntegerType
class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
val attribute = attr("key")
- val colStat = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
- nullCount = 0, avgLen = 4, maxLen = 4)
+ val colStat = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
val plan = StatsTestPlan(
outputList = Seq(attribute),
@@ -116,13 +116,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
sizeInBytes = 40,
rowCount = Some(10),
attributeStats = AttributeMap(Seq(
- AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))))
+ AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+ min = Some(1), max = Some(10),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
val expectedCboStats =
Statistics(
sizeInBytes = 4,
rowCount = Some(1),
attributeStats = AttributeMap(Seq(
- AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))))
+          AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(1),
+ min = Some(5), max = Some(5),
+ nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
checkStats(