You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/27 07:37:38 UTC

[3/3] spark git commit: [SPARK-23445] ColumnStat refactoring

[SPARK-23445] ColumnStat refactoring

## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `CatalogColumnStat` from `ColumnStat`, just as `CatalogStatistics` is split from `Statistics`. This decouples how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, so it does not depend on data type information.
* For `CatalogColumnStat`, parse column names from the property names in the metastore (the `KEY_VERSION` property), not from the metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Making them all optional is more consistent, and gives the flexibility to, e.g., drop some of the fields during transformations if they are difficult or impossible to calculate.

The added flexibility makes it possible to have alternative implementations for stats, and separates stats collection from stats processing and estimation in plans.

## How was this patch tested?

Refactored existing tests to work with the refactored `ColumnStat` and the new `CatalogColumnStat`.
Added new tests in `StatisticsSuite` that check that backward / forward compatibility is not broken.

Author: Juliusz Sompolski <ju...@databricks.com>

Closes #20624 from juliuszsompolski/SPARK-23445.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8077bb04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8077bb04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8077bb04

Branch: refs/heads/master
Commit: 8077bb04f350fd35df83ef896135c0672dc3f7b0
Parents: 7ec8365
Author: Juliusz Sompolski <ju...@databricks.com>
Authored: Mon Feb 26 23:37:31 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Feb 26 23:37:31 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  | 146 +++++++++-
 .../optimizer/StarSchemaDetection.scala         |   6 +-
 .../sql/catalyst/plans/logical/Statistics.scala | 256 ++---------------
 .../statsEstimation/AggregateEstimation.scala   |   6 +-
 .../statsEstimation/EstimationUtils.scala       |  20 +-
 .../statsEstimation/FilterEstimation.scala      |  98 ++++---
 .../statsEstimation/JoinEstimation.scala        |  55 ++--
 .../catalyst/optimizer/JoinReorderSuite.scala   |  25 +-
 .../StarJoinCostBasedReorderSuite.scala         |  96 +++----
 .../optimizer/StarJoinReorderSuite.scala        |  77 ++---
 .../AggregateEstimationSuite.scala              |  24 +-
 .../BasicStatsEstimationSuite.scala             |  12 +-
 .../statsEstimation/FilterEstimationSuite.scala | 279 ++++++++++---------
 .../statsEstimation/JoinEstimationSuite.scala   | 138 +++++----
 .../ProjectEstimationSuite.scala                |  70 +++--
 .../StatsEstimationTestBase.scala               |  10 +-
 .../command/AnalyzeColumnCommand.scala          | 138 ++++++++-
 .../spark/sql/execution/command/tables.scala    |   9 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |   9 +-
 .../sql/StatisticsCollectionTestBase.scala      | 168 +++++++++--
 .../spark/sql/hive/HiveExternalCatalog.scala    |  63 ++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 162 +++--------
 22 files changed, 995 insertions(+), 872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 95b6fbb..f3e67dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,9 @@ import java.net.URI
 import java.util.Date
 
 import scala.collection.mutable
+import scala.util.control.NonFatal
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 
 /**
@@ -361,7 +363,7 @@ object CatalogTable {
 case class CatalogStatistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty) {
+    colStats: Map[String, CatalogColumnStat] = Map.empty) {
 
   /**
    * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
@@ -369,7 +371,8 @@ case class CatalogStatistics(
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
     if (cboEnabled && rowCount.isDefined) {
-      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      val attrStats = AttributeMap(planOutput
+        .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
       // Estimate size as number of rows * row size.
       val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
       Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
@@ -387,6 +390,143 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[CatalogColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * Any of the fields that are null (None) won't appear in the map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    distinctCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+    }
+    nullCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+    }
+    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+    histogram.foreach { h =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+    }
+    map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+      colName: String,
+      dataType: DataType): ColumnStat =
+    ColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Converts from string representation of data type to the corresponding Catalyst data type.
+   */
+  def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+    dataType match {
+      case BooleanType => s.toBoolean
+      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case ByteType => s.toByte
+      case ShortType => s.toShort
+      case IntegerType => s.toInt
+      case LongType => s.toLong
+      case FloatType => s.toFloat
+      case DoubleType => s.toDouble
+      case _: DecimalType => Decimal(s)
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case BinaryType | StringType => null
+      case _ =>
+        throw new AnalysisException("Column statistics deserialization is not supported for " +
+          s"column $name of data type: $dataType.")
+    }
+  }
+
+  /**
+   * Converts the given value from Catalyst data type to string representation of external
+   * data type.
+   */
+  def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+    val externalValue = dataType match {
+      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+      case BooleanType | _: IntegralType | FloatType | DoubleType => v
+      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case _ =>
+        throw new AnalysisException("Column statistics serialization is not supported for " +
+          s"column $colName of data type: $dataType.")
+    }
+    externalValue.toString
+  }
+
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+    table: String,
+    colName: String,
+    map: Map[String, String]): Option[CatalogColumnStat] = {
+
+    try {
+      Some(CatalogColumnStat(
+        distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
+        min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
+        max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
+        nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
+        avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
+        maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
+        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+      ))
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
+        None
+    }
+  }
+}
+
 
 case class CatalogTableType private(name: String)
 object CatalogTableType {

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 1f20b76..2aa762e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
           stats.rowCount match {
             case Some(rowCount) if rowCount >= 0 =>
               if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
-                val colStats = stats.attributeStats.get(col)
-                if (colStats.get.nullCount > 0) {
+                val colStats = stats.attributeStats.get(col).get
+                if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
                   false
                 } else {
-                  val distinctCount = colStats.get.distinctCount
+                  val distinctCount = colStats.distinctCount.get
                   val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
                   // ndvMaxErr adjusted based on TPCDS 1TB data results
                   relDiff <= conf.ndvMaxError * 2

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 96b199d..b3a4886 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -27,6 +27,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils}
@@ -79,11 +80,10 @@ case class Statistics(
 /**
  * Statistics collected for a column.
  *
- * 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ * 1. The JVM data type stored in min/max is the internal data type for the corresponding
  *    Catalyst data type. For example, the internal type of DateType is Int, and that the internal
  *    type of TimestampType is Long.
- * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms
  *    (sketches) might have been used, and the data collected can also be stale.
  *
  * @param distinctCount number of distinct values
@@ -95,240 +95,32 @@ case class Statistics(
  * @param histogram histogram of the values
  */
 case class ColumnStat(
-    distinctCount: BigInt,
-    min: Option[Any],
-    max: Option[Any],
-    nullCount: BigInt,
-    avgLen: Long,
-    maxLen: Long,
+    distinctCount: Option[BigInt] = None,
+    min: Option[Any] = None,
+    max: Option[Any] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
     histogram: Option[Histogram] = None) {
 
-  // We currently don't store min/max for binary/string type. This can change in the future and
-  // then we need to remove this require.
-  require(min.isEmpty || (!min.get.isInstanceOf[Array[Byte]] && !min.get.isInstanceOf[String]))
-  require(max.isEmpty || (!max.get.isInstanceOf[Array[Byte]] && !max.get.isInstanceOf[String]))
-
-  /**
-   * Returns a map from string to string that can be used to serialize the column stats.
-   * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
-   * representation for the value. min/max values are converted to the external data type. For
-   * example, for DateType we store java.sql.Date, and for TimestampType we store
-   * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
-   *
-   * As part of the protocol, the returned map always contains a key called "version".
-   * In the case min/max values are null (None), they won't appear in the map.
-   */
-  def toMap(colName: String, dataType: DataType): Map[String, String] = {
-    val map = new scala.collection.mutable.HashMap[String, String]
-    map.put(ColumnStat.KEY_VERSION, "1")
-    map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
-    map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
-    map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
-    map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
-    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
-    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
-    histogram.foreach { h => map.put(ColumnStat.KEY_HISTOGRAM, HistogramSerializer.serialize(h)) }
-    map.toMap
-  }
-
-  /**
-   * Converts the given value from Catalyst data type to string representation of external
-   * data type.
-   */
-  private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
-    val externalValue = dataType match {
-      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
-      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
-      case BooleanType | _: IntegralType | FloatType | DoubleType => v
-      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
-      // This version of Spark does not use min/max for binary/string types so we ignore it.
-      case _ =>
-        throw new AnalysisException("Column statistics deserialization is not supported for " +
-          s"column $colName of data type: $dataType.")
-    }
-    externalValue.toString
-  }
-
-}
+  // Are distinctCount and nullCount statistics defined?
+  val hasCountStats = distinctCount.isDefined && nullCount.isDefined
 
+  // Are min and max statistics defined?
+  val hasMinMaxStats = min.isDefined && max.isDefined
 
-object ColumnStat extends Logging {
-
-  // List of string keys used to serialize ColumnStat
-  val KEY_VERSION = "version"
-  private val KEY_DISTINCT_COUNT = "distinctCount"
-  private val KEY_MIN_VALUE = "min"
-  private val KEY_MAX_VALUE = "max"
-  private val KEY_NULL_COUNT = "nullCount"
-  private val KEY_AVG_LEN = "avgLen"
-  private val KEY_MAX_LEN = "maxLen"
-  private val KEY_HISTOGRAM = "histogram"
-
-  /** Returns true iff the we support gathering column statistics on column of the given type. */
-  def supportsType(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case BooleanType => true
-    case DateType => true
-    case TimestampType => true
-    case BinaryType | StringType => true
-    case _ => false
-  }
-
-  /** Returns true iff the we support gathering histogram on column of the given type. */
-  def supportsHistogram(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case DateType => true
-    case TimestampType => true
-    case _ => false
-  }
-
-  /**
-   * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
-   * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
-   */
-  def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
-    try {
-      Some(ColumnStat(
-        distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
-        // Note that flatMap(Option.apply) turns Option(null) into None.
-        min = map.get(KEY_MIN_VALUE)
-          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
-        max = map.get(KEY_MAX_VALUE)
-          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
-        nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
-        avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
-        maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong,
-        histogram = map.get(KEY_HISTOGRAM).map(HistogramSerializer.deserialize)
-      ))
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to parse column statistics for column ${field.name} in table $table", e)
-        None
-    }
-  }
-
-  /**
-   * Converts from string representation of external data type to the corresponding Catalyst data
-   * type.
-   */
-  private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
-    dataType match {
-      case BooleanType => s.toBoolean
-      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
-      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
-      case ByteType => s.toByte
-      case ShortType => s.toShort
-      case IntegerType => s.toInt
-      case LongType => s.toLong
-      case FloatType => s.toFloat
-      case DoubleType => s.toDouble
-      case _: DecimalType => Decimal(s)
-      // This version of Spark does not use min/max for binary/string types so we ignore it.
-      case BinaryType | StringType => null
-      case _ =>
-        throw new AnalysisException("Column statistics deserialization is not supported for " +
-          s"column $name of data type: $dataType.")
-    }
-  }
-
-  /**
-   * Constructs an expression to compute column statistics for a given column.
-   *
-   * The expression should create a single struct column with the following schema:
-   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
-   * distinctCountsForIntervals: Array[Long]
-   *
-   * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
-   * as a result should stay in sync with it.
-   */
-  def statExprs(
-      col: Attribute,
-      conf: SQLConf,
-      colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
-    def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
-      expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
-    })
-    val one = Literal(1, LongType)
-
-    // the approximate ndv (num distinct value) should never be larger than the number of rows
-    val numNonNulls = if (col.nullable) Count(col) else Count(one)
-    val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
-    val numNulls = Subtract(Count(one), numNonNulls)
-    val defaultSize = Literal(col.dataType.defaultSize, LongType)
-    val nullArray = Literal(null, ArrayType(LongType))
-
-    def fixedLenTypeStruct: CreateNamedStruct = {
-      val genHistogram =
-        ColumnStat.supportsHistogram(col.dataType) && colPercentiles.contains(col)
-      val intervalNdvsExpr = if (genHistogram) {
-        ApproxCountDistinctForIntervals(col,
-          Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
-      } else {
-        nullArray
-      }
-      // For fixed width types, avg size should be the same as max size.
-      struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
-        defaultSize, defaultSize, intervalNdvsExpr)
-    }
-
-    col.dataType match {
-      case _: IntegralType => fixedLenTypeStruct
-      case _: DecimalType => fixedLenTypeStruct
-      case DoubleType | FloatType => fixedLenTypeStruct
-      case BooleanType => fixedLenTypeStruct
-      case DateType => fixedLenTypeStruct
-      case TimestampType => fixedLenTypeStruct
-      case BinaryType | StringType =>
-        // For string and binary type, we don't compute min, max or histogram
-        val nullLit = Literal(null, col.dataType)
-        struct(
-          ndv, nullLit, nullLit, numNulls,
-          // Set avg/max size to default size if all the values are null or there is no value.
-          Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
-          Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
-          nullArray)
-      case _ =>
-        throw new AnalysisException("Analyzing column statistics is not supported for column " +
-          s"${col.name} of data type: ${col.dataType}.")
-    }
-  }
-
-  /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
-  def rowToColumnStat(
-      row: InternalRow,
-      attr: Attribute,
-      rowCount: Long,
-      percentiles: Option[ArrayData]): ColumnStat = {
-    // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
-    val cs = ColumnStat(
-      distinctCount = BigInt(row.getLong(0)),
-      // for string/binary min/max, get should return null
-      min = Option(row.get(1, attr.dataType)),
-      max = Option(row.get(2, attr.dataType)),
-      nullCount = BigInt(row.getLong(3)),
-      avgLen = row.getLong(4),
-      maxLen = row.getLong(5)
-    )
-    if (row.isNullAt(6)) {
-      cs
-    } else {
-      val ndvs = row.getArray(6).toLongArray()
-      assert(percentiles.get.numElements() == ndvs.length + 1)
-      val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
-      // Construct equi-height histogram
-      val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
-        HistogramBin(endpoints(i), endpoints(i + 1), ndv)
-      }
-      val nonNullRows = rowCount - cs.nullCount
-      val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
-      cs.copy(histogram = Some(histogram))
-    }
-  }
+  // Are avgLen and maxLen statistics defined?
+  val hasLenStats = avgLen.isDefined && maxLen.isDefined
 
+  def toCatalogColumnStat(colName: String, dataType: DataType): CatalogColumnStat =
+    CatalogColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+      max = max.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index c41fac4..111c594 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -32,13 +32,15 @@ object AggregateEstimation {
     val childStats = agg.child.stats
     // Check if we have column stats for all group-by columns.
     val colStatsExist = agg.groupingExpressions.forall { e =>
-      e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
+      e.isInstanceOf[Attribute] &&
+        childStats.attributeStats.get(e.asInstanceOf[Attribute]).exists(_.hasCountStats)
     }
     if (rowCountsExist(agg.child) && colStatsExist) {
       // Multiply distinct counts of group-by columns. This is an upper bound, which assumes
       // the data contains all combinations of distinct values of group-by columns.
       var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
-        (res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount)
+        (res, expr) => res *
+          childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
 
       outputRows = if (agg.groupingExpressions.isEmpty) {
         // If there's no group-by columns, the output is a single row containing values of aggregate

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index d793f77..0f147f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.math.BigDecimal.RoundingMode
 
@@ -38,9 +39,18 @@ object EstimationUtils {
     }
   }
 
+  /** Check if each attribute has column stat containing distinct and null counts
+   *  in the corresponding statistic. */
+  def columnStatsWithCountsExist(statsAndAttr: (Statistics, Attribute)*): Boolean = {
+    statsAndAttr.forall { case (stats, attr) =>
+      stats.attributeStats.get(attr).map(_.hasCountStats).getOrElse(false)
+    }
+  }
+
+  /** Statistics for a Column containing only NULLs. */
   def nullColumnStat(dataType: DataType, rowCount: BigInt): ColumnStat = {
-    ColumnStat(distinctCount = 0, min = None, max = None, nullCount = rowCount,
-      avgLen = dataType.defaultSize, maxLen = dataType.defaultSize)
+    ColumnStat(distinctCount = Some(0), min = None, max = None, nullCount = Some(rowCount),
+      avgLen = Some(dataType.defaultSize), maxLen = Some(dataType.defaultSize))
   }
 
   /**
@@ -70,13 +80,13 @@ object EstimationUtils {
     // We assign a generic overhead for a Row object, the actual overhead is different for different
     // Row format.
     val sizePerRow = 8 + attributes.map { attr =>
-      if (attrStats.contains(attr)) {
+      if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {
         attr.dataType match {
           case StringType =>
             // UTF8String: base + offset + numBytes
-            attrStats(attr).avgLen + 8 + 4
+            attrStats(attr).avgLen.get + 8 + 4
           case _ =>
-            attrStats(attr).avgLen
+            attrStats(attr).avgLen.get
         }
       } else {
         attr.dataType.defaultSize

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 4cc32de..0538c9d 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -225,7 +225,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       attr: Attribute,
       isNull: Boolean,
       update: Boolean): Option[Double] = {
-    if (!colStatsMap.contains(attr)) {
+    if (!colStatsMap.contains(attr) || !colStatsMap(attr).hasCountStats) {
       logDebug("[CBO] No statistics for " + attr)
       return None
     }
@@ -234,14 +234,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val nullPercent: Double = if (rowCountValue == 0) {
       0
     } else {
-      (BigDecimal(colStat.nullCount) / BigDecimal(rowCountValue)).toDouble
+      (BigDecimal(colStat.nullCount.get) / BigDecimal(rowCountValue)).toDouble
     }
 
     if (update) {
       val newStats = if (isNull) {
-        colStat.copy(distinctCount = 0, min = None, max = None)
+        colStat.copy(distinctCount = Some(0), min = None, max = None)
       } else {
-        colStat.copy(nullCount = 0)
+        colStat.copy(nullCount = Some(0))
       }
       colStatsMap.update(attr, newStats)
     }
@@ -322,17 +322,21 @@ case class FilterEstimation(plan: Filter) extends Logging {
         // value.
         val newStats = attr.dataType match {
           case StringType | BinaryType =>
-            colStat.copy(distinctCount = 1, nullCount = 0)
+            colStat.copy(distinctCount = Some(1), nullCount = Some(0))
           case _ =>
-            colStat.copy(distinctCount = 1, min = Some(literal.value),
-              max = Some(literal.value), nullCount = 0)
+            colStat.copy(distinctCount = Some(1), min = Some(literal.value),
+              max = Some(literal.value), nullCount = Some(0))
         }
         colStatsMap.update(attr, newStats)
       }
 
       if (colStat.histogram.isEmpty) {
-        // returns 1/ndv if there is no histogram
-        Some(1.0 / colStat.distinctCount.toDouble)
+        if (!colStat.distinctCount.isEmpty) {
+          // returns 1/ndv if there is no histogram
+          Some(1.0 / colStat.distinctCount.get.toDouble)
+        } else {
+          None
+        }
       } else {
         Some(computeEqualityPossibilityByHistogram(literal, colStat))
       }
@@ -378,13 +382,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
       attr: Attribute,
       hSet: Set[Any],
       update: Boolean): Option[Double] = {
-    if (!colStatsMap.contains(attr)) {
+    if (!colStatsMap.hasDistinctCount(attr)) {
       logDebug("[CBO] No statistics for " + attr)
       return None
     }
 
     val colStat = colStatsMap(attr)
-    val ndv = colStat.distinctCount
+    val ndv = colStat.distinctCount.get
     val dataType = attr.dataType
     var newNdv = ndv
 
@@ -407,8 +411,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
         // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
         newNdv = ndv.min(BigInt(validQuerySet.size))
         if (update) {
-          val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
-            max = Some(newMax), nullCount = 0)
+          val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+            max = Some(newMax), nullCount = Some(0))
           colStatsMap.update(attr, newStats)
         }
 
@@ -416,7 +420,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       case StringType | BinaryType =>
         newNdv = ndv.min(BigInt(hSet.size))
         if (update) {
-          val newStats = colStat.copy(distinctCount = newNdv, nullCount = 0)
+          val newStats = colStat.copy(distinctCount = Some(newNdv), nullCount = Some(0))
           colStatsMap.update(attr, newStats)
         }
     }
@@ -443,12 +447,17 @@ case class FilterEstimation(plan: Filter) extends Logging {
       literal: Literal,
       update: Boolean): Option[Double] = {
 
+    if (!colStatsMap.hasMinMaxStats(attr) || !colStatsMap.hasDistinctCount(attr)) {
+      logDebug("[CBO] No statistics for " + attr)
+      return None
+    }
+
     val colStat = colStatsMap(attr)
     val statsInterval =
       ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
     val max = statsInterval.max
     val min = statsInterval.min
-    val ndv = colStat.distinctCount.toDouble
+    val ndv = colStat.distinctCount.get.toDouble
 
     // determine the overlapping degree between predicate interval and column's interval
     val numericLiteral = EstimationUtils.toDouble(literal.value, literal.dataType)
@@ -520,8 +529,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
             newMax = newValue
         }
 
-        val newStats = colStat.copy(distinctCount = ceil(ndv * percent),
-          min = newMin, max = newMax, nullCount = 0)
+        val newStats = colStat.copy(distinctCount = Some(ceil(ndv * percent)),
+          min = newMin, max = newMax, nullCount = Some(0))
 
         colStatsMap.update(attr, newStats)
       }
@@ -637,11 +646,11 @@ case class FilterEstimation(plan: Filter) extends Logging {
       attrRight: Attribute,
       update: Boolean): Option[Double] = {
 
-    if (!colStatsMap.contains(attrLeft)) {
+    if (!colStatsMap.hasCountStats(attrLeft)) {
       logDebug("[CBO] No statistics for " + attrLeft)
       return None
     }
-    if (!colStatsMap.contains(attrRight)) {
+    if (!colStatsMap.hasCountStats(attrRight)) {
       logDebug("[CBO] No statistics for " + attrRight)
       return None
     }
@@ -668,7 +677,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val minRight = statsIntervalRight.min
 
     // determine the overlapping degree between predicate interval and column's interval
-    val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
+    val allNotNull = (colStatLeft.nullCount.get == 0) && (colStatRight.nullCount.get == 0)
     val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
       // Left < Right or Left <= Right
       // - no overlap:
@@ -707,14 +716,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
       case _: EqualTo =>
         ((maxLeft < minRight) || (maxRight < minLeft),
           (minLeft == minRight) && (maxLeft == maxRight) && allNotNull
-          && (colStatLeft.distinctCount == colStatRight.distinctCount)
+          && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
         )
       case _: EqualNullSafe =>
         // For null-safe equality, we use a very restrictive condition to evaluate its overlap.
         // If null values exists, we set it to partial overlap.
         (((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull,
           (minLeft == minRight) && (maxLeft == maxRight) && allNotNull
-            && (colStatLeft.distinctCount == colStatRight.distinctCount)
+            && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
         )
     }
 
@@ -731,9 +740,9 @@ case class FilterEstimation(plan: Filter) extends Logging {
       if (update) {
         // Need to adjust new min/max after the filter condition is applied
 
-        val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+        val ndvLeft = BigDecimal(colStatLeft.distinctCount.get)
         val newNdvLeft = ceil(ndvLeft * percent)
-        val ndvRight = BigDecimal(colStatRight.distinctCount)
+        val ndvRight = BigDecimal(colStatRight.distinctCount.get)
         val newNdvRight = ceil(ndvRight * percent)
 
         var newMaxLeft = colStatLeft.max
@@ -817,10 +826,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
           }
         }
 
-        val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft,
+        val newStatsLeft = colStatLeft.copy(distinctCount = Some(newNdvLeft), min = newMinLeft,
           max = newMaxLeft)
         colStatsMap(attrLeft) = newStatsLeft
-        val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight,
+        val newStatsRight = colStatRight.copy(distinctCount = Some(newNdvRight), min = newMinRight,
           max = newMaxRight)
         colStatsMap(attrRight) = newStatsRight
       }
@@ -849,17 +858,35 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
   def contains(a: Attribute): Boolean = updatedMap.contains(a.exprId) || originalMap.contains(a)
 
   /**
-   * Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
-   * originalMap, because updatedMap has the latest (updated) column stats.
+   * Gets the column stat for the given attribute, if any.
+   * The column stat in updatedMap is preferred over the one in originalMap,
+   * because updatedMap has the latest (updated) column stats.
    */
-  def apply(a: Attribute): ColumnStat = {
+  def get(a: Attribute): Option[ColumnStat] = {
     if (updatedMap.contains(a.exprId)) {
-      updatedMap(a.exprId)._2
+      updatedMap.get(a.exprId).map(_._2)
     } else {
-      originalMap(a)
+      originalMap.get(a)
     }
   }
 
+  def hasCountStats(a: Attribute): Boolean =
+    get(a).map(_.hasCountStats).getOrElse(false)
+
+  def hasDistinctCount(a: Attribute): Boolean =
+    get(a).map(_.distinctCount.isDefined).getOrElse(false)
+
+  def hasMinMaxStats(a: Attribute): Boolean = // true only if both min and max are known
+    get(a).exists(stat => stat.min.isDefined && stat.max.isDefined)
+
+  /**
+   * Gets column stat for the given attribute. The column stat in updatedMap is preferred over
+   * that in originalMap, because updatedMap has the latest (updated) column stats.
+   */
+  def apply(a: Attribute): ColumnStat = {
+    get(a).get
+  }
+
   /** Updates column stats in updatedMap. */
   def update(a: Attribute, stats: ColumnStat): Unit = updatedMap.update(a.exprId, a -> stats)
 
@@ -871,11 +898,14 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
     : AttributeMap[ColumnStat] = {
     val newColumnStats = originalMap.map { case (attr, oriColStat) =>
       val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
-      val newNdv = if (colStat.distinctCount > 1) {
+      val newNdv = if (colStat.distinctCount.isEmpty) {
+        // No NDV in the original stats.
+        None
+      } else if (colStat.distinctCount.get > 1) {
         // Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
         // decreases; otherwise keep it unchanged.
-        EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
-          newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
+        Some(EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
+          newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount.get))
       } else {
         // no need to scale down since it is already down to 1 (for skewed distribution case)
         colStat.distinctCount

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f0294a4..2543e38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -85,7 +85,8 @@ case class JoinEstimation(join: Join) extends Logging {
       // 3. Update statistics based on the output of join
       val inputAttrStats = AttributeMap(
         leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
-      val attributesWithStat = join.output.filter(a => inputAttrStats.contains(a))
+      val attributesWithStat = join.output.filter(a =>
+        inputAttrStats.get(a).map(_.hasCountStats).getOrElse(false))
       val (fromLeft, fromRight) = attributesWithStat.partition(join.left.outputSet.contains(_))
 
       val outputStats: Seq[(Attribute, ColumnStat)] = if (outputRows == 0) {
@@ -106,10 +107,10 @@ case class JoinEstimation(join: Join) extends Logging {
           case FullOuter =>
             fromLeft.map { a =>
               val oriColStat = inputAttrStats(a)
-              (a, oriColStat.copy(nullCount = oriColStat.nullCount + rightRows))
+              (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + rightRows)))
             } ++ fromRight.map { a =>
               val oriColStat = inputAttrStats(a)
-              (a, oriColStat.copy(nullCount = oriColStat.nullCount + leftRows))
+              (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + leftRows)))
             }
           case _ =>
             assert(joinType == Inner || joinType == Cross)
@@ -219,19 +220,27 @@ case class JoinEstimation(join: Join) extends Logging {
   private def computeByNdv(
       leftKey: AttributeReference,
       rightKey: AttributeReference,
-      newMin: Option[Any],
-      newMax: Option[Any]): (BigInt, ColumnStat) = {
+      min: Option[Any],
+      max: Option[Any]): (BigInt, ColumnStat) = {
     val leftKeyStat = leftStats.attributeStats(leftKey)
     val rightKeyStat = rightStats.attributeStats(rightKey)
-    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
+    val maxNdv = leftKeyStat.distinctCount.get.max(rightKeyStat.distinctCount.get)
     // Compute cardinality by the basic formula.
     val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
 
     // Get the intersected column stat.
-    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
-    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
-    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
-    val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
+    val newNdv = Some(leftKeyStat.distinctCount.get.min(rightKeyStat.distinctCount.get))
+    val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+      Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+    } else {
+      None
+    }
+    val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+      Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+    } else {
+      None
+    }
+    val newStats = ColumnStat(newNdv, min, max, Some(0), newAvgLen, newMaxLen)
 
     (ceil(card), newStats)
   }
@@ -267,9 +276,17 @@ case class JoinEstimation(join: Join) extends Logging {
 
     val leftKeyStat = leftStats.attributeStats(leftKey)
     val rightKeyStat = rightStats.attributeStats(rightKey)
-    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
-    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
-    val newStats = ColumnStat(ceil(totalNdv), newMin, newMax, 0, newAvgLen, newMaxLen)
+    val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+      Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+    } else {
+      None
+    }
+    val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+      Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+    } else {
+      None
+    }
+    val newStats = ColumnStat(Some(ceil(totalNdv)), newMin, newMax, Some(0), newAvgLen, newMaxLen)
     (ceil(card), newStats)
   }
 
@@ -292,10 +309,14 @@ case class JoinEstimation(join: Join) extends Logging {
       } else {
         val oldColStat = oldAttrStats(a)
         val oldNdv = oldColStat.distinctCount
-        val newNdv = if (join.left.outputSet.contains(a)) {
-          updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv)
+        val newNdv = if (oldNdv.isDefined) {
+          Some(if (join.left.outputSet.contains(a)) {
+            updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+          } else {
+            updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+          })
         } else {
-          updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv)
+          None
         }
         val newColStat = oldColStat.copy(distinctCount = newNdv)
         // TODO: support nullCount updates for specific outer joins
@@ -313,7 +334,7 @@ case class JoinEstimation(join: Join) extends Logging {
       // Note: join keys from EqualNullSafe also fall into this case (Coalesce), consider to
       // support it in the future by using `nullCount` in column stats.
       case (lk: AttributeReference, rk: AttributeReference)
-        if columnStatsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
+        if columnStatsWithCountsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 2fb587d..565b0a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -62,24 +62,15 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     }
   }
 
-  /** Set up tables and columns for testing */
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
-    attr("t1.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t1.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t2.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t3.v-1-100") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t4.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t4.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t5.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t5.v-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4)
+    attr("t1.k-1-2") -> rangeColumnStat(2, 0),
+    attr("t1.v-1-10") -> rangeColumnStat(10, 0),
+    attr("t2.k-1-5") -> rangeColumnStat(5, 0),
+    attr("t3.v-1-100") -> rangeColumnStat(100, 0),
+    attr("t4.k-1-2") -> rangeColumnStat(2, 0),
+    attr("t4.v-1-10") -> rangeColumnStat(10, 0),
+    attr("t5.k-1-5") -> rangeColumnStat(5, 0),
+    attr("t5.v-1-5") -> rangeColumnStat(5, 0)
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index ada6e2a..d4d23ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -68,88 +68,56 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
 
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
     // F1 (fact table)
-    attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk1") -> rangeColumnStat(100, 0),
+    attr("f1_fk2") -> rangeColumnStat(100, 0),
+    attr("f1_fk3") -> rangeColumnStat(100, 0),
+    attr("f1_c1") -> rangeColumnStat(100, 0),
+    attr("f1_c2") -> rangeColumnStat(100, 0),
 
     // D1 (dimension)
-    attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_pk") -> rangeColumnStat(100, 0),
+    attr("d1_c2") -> rangeColumnStat(50, 0),
+    attr("d1_c3") -> rangeColumnStat(50, 0),
 
     // D2 (dimension)
-    attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_pk") -> rangeColumnStat(20, 0),
+    attr("d2_c2") -> rangeColumnStat(10, 0),
+    attr("d2_c3") -> rangeColumnStat(10, 0),
 
     // D3 (dimension)
-    attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_pk") -> rangeColumnStat(10, 0),
+    attr("d3_c2") -> rangeColumnStat(5, 0),
+    attr("d3_c3") -> rangeColumnStat(5, 0),
 
     // T1 (regular table i.e. outside star)
-    attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c1") -> rangeColumnStat(20, 1),
+    attr("t1_c2") -> rangeColumnStat(10, 1),
+    attr("t1_c3") -> rangeColumnStat(10, 1),
 
     // T2 (regular table)
-    attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c1") -> rangeColumnStat(5, 1),
+    attr("t2_c2") -> rangeColumnStat(5, 1),
+    attr("t2_c3") -> rangeColumnStat(5, 1),
 
     // T3 (regular table)
-    attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c1") -> rangeColumnStat(5, 1),
+    attr("t3_c2") -> rangeColumnStat(5, 1),
+    attr("t3_c3") -> rangeColumnStat(5, 1),
 
     // T4 (regular table)
-    attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c1") -> rangeColumnStat(5, 1),
+    attr("t4_c2") -> rangeColumnStat(5, 1),
+    attr("t4_c3") -> rangeColumnStat(5, 1),
 
     // T5 (regular table)
-    attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c1") -> rangeColumnStat(5, 1),
+    attr("t5_c2") -> rangeColumnStat(5, 1),
+    attr("t5_c3") -> rangeColumnStat(5, 1),
 
     // T6 (regular table)
-    attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4)
+    attr("t6_c1") -> rangeColumnStat(5, 1),
+    attr("t6_c2") -> rangeColumnStat(5, 1),
+    attr("t6_c3") -> rangeColumnStat(5, 1)
 
   ))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 777c563..4e0883e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -70,59 +70,40 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
   // Tables' cardinality: f1 > d3 > d1 > d2 > s3
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
     // F1
-    attr("f1_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk1") -> rangeColumnStat(3, 0),
+    attr("f1_fk2") -> rangeColumnStat(3, 0),
+    attr("f1_fk3") -> rangeColumnStat(4, 0),
+    attr("f1_c4") -> rangeColumnStat(4, 0),
     // D1
-    attr("d1_pk1") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_pk1") -> rangeColumnStat(4, 0),
+    attr("d1_c2") -> rangeColumnStat(3, 0),
+    attr("d1_c3") -> rangeColumnStat(4, 0),
+    attr("d1_c4") -> ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // D2
-    attr("d2_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("d2_pk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c3") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c2") -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+      nullCount = Some(1), avgLen = Some(4), maxLen = Some(4)),
+    attr("d2_pk1") -> rangeColumnStat(3, 0),
+    attr("d2_c3") -> rangeColumnStat(3, 0),
+    attr("d2_c4") -> ColumnStat(distinctCount = Some(2), min = Some(3), max = Some(4),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // D3
-    attr("d3_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_pk1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_fk1") -> rangeColumnStat(3, 0),
+    attr("d3_c2") -> rangeColumnStat(3, 0),
+    attr("d3_pk1") -> rangeColumnStat(5, 0),
+    attr("d3_c4") -> ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // S3
-    attr("s3_pk1") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("s3_c2") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("s3_c3") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("s3_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("s3_pk1") -> rangeColumnStat(2, 0),
+    attr("s3_c2") -> rangeColumnStat(1, 0),
+    attr("s3_c3") -> rangeColumnStat(1, 0),
+    attr("s3_c4") -> ColumnStat(distinctCount = Some(2), min = Some(3), max = Some(4),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // F11
-    attr("f11_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f11_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f11_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f11_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4)
+    attr("f11_fk1") -> rangeColumnStat(3, 0),
+    attr("f11_fk2") -> rangeColumnStat(3, 0),
+    attr("f11_fk3") -> rangeColumnStat(4, 0),
+    attr("f11_c4") -> rangeColumnStat(4, 0)
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 23f95a6..8213d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -29,16 +29,16 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
 
   /** Columns for testing */
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
-    attr("key11") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key12") -> ColumnStat(distinctCount = 4, min = Some(10), max = Some(40), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key21") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key22") -> ColumnStat(distinctCount = 2, min = Some(10), max = Some(20), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key31") -> ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    attr("key11") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key12") -> ColumnStat(distinctCount = Some(4), min = Some(10), max = Some(40),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key21") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -63,8 +63,8 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       tableRowCount = 6,
       groupByColumns = Seq("key21", "key22"),
       // Row count = product of ndv
-      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount * nameToColInfo("key22")._2
-        .distinctCount)
+      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
+        nameToColInfo("key22")._2.distinctCount.get)
   }
 
   test("empty group-by column") {

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 7d532ff..953094c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.IntegerType
 
 class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
   val attribute = attr("key")
-  val colStat = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStat = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   val plan = StatsTestPlan(
     outputList = Seq(attribute),
@@ -116,13 +116,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
         sizeInBytes = 40,
         rowCount = Some(10),
         attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))))
+          AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+            min = Some(1), max = Some(10),
+            nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
     val expectedCboStats =
       Statistics(
         sizeInBytes = 4,
         rowCount = Some(1),
         attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))))
+          AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+            min = Some(5), max = Some(5),
+            nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
 
     val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
     checkStats(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org