You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by li...@apache.org on 2018/02/27 07:37:36 UTC

[1/3] spark git commit: [SPARK-23445] ColumnStat refactoring

Repository: spark
Updated Branches:
  refs/heads/master 7ec83658f -> 8077bb04f


http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
index 1ee1d57..28c340a 100644
--- a/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
+++ b/sql/hive/src/main/scala/org/apache/spark/sql/hive/HiveExternalCatalog.scala
@@ -663,14 +663,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     requireTableExists(db, table)
     val rawTable = getRawTable(db, table)
 
-    // For datasource tables and hive serde tables created by spark 2.1 or higher,
-    // the data schema is stored in the table properties.
-    val schema = restoreTableMetadata(rawTable).schema
-
     // convert table statistics to properties so that we can persist them through hive client
     val statsProperties =
       if (stats.isDefined) {
-        statsToProperties(stats.get, schema)
+        statsToProperties(stats.get)
       } else {
         new mutable.HashMap[String, String]()
       }
@@ -1028,9 +1024,7 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     currentFullPath
   }
 
-  private def statsToProperties(
-      stats: CatalogStatistics,
-      schema: StructType): Map[String, String] = {
+  private def statsToProperties(stats: CatalogStatistics): Map[String, String] = {
 
     val statsProperties = new mutable.HashMap[String, String]()
     statsProperties += STATISTICS_TOTAL_SIZE -> stats.sizeInBytes.toString()
@@ -1038,11 +1032,12 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
       statsProperties += STATISTICS_NUM_ROWS -> stats.rowCount.get.toString()
     }
 
-    val colNameTypeMap: Map[String, DataType] =
-      schema.fields.map(f => (f.name, f.dataType)).toMap
     stats.colStats.foreach { case (colName, colStat) =>
-      colStat.toMap(colName, colNameTypeMap(colName)).foreach { case (k, v) =>
-        statsProperties += (columnStatKeyPropName(colName, k) -> v)
+      colStat.toMap(colName).foreach { case (k, v) =>
+        // Fully qualified name used in table properties for a particular column stat.
+        // For example, for column "mycol", and "min" stat, this should return
+        // "spark.sql.statistics.colStats.mycol.min".
+        statsProperties += (STATISTICS_COL_STATS_PREFIX + k -> v)
       }
     }
 
@@ -1058,23 +1053,20 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
     if (statsProps.isEmpty) {
       None
     } else {
+      val colStats = new mutable.HashMap[String, CatalogColumnStat]
+      val colStatsProps = properties.filterKeys(_.startsWith(STATISTICS_COL_STATS_PREFIX)).map {
+        case (k, v) => k.drop(STATISTICS_COL_STATS_PREFIX.length) -> v
+      }
 
-      val colStats = new mutable.HashMap[String, ColumnStat]
-
-      // For each column, recover its column stats. Note that this is currently a O(n^2) operation,
-      // but given the number of columns it usually not enormous, this is probably OK as a start.
-      // If we want to map this a linear operation, we'd need a stronger contract between the
-      // naming convention used for serialization.
-      schema.foreach { field =>
-        if (statsProps.contains(columnStatKeyPropName(field.name, ColumnStat.KEY_VERSION))) {
-          // If "version" field is defined, then the column stat is defined.
-          val keyPrefix = columnStatKeyPropName(field.name, "")
-          val colStatMap = statsProps.filterKeys(_.startsWith(keyPrefix)).map { case (k, v) =>
-            (k.drop(keyPrefix.length), v)
-          }
-          ColumnStat.fromMap(table, field, colStatMap).foreach { cs =>
-            colStats += field.name -> cs
-          }
+      // Find all the column names by matching the KEY_VERSION properties for them.
+      colStatsProps.keys.filter {
+        k => k.endsWith(CatalogColumnStat.KEY_VERSION)
+      }.map { k =>
+        k.dropRight(CatalogColumnStat.KEY_VERSION.length + 1)
+      }.foreach { fieldName =>
+        // and for each, create a column stat.
+        CatalogColumnStat.fromMap(table, fieldName, colStatsProps).foreach { cs =>
+          colStats += fieldName -> cs
         }
       }
 
@@ -1093,14 +1085,10 @@ private[spark] class HiveExternalCatalog(conf: SparkConf, hadoopConf: Configurat
 
     val rawTable = getRawTable(db, table)
 
-    // For datasource tables and hive serde tables created by spark 2.1 or higher,
-    // the data schema is stored in the table properties.
-    val schema = restoreTableMetadata(rawTable).schema
-
     // convert partition statistics to properties so that we can persist them through hive api
     val withStatsProps = lowerCasedParts.map { p =>
       if (p.stats.isDefined) {
-        val statsProperties = statsToProperties(p.stats.get, schema)
+        val statsProperties = statsToProperties(p.stats.get)
         p.copy(parameters = p.parameters ++ statsProperties)
       } else {
         p
@@ -1310,15 +1298,6 @@ object HiveExternalCatalog {
   val EMPTY_DATA_SCHEMA = new StructType()
     .add("col", "array<string>", nullable = true, comment = "from deserializer")
 
-  /**
-   * Returns the fully qualified name used in table properties for a particular column stat.
-   * For example, for column "mycol", and "min" stat, this should return
-   * "spark.sql.statistics.colStats.mycol.min".
-   */
-  private def columnStatKeyPropName(columnName: String, statKey: String): String = {
-    STATISTICS_COL_STATS_PREFIX + columnName + "." + statKey
-  }
-
   // A persisted data source table always store its schema in the catalog.
   private def getSchemaFromTableProperties(metadata: CatalogTable): StructType = {
     val errorMessage = "Could not read schema from the hive metastore because it is corrupted."

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
index 3af8af0..61cec82 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/StatisticsSuite.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.hive.common.StatsSetupConst
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.analysis.NoSuchPartitionException
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, HiveTableRelation}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, HiveTableRelation}
 import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, HistogramBin, HistogramSerializer}
 import org.apache.spark.sql.catalyst.util.{DateTimeUtils, StringUtils}
 import org.apache.spark.sql.execution.command.DDLUtils
@@ -177,8 +177,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats0 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2))
       assert(fetchedStats0.get.colStats == Map(
-        "a" -> ColumnStat(2, Some(1), Some(2), 0, 4, 4),
-        "b" -> ColumnStat(1, Some(1), Some(1), 0, 4, 4)))
+        "a" -> CatalogColumnStat(Some(2), Some("1"), Some("2"), Some(0), Some(4), Some(4)),
+        "b" -> CatalogColumnStat(Some(1), Some("1"), Some("1"), Some(0), Some(4), Some(4))))
     }
   }
 
@@ -208,8 +208,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats1 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
       assert(fetchedStats1.colStats == Map(
-        "C1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
-          avgLen = 4, maxLen = 4)))
+        "C1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))))
     }
   }
 
@@ -596,7 +596,8 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c1")
       val fetchedStats0 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(0))
-      assert(fetchedStats0.get.colStats == Map("c1" -> ColumnStat(0, None, None, 0, 4, 4)))
+      assert(fetchedStats0.get.colStats ==
+        Map("c1" -> CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))))
 
       // Insert new data and analyze: have the latest column stats.
       sql(s"INSERT INTO TABLE $table SELECT 1, 'a', 10.0")
@@ -604,18 +605,18 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats1 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
       assert(fetchedStats1.colStats == Map(
-        "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
-          avgLen = 4, maxLen = 4)))
+        "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))))
 
       // Analyze another column: since the table is not changed, the precious column stats are kept.
       sql(s"ANALYZE TABLE $table COMPUTE STATISTICS FOR COLUMNS c2")
       val fetchedStats2 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(1)).get
       assert(fetchedStats2.colStats == Map(
-        "c1" -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(1), nullCount = 0,
-          avgLen = 4, maxLen = 4),
-        "c2" -> ColumnStat(distinctCount = 1, min = None, max = None, nullCount = 0,
-          avgLen = 1, maxLen = 1)))
+        "c1" -> CatalogColumnStat(distinctCount = Some(1), min = Some("1"), max = Some("1"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        "c2" -> CatalogColumnStat(distinctCount = Some(1), min = None, max = None,
+          nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))))
 
       // Insert new data and analyze: stale column stats are removed and newly collected column
       // stats are added.
@@ -624,10 +625,10 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
       val fetchedStats3 =
         checkTableStats(table, hasSizeInBytes = true, expectedRowCounts = Some(2)).get
       assert(fetchedStats3.colStats == Map(
-        "c1" -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-          avgLen = 4, maxLen = 4),
-        "c3" -> ColumnStat(distinctCount = 2, min = Some(10.0), max = Some(20.0), nullCount = 0,
-          avgLen = 8, maxLen = 8)))
+        "c1" -> CatalogColumnStat(distinctCount = Some(2), min = Some("1"), max = Some("2"),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        "c3" -> CatalogColumnStat(distinctCount = Some(2), min = Some("10.0"), max = Some("20.0"),
+          nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))))
     }
   }
 
@@ -999,115 +1000,11 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
   test("verify serialized column stats after analyzing columns") {
     import testImplicits._
 
-    val tableName = "column_stats_test2"
+    val tableName = "column_stats_test_ser"
     // (data.head.productArity - 1) because the last column does not support stats collection.
     assert(stats.size == data.head.productArity - 1)
     val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
 
-    val expectedSerializedColStats = Map(
-      "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
-      "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
-      "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cbinary.version" -> "1",
-      "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
-      "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cbool.max" -> "true",
-      "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
-      "spark.sql.statistics.colStats.cbool.min" -> "false",
-      "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cbool.version" -> "1",
-      "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
-      "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cbyte.max" -> "2",
-      "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
-      "spark.sql.statistics.colStats.cbyte.min" -> "1",
-      "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cbyte.version" -> "1",
-      "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
-      "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
-      "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
-      "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
-      "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cdate.version" -> "1",
-      "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
-      "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
-      "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
-      "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
-      "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cdecimal.version" -> "1",
-      "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
-      "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
-      "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
-      "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
-      "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cdouble.version" -> "1",
-      "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
-      "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
-      "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
-      "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
-      "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cfloat.version" -> "1",
-      "spark.sql.statistics.colStats.cint.avgLen" -> "4",
-      "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cint.max" -> "4",
-      "spark.sql.statistics.colStats.cint.maxLen" -> "4",
-      "spark.sql.statistics.colStats.cint.min" -> "1",
-      "spark.sql.statistics.colStats.cint.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cint.version" -> "1",
-      "spark.sql.statistics.colStats.clong.avgLen" -> "8",
-      "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.clong.max" -> "5",
-      "spark.sql.statistics.colStats.clong.maxLen" -> "8",
-      "spark.sql.statistics.colStats.clong.min" -> "1",
-      "spark.sql.statistics.colStats.clong.nullCount" -> "1",
-      "spark.sql.statistics.colStats.clong.version" -> "1",
-      "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
-      "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cshort.max" -> "3",
-      "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
-      "spark.sql.statistics.colStats.cshort.min" -> "1",
-      "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cshort.version" -> "1",
-      "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
-      "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
-      "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
-      "spark.sql.statistics.colStats.cstring.version" -> "1",
-      "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
-      "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
-      "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
-      "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
-      "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
-      "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
-      "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
-    )
-
-    val expectedSerializedHistograms = Map(
-      "spark.sql.statistics.colStats.cbyte.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cbyte").histogram.get),
-      "spark.sql.statistics.colStats.cshort.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cshort").histogram.get),
-      "spark.sql.statistics.colStats.cint.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cint").histogram.get),
-      "spark.sql.statistics.colStats.clong.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("clong").histogram.get),
-      "spark.sql.statistics.colStats.cdouble.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cdouble").histogram.get),
-      "spark.sql.statistics.colStats.cfloat.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cfloat").histogram.get),
-      "spark.sql.statistics.colStats.cdecimal.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cdecimal").histogram.get),
-      "spark.sql.statistics.colStats.cdate.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
-      "spark.sql.statistics.colStats.ctimestamp.histogram" ->
-        HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
-    )
-
     def checkColStatsProps(expected: Map[String, String]): Unit = {
       sql(s"ANALYZE TABLE $tableName COMPUTE STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
       val table = hiveClient.getTable("default", tableName)
@@ -1129,6 +1026,29 @@ class StatisticsSuite extends StatisticsCollectionTestBase with TestHiveSingleto
     }
   }
 
+  test("verify column stats can be deserialized from tblproperties") {
+    import testImplicits._
+
+    val tableName = "column_stats_test_de"
+    // (data.head.productArity - 1) because the last column does not support stats collection.
+    assert(stats.size == data.head.productArity - 1)
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+
+    withTable(tableName) {
+      df.write.saveAsTable(tableName)
+
+      // Put in stats properties manually.
+      val table = getCatalogTable(tableName)
+      val newTable = table.copy(
+        properties = table.properties ++
+          expectedSerializedColStats ++ expectedSerializedHistograms +
+          ("spark.sql.statistics.totalSize" -> "1") /* totalSize always required */)
+      hiveClient.alterTable(newTable)
+
+      validateColStats(tableName, statsWithHgms)
+    }
+  }
+
   test("serialization and deserialization of histograms to/from hive metastore") {
     import testImplicits._
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[3/3] spark git commit: [SPARK-23445] ColumnStat refactoring

Posted by li...@apache.org.
[SPARK-23445] ColumnStat refactoring

## What changes were proposed in this pull request?

Refactor ColumnStat to be more flexible.

* Split `ColumnStat` and `CatalogColumnStat` just like `CatalogStatistics` is split from `Statistics`. This detaches how the statistics are stored from how they are processed in the query plan. `CatalogColumnStat` keeps `min` and `max` as `String`, making it not depend on dataType information.
* For `CatalogColumnStat`, parse column names from property names in the metastore (`KEY_VERSION` property), not from metastore schema. This means that `CatalogColumnStat`s can be created for columns even if the schema itself is not stored in the metastore.
* Make all fields optional. `min`, `max` and `histogram` for columns were optional already. Having them all optional is more consistent, and gives flexibility to e.g. drop some of the fields through transformations if they are difficult / impossible to calculate.

The added flexibility will make it possible to have alternative implementations for stats, and separates stats collection from stats and estimation processing in plans.

## How was this patch tested?

Refactored existing tests to work with refactored `ColumnStat` and `CatalogColumnStat`.
New tests added in `StatisticsSuite` checking that backwards / forwards compatibility is not broken.

Author: Juliusz Sompolski <ju...@databricks.com>

Closes #20624 from juliuszsompolski/SPARK-23445.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/8077bb04
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/8077bb04
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/8077bb04

Branch: refs/heads/master
Commit: 8077bb04f350fd35df83ef896135c0672dc3f7b0
Parents: 7ec8365
Author: Juliusz Sompolski <ju...@databricks.com>
Authored: Mon Feb 26 23:37:31 2018 -0800
Committer: gatorsmile <ga...@gmail.com>
Committed: Mon Feb 26 23:37:31 2018 -0800

----------------------------------------------------------------------
 .../spark/sql/catalyst/catalog/interface.scala  | 146 +++++++++-
 .../optimizer/StarSchemaDetection.scala         |   6 +-
 .../sql/catalyst/plans/logical/Statistics.scala | 256 ++---------------
 .../statsEstimation/AggregateEstimation.scala   |   6 +-
 .../statsEstimation/EstimationUtils.scala       |  20 +-
 .../statsEstimation/FilterEstimation.scala      |  98 ++++---
 .../statsEstimation/JoinEstimation.scala        |  55 ++--
 .../catalyst/optimizer/JoinReorderSuite.scala   |  25 +-
 .../StarJoinCostBasedReorderSuite.scala         |  96 +++----
 .../optimizer/StarJoinReorderSuite.scala        |  77 ++---
 .../AggregateEstimationSuite.scala              |  24 +-
 .../BasicStatsEstimationSuite.scala             |  12 +-
 .../statsEstimation/FilterEstimationSuite.scala | 279 ++++++++++---------
 .../statsEstimation/JoinEstimationSuite.scala   | 138 +++++----
 .../ProjectEstimationSuite.scala                |  70 +++--
 .../StatsEstimationTestBase.scala               |  10 +-
 .../command/AnalyzeColumnCommand.scala          | 138 ++++++++-
 .../spark/sql/execution/command/tables.scala    |   9 +-
 .../spark/sql/StatisticsCollectionSuite.scala   |   9 +-
 .../sql/StatisticsCollectionTestBase.scala      | 168 +++++++++--
 .../spark/sql/hive/HiveExternalCatalog.scala    |  63 ++---
 .../apache/spark/sql/hive/StatisticsSuite.scala | 162 +++--------
 22 files changed, 995 insertions(+), 872 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
index 95b6fbb..f3e67dc 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/interface.scala
@@ -21,7 +21,9 @@ import java.net.URI
 import java.util.Date
 
 import scala.collection.mutable
+import scala.util.control.NonFatal
 
+import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.{FunctionIdentifier, InternalRow, TableIdentifier}
 import org.apache.spark.sql.catalyst.analysis.MultiInstanceRelation
@@ -30,7 +32,7 @@ import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.plans.logical.statsEstimation.EstimationUtils
 import org.apache.spark.sql.catalyst.util.{CaseInsensitiveMap, DateTimeUtils}
 import org.apache.spark.sql.catalyst.util.quoteIdentifier
-import org.apache.spark.sql.types.StructType
+import org.apache.spark.sql.types._
 
 
 /**
@@ -361,7 +363,7 @@ object CatalogTable {
 case class CatalogStatistics(
     sizeInBytes: BigInt,
     rowCount: Option[BigInt] = None,
-    colStats: Map[String, ColumnStat] = Map.empty) {
+    colStats: Map[String, CatalogColumnStat] = Map.empty) {
 
   /**
    * Convert [[CatalogStatistics]] to [[Statistics]], and match column stats to attributes based
@@ -369,7 +371,8 @@ case class CatalogStatistics(
    */
   def toPlanStats(planOutput: Seq[Attribute], cboEnabled: Boolean): Statistics = {
     if (cboEnabled && rowCount.isDefined) {
-      val attrStats = AttributeMap(planOutput.flatMap(a => colStats.get(a.name).map(a -> _)))
+      val attrStats = AttributeMap(planOutput
+        .flatMap(a => colStats.get(a.name).map(a -> _.toPlanStat(a.name, a.dataType))))
       // Estimate size as number of rows * row size.
       val size = EstimationUtils.getOutputSize(planOutput, rowCount.get, attrStats)
       Statistics(sizeInBytes = size, rowCount = rowCount, attributeStats = attrStats)
@@ -387,6 +390,143 @@ case class CatalogStatistics(
   }
 }
 
+/**
+ * This class of statistics for a column is used in [[CatalogTable]] to interact with metastore.
+ */
+case class CatalogColumnStat(
+    distinctCount: Option[BigInt] = None,
+    min: Option[String] = None,
+    max: Option[String] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
+    histogram: Option[Histogram] = None) {
+
+  /**
+   * Returns a map from string to string that can be used to serialize the column stats.
+   * The key is the name of the column and name of the field (e.g. "colName.distinctCount"),
+   * and the value is the string representation for the value.
+   * min/max values are stored as Strings. They can be deserialized using
+   * [[CatalogColumnStat.fromExternalString]].
+   *
+   * As part of the protocol, the returned map always contains a key called "version".
+   * Any of the fields that are null (None) won't appear in the map.
+   */
+  def toMap(colName: String): Map[String, String] = {
+    val map = new scala.collection.mutable.HashMap[String, String]
+    map.put(s"${colName}.${CatalogColumnStat.KEY_VERSION}", "1")
+    distinctCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_DISTINCT_COUNT}", v.toString)
+    }
+    nullCount.foreach { v =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_NULL_COUNT}", v.toString)
+    }
+    avgLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_AVG_LEN}", v.toString) }
+    maxLen.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_LEN}", v.toString) }
+    min.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MIN_VALUE}", v) }
+    max.foreach { v => map.put(s"${colName}.${CatalogColumnStat.KEY_MAX_VALUE}", v) }
+    histogram.foreach { h =>
+      map.put(s"${colName}.${CatalogColumnStat.KEY_HISTOGRAM}", HistogramSerializer.serialize(h))
+    }
+    map.toMap
+  }
+
+  /** Convert [[CatalogColumnStat]] to [[ColumnStat]]. */
+  def toPlanStat(
+      colName: String,
+      dataType: DataType): ColumnStat =
+    ColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+      max = max.map(CatalogColumnStat.fromExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
+}
+
+object CatalogColumnStat extends Logging {
+
+  // List of string keys used to serialize CatalogColumnStat
+  val KEY_VERSION = "version"
+  private val KEY_DISTINCT_COUNT = "distinctCount"
+  private val KEY_MIN_VALUE = "min"
+  private val KEY_MAX_VALUE = "max"
+  private val KEY_NULL_COUNT = "nullCount"
+  private val KEY_AVG_LEN = "avgLen"
+  private val KEY_MAX_LEN = "maxLen"
+  private val KEY_HISTOGRAM = "histogram"
+
+  /**
+   * Converts from string representation of data type to the corresponding Catalyst data type.
+   */
+  def fromExternalString(s: String, name: String, dataType: DataType): Any = {
+    dataType match {
+      case BooleanType => s.toBoolean
+      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
+      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
+      case ByteType => s.toByte
+      case ShortType => s.toShort
+      case IntegerType => s.toInt
+      case LongType => s.toLong
+      case FloatType => s.toFloat
+      case DoubleType => s.toDouble
+      case _: DecimalType => Decimal(s)
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case BinaryType | StringType => null
+      case _ =>
+        throw new AnalysisException("Column statistics deserialization is not supported for " +
+          s"column $name of data type: $dataType.")
+    }
+  }
+
+  /**
+   * Converts the given value from Catalyst data type to string representation of external
+   * data type.
+   */
+  def toExternalString(v: Any, colName: String, dataType: DataType): String = {
+    val externalValue = dataType match {
+      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
+      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
+      case BooleanType | _: IntegralType | FloatType | DoubleType => v
+      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
+      // This version of Spark does not use min/max for binary/string types so we ignore it.
+      case _ =>
+        throw new AnalysisException("Column statistics serialization is not supported for " +
+          s"column $colName of data type: $dataType.")
+    }
+    externalValue.toString
+  }
+
+
+  /**
+   * Creates a [[CatalogColumnStat]] object from the given map.
+   * This is used to deserialize column stats from some external storage.
+   * The serialization side is defined in [[CatalogColumnStat.toMap]].
+   */
+  def fromMap(
+    table: String,
+    colName: String,
+    map: Map[String, String]): Option[CatalogColumnStat] = {
+
+    try {
+      Some(CatalogColumnStat(
+        distinctCount = map.get(s"${colName}.${KEY_DISTINCT_COUNT}").map(v => BigInt(v.toLong)),
+        min = map.get(s"${colName}.${KEY_MIN_VALUE}"),
+        max = map.get(s"${colName}.${KEY_MAX_VALUE}"),
+        nullCount = map.get(s"${colName}.${KEY_NULL_COUNT}").map(v => BigInt(v.toLong)),
+        avgLen = map.get(s"${colName}.${KEY_AVG_LEN}").map(_.toLong),
+        maxLen = map.get(s"${colName}.${KEY_MAX_LEN}").map(_.toLong),
+        histogram = map.get(s"${colName}.${KEY_HISTOGRAM}").map(HistogramSerializer.deserialize)
+      ))
+    } catch {
+      case NonFatal(e) =>
+        logWarning(s"Failed to parse column statistics for column ${colName} in table $table", e)
+        None
+    }
+  }
+}
+
 
 case class CatalogTableType private(name: String)
 object CatalogTableType {

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
index 1f20b76..2aa762e 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/optimizer/StarSchemaDetection.scala
@@ -187,11 +187,11 @@ object StarSchemaDetection extends PredicateHelper {
           stats.rowCount match {
             case Some(rowCount) if rowCount >= 0 =>
               if (stats.attributeStats.nonEmpty && stats.attributeStats.contains(col)) {
-                val colStats = stats.attributeStats.get(col)
-                if (colStats.get.nullCount > 0) {
+                val colStats = stats.attributeStats.get(col).get
+                if (!colStats.hasCountStats || colStats.nullCount.get > 0) {
                   false
                 } else {
-                  val distinctCount = colStats.get.distinctCount
+                  val distinctCount = colStats.distinctCount.get
                   val relDiff = math.abs((distinctCount.toDouble / rowCount.toDouble) - 1.0d)
                   // ndvMaxErr adjusted based on TPCDS 1TB data results
                   relDiff <= conf.ndvMaxError * 2

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 96b199d..b3a4886 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -27,6 +27,7 @@ import net.jpountz.lz4.{LZ4BlockInputStream, LZ4BlockOutputStream}
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.util.{ArrayData, DateTimeUtils}
@@ -79,11 +80,10 @@ case class Statistics(
 /**
  * Statistics collected for a column.
  *
- * 1. Supported data types are defined in `ColumnStat.supportsType`.
- * 2. The JVM data type stored in min/max is the internal data type for the corresponding
+ * 1. The JVM data type stored in min/max is the internal data type for the corresponding
  *    Catalyst data type. For example, the internal type of DateType is Int, and that the internal
  *    type of TimestampType is Long.
- * 3. There is no guarantee that the statistics collected are accurate. Approximation algorithms
+ * 2. There is no guarantee that the statistics collected are accurate. Approximation algorithms
  *    (sketches) might have been used, and the data collected can also be stale.
  *
  * @param distinctCount number of distinct values
@@ -95,240 +95,32 @@ case class Statistics(
  * @param histogram histogram of the values
  */
 case class ColumnStat(
-    distinctCount: BigInt,
-    min: Option[Any],
-    max: Option[Any],
-    nullCount: BigInt,
-    avgLen: Long,
-    maxLen: Long,
+    distinctCount: Option[BigInt] = None,
+    min: Option[Any] = None,
+    max: Option[Any] = None,
+    nullCount: Option[BigInt] = None,
+    avgLen: Option[Long] = None,
+    maxLen: Option[Long] = None,
     histogram: Option[Histogram] = None) {
 
-  // We currently don't store min/max for binary/string type. This can change in the future and
-  // then we need to remove this require.
-  require(min.isEmpty || (!min.get.isInstanceOf[Array[Byte]] && !min.get.isInstanceOf[String]))
-  require(max.isEmpty || (!max.get.isInstanceOf[Array[Byte]] && !max.get.isInstanceOf[String]))
-
-  /**
-   * Returns a map from string to string that can be used to serialize the column stats.
-   * The key is the name of the field (e.g. "distinctCount" or "min"), and the value is the string
-   * representation for the value. min/max values are converted to the external data type. For
-   * example, for DateType we store java.sql.Date, and for TimestampType we store
-   * java.sql.Timestamp. The deserialization side is defined in [[ColumnStat.fromMap]].
-   *
-   * As part of the protocol, the returned map always contains a key called "version".
-   * In the case min/max values are null (None), they won't appear in the map.
-   */
-  def toMap(colName: String, dataType: DataType): Map[String, String] = {
-    val map = new scala.collection.mutable.HashMap[String, String]
-    map.put(ColumnStat.KEY_VERSION, "1")
-    map.put(ColumnStat.KEY_DISTINCT_COUNT, distinctCount.toString)
-    map.put(ColumnStat.KEY_NULL_COUNT, nullCount.toString)
-    map.put(ColumnStat.KEY_AVG_LEN, avgLen.toString)
-    map.put(ColumnStat.KEY_MAX_LEN, maxLen.toString)
-    min.foreach { v => map.put(ColumnStat.KEY_MIN_VALUE, toExternalString(v, colName, dataType)) }
-    max.foreach { v => map.put(ColumnStat.KEY_MAX_VALUE, toExternalString(v, colName, dataType)) }
-    histogram.foreach { h => map.put(ColumnStat.KEY_HISTOGRAM, HistogramSerializer.serialize(h)) }
-    map.toMap
-  }
-
-  /**
-   * Converts the given value from Catalyst data type to string representation of external
-   * data type.
-   */
-  private def toExternalString(v: Any, colName: String, dataType: DataType): String = {
-    val externalValue = dataType match {
-      case DateType => DateTimeUtils.toJavaDate(v.asInstanceOf[Int])
-      case TimestampType => DateTimeUtils.toJavaTimestamp(v.asInstanceOf[Long])
-      case BooleanType | _: IntegralType | FloatType | DoubleType => v
-      case _: DecimalType => v.asInstanceOf[Decimal].toJavaBigDecimal
-      // This version of Spark does not use min/max for binary/string types so we ignore it.
-      case _ =>
-        throw new AnalysisException("Column statistics deserialization is not supported for " +
-          s"column $colName of data type: $dataType.")
-    }
-    externalValue.toString
-  }
-
-}
+  // Are distinctCount and nullCount statistics defined?
+  val hasCountStats = distinctCount.isDefined && nullCount.isDefined
 
+  // Are min and max statistics defined?
+  val hasMinMaxStats = min.isDefined && max.isDefined
 
-object ColumnStat extends Logging {
-
-  // List of string keys used to serialize ColumnStat
-  val KEY_VERSION = "version"
-  private val KEY_DISTINCT_COUNT = "distinctCount"
-  private val KEY_MIN_VALUE = "min"
-  private val KEY_MAX_VALUE = "max"
-  private val KEY_NULL_COUNT = "nullCount"
-  private val KEY_AVG_LEN = "avgLen"
-  private val KEY_MAX_LEN = "maxLen"
-  private val KEY_HISTOGRAM = "histogram"
-
-  /** Returns true iff the we support gathering column statistics on column of the given type. */
-  def supportsType(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case BooleanType => true
-    case DateType => true
-    case TimestampType => true
-    case BinaryType | StringType => true
-    case _ => false
-  }
-
-  /** Returns true iff the we support gathering histogram on column of the given type. */
-  def supportsHistogram(dataType: DataType): Boolean = dataType match {
-    case _: IntegralType => true
-    case _: DecimalType => true
-    case DoubleType | FloatType => true
-    case DateType => true
-    case TimestampType => true
-    case _ => false
-  }
-
-  /**
-   * Creates a [[ColumnStat]] object from the given map. This is used to deserialize column stats
-   * from some external storage. The serialization side is defined in [[ColumnStat.toMap]].
-   */
-  def fromMap(table: String, field: StructField, map: Map[String, String]): Option[ColumnStat] = {
-    try {
-      Some(ColumnStat(
-        distinctCount = BigInt(map(KEY_DISTINCT_COUNT).toLong),
-        // Note that flatMap(Option.apply) turns Option(null) into None.
-        min = map.get(KEY_MIN_VALUE)
-          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
-        max = map.get(KEY_MAX_VALUE)
-          .map(fromExternalString(_, field.name, field.dataType)).flatMap(Option.apply),
-        nullCount = BigInt(map(KEY_NULL_COUNT).toLong),
-        avgLen = map.getOrElse(KEY_AVG_LEN, field.dataType.defaultSize.toString).toLong,
-        maxLen = map.getOrElse(KEY_MAX_LEN, field.dataType.defaultSize.toString).toLong,
-        histogram = map.get(KEY_HISTOGRAM).map(HistogramSerializer.deserialize)
-      ))
-    } catch {
-      case NonFatal(e) =>
-        logWarning(s"Failed to parse column statistics for column ${field.name} in table $table", e)
-        None
-    }
-  }
-
-  /**
-   * Converts from string representation of external data type to the corresponding Catalyst data
-   * type.
-   */
-  private def fromExternalString(s: String, name: String, dataType: DataType): Any = {
-    dataType match {
-      case BooleanType => s.toBoolean
-      case DateType => DateTimeUtils.fromJavaDate(java.sql.Date.valueOf(s))
-      case TimestampType => DateTimeUtils.fromJavaTimestamp(java.sql.Timestamp.valueOf(s))
-      case ByteType => s.toByte
-      case ShortType => s.toShort
-      case IntegerType => s.toInt
-      case LongType => s.toLong
-      case FloatType => s.toFloat
-      case DoubleType => s.toDouble
-      case _: DecimalType => Decimal(s)
-      // This version of Spark does not use min/max for binary/string types so we ignore it.
-      case BinaryType | StringType => null
-      case _ =>
-        throw new AnalysisException("Column statistics deserialization is not supported for " +
-          s"column $name of data type: $dataType.")
-    }
-  }
-
-  /**
-   * Constructs an expression to compute column statistics for a given column.
-   *
-   * The expression should create a single struct column with the following schema:
-   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
-   * distinctCountsForIntervals: Array[Long]
-   *
-   * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
-   * as a result should stay in sync with it.
-   */
-  def statExprs(
-      col: Attribute,
-      conf: SQLConf,
-      colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
-    def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
-      expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
-    })
-    val one = Literal(1, LongType)
-
-    // the approximate ndv (num distinct value) should never be larger than the number of rows
-    val numNonNulls = if (col.nullable) Count(col) else Count(one)
-    val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
-    val numNulls = Subtract(Count(one), numNonNulls)
-    val defaultSize = Literal(col.dataType.defaultSize, LongType)
-    val nullArray = Literal(null, ArrayType(LongType))
-
-    def fixedLenTypeStruct: CreateNamedStruct = {
-      val genHistogram =
-        ColumnStat.supportsHistogram(col.dataType) && colPercentiles.contains(col)
-      val intervalNdvsExpr = if (genHistogram) {
-        ApproxCountDistinctForIntervals(col,
-          Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
-      } else {
-        nullArray
-      }
-      // For fixed width types, avg size should be the same as max size.
-      struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
-        defaultSize, defaultSize, intervalNdvsExpr)
-    }
-
-    col.dataType match {
-      case _: IntegralType => fixedLenTypeStruct
-      case _: DecimalType => fixedLenTypeStruct
-      case DoubleType | FloatType => fixedLenTypeStruct
-      case BooleanType => fixedLenTypeStruct
-      case DateType => fixedLenTypeStruct
-      case TimestampType => fixedLenTypeStruct
-      case BinaryType | StringType =>
-        // For string and binary type, we don't compute min, max or histogram
-        val nullLit = Literal(null, col.dataType)
-        struct(
-          ndv, nullLit, nullLit, numNulls,
-          // Set avg/max size to default size if all the values are null or there is no value.
-          Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
-          Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
-          nullArray)
-      case _ =>
-        throw new AnalysisException("Analyzing column statistics is not supported for column " +
-          s"${col.name} of data type: ${col.dataType}.")
-    }
-  }
-
-  /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
-  def rowToColumnStat(
-      row: InternalRow,
-      attr: Attribute,
-      rowCount: Long,
-      percentiles: Option[ArrayData]): ColumnStat = {
-    // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
-    val cs = ColumnStat(
-      distinctCount = BigInt(row.getLong(0)),
-      // for string/binary min/max, get should return null
-      min = Option(row.get(1, attr.dataType)),
-      max = Option(row.get(2, attr.dataType)),
-      nullCount = BigInt(row.getLong(3)),
-      avgLen = row.getLong(4),
-      maxLen = row.getLong(5)
-    )
-    if (row.isNullAt(6)) {
-      cs
-    } else {
-      val ndvs = row.getArray(6).toLongArray()
-      assert(percentiles.get.numElements() == ndvs.length + 1)
-      val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
-      // Construct equi-height histogram
-      val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
-        HistogramBin(endpoints(i), endpoints(i + 1), ndv)
-      }
-      val nonNullRows = rowCount - cs.nullCount
-      val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
-      cs.copy(histogram = Some(histogram))
-    }
-  }
+  // Are avgLen and maxLen statistics defined?
+  val hasLenStats = avgLen.isDefined && maxLen.isDefined
 
+  def toCatalogColumnStat(colName: String, dataType: DataType): CatalogColumnStat =
+    CatalogColumnStat(
+      distinctCount = distinctCount,
+      min = min.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+      max = max.map(CatalogColumnStat.toExternalString(_, colName, dataType)),
+      nullCount = nullCount,
+      avgLen = avgLen,
+      maxLen = maxLen,
+      histogram = histogram)
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
index c41fac4..111c594 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/AggregateEstimation.scala
@@ -32,13 +32,15 @@ object AggregateEstimation {
     val childStats = agg.child.stats
     // Check if we have column stats for all group-by columns.
     val colStatsExist = agg.groupingExpressions.forall { e =>
-      e.isInstanceOf[Attribute] && childStats.attributeStats.contains(e.asInstanceOf[Attribute])
+      e.isInstanceOf[Attribute] &&
+        childStats.attributeStats.get(e.asInstanceOf[Attribute]).exists(_.hasCountStats)
     }
     if (rowCountsExist(agg.child) && colStatsExist) {
       // Multiply distinct counts of group-by columns. This is an upper bound, which assumes
       // the data contains all combinations of distinct values of group-by columns.
       var outputRows: BigInt = agg.groupingExpressions.foldLeft(BigInt(1))(
-        (res, expr) => res * childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount)
+        (res, expr) => res *
+          childStats.attributeStats(expr.asInstanceOf[Attribute]).distinctCount.get)
 
       outputRows = if (agg.groupingExpressions.isEmpty) {
         // If there's no group-by columns, the output is a single row containing values of aggregate

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
index d793f77..0f147f0 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/EstimationUtils.scala
@@ -17,6 +17,7 @@
 
 package org.apache.spark.sql.catalyst.plans.logical.statsEstimation
 
+import scala.collection.mutable
 import scala.collection.mutable.ArrayBuffer
 import scala.math.BigDecimal.RoundingMode
 
@@ -38,9 +39,18 @@ object EstimationUtils {
     }
   }
 
+  /** Check if each attribute has column stat containing distinct and null counts
+   *  in the corresponding statistic. */
+  def columnStatsWithCountsExist(statsAndAttr: (Statistics, Attribute)*): Boolean = {
+    statsAndAttr.forall { case (stats, attr) =>
+      stats.attributeStats.get(attr).map(_.hasCountStats).getOrElse(false)
+    }
+  }
+
+  /** Statistics for a Column containing only NULLs. */
   def nullColumnStat(dataType: DataType, rowCount: BigInt): ColumnStat = {
-    ColumnStat(distinctCount = 0, min = None, max = None, nullCount = rowCount,
-      avgLen = dataType.defaultSize, maxLen = dataType.defaultSize)
+    ColumnStat(distinctCount = Some(0), min = None, max = None, nullCount = Some(rowCount),
+      avgLen = Some(dataType.defaultSize), maxLen = Some(dataType.defaultSize))
   }
 
   /**
@@ -70,13 +80,13 @@ object EstimationUtils {
     // We assign a generic overhead for a Row object, the actual overhead is different for different
     // Row format.
     val sizePerRow = 8 + attributes.map { attr =>
-      if (attrStats.contains(attr)) {
+      if (attrStats.get(attr).map(_.avgLen.isDefined).getOrElse(false)) {
         attr.dataType match {
           case StringType =>
             // UTF8String: base + offset + numBytes
-            attrStats(attr).avgLen + 8 + 4
+            attrStats(attr).avgLen.get + 8 + 4
           case _ =>
-            attrStats(attr).avgLen
+            attrStats(attr).avgLen.get
         }
       } else {
         attr.dataType.defaultSize

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
index 4cc32de..0538c9d 100755
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/FilterEstimation.scala
@@ -225,7 +225,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       attr: Attribute,
       isNull: Boolean,
       update: Boolean): Option[Double] = {
-    if (!colStatsMap.contains(attr)) {
+    if (!colStatsMap.contains(attr) || !colStatsMap(attr).hasCountStats) {
       logDebug("[CBO] No statistics for " + attr)
       return None
     }
@@ -234,14 +234,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val nullPercent: Double = if (rowCountValue == 0) {
       0
     } else {
-      (BigDecimal(colStat.nullCount) / BigDecimal(rowCountValue)).toDouble
+      (BigDecimal(colStat.nullCount.get) / BigDecimal(rowCountValue)).toDouble
     }
 
     if (update) {
       val newStats = if (isNull) {
-        colStat.copy(distinctCount = 0, min = None, max = None)
+        colStat.copy(distinctCount = Some(0), min = None, max = None)
       } else {
-        colStat.copy(nullCount = 0)
+        colStat.copy(nullCount = Some(0))
       }
       colStatsMap.update(attr, newStats)
     }
@@ -322,17 +322,21 @@ case class FilterEstimation(plan: Filter) extends Logging {
         // value.
         val newStats = attr.dataType match {
           case StringType | BinaryType =>
-            colStat.copy(distinctCount = 1, nullCount = 0)
+            colStat.copy(distinctCount = Some(1), nullCount = Some(0))
           case _ =>
-            colStat.copy(distinctCount = 1, min = Some(literal.value),
-              max = Some(literal.value), nullCount = 0)
+            colStat.copy(distinctCount = Some(1), min = Some(literal.value),
+              max = Some(literal.value), nullCount = Some(0))
         }
         colStatsMap.update(attr, newStats)
       }
 
       if (colStat.histogram.isEmpty) {
-        // returns 1/ndv if there is no histogram
-        Some(1.0 / colStat.distinctCount.toDouble)
+        if (!colStat.distinctCount.isEmpty) {
+          // returns 1/ndv if there is no histogram
+          Some(1.0 / colStat.distinctCount.get.toDouble)
+        } else {
+          None
+        }
       } else {
         Some(computeEqualityPossibilityByHistogram(literal, colStat))
       }
@@ -378,13 +382,13 @@ case class FilterEstimation(plan: Filter) extends Logging {
       attr: Attribute,
       hSet: Set[Any],
       update: Boolean): Option[Double] = {
-    if (!colStatsMap.contains(attr)) {
+    if (!colStatsMap.hasDistinctCount(attr)) {
       logDebug("[CBO] No statistics for " + attr)
       return None
     }
 
     val colStat = colStatsMap(attr)
-    val ndv = colStat.distinctCount
+    val ndv = colStat.distinctCount.get
     val dataType = attr.dataType
     var newNdv = ndv
 
@@ -407,8 +411,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
         // 1 and 6. The predicate column IN (1, 2, 3, 4, 5). validQuerySet.size is 5.
         newNdv = ndv.min(BigInt(validQuerySet.size))
         if (update) {
-          val newStats = colStat.copy(distinctCount = newNdv, min = Some(newMin),
-            max = Some(newMax), nullCount = 0)
+          val newStats = colStat.copy(distinctCount = Some(newNdv), min = Some(newMin),
+            max = Some(newMax), nullCount = Some(0))
           colStatsMap.update(attr, newStats)
         }
 
@@ -416,7 +420,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
       case StringType | BinaryType =>
         newNdv = ndv.min(BigInt(hSet.size))
         if (update) {
-          val newStats = colStat.copy(distinctCount = newNdv, nullCount = 0)
+          val newStats = colStat.copy(distinctCount = Some(newNdv), nullCount = Some(0))
           colStatsMap.update(attr, newStats)
         }
     }
@@ -443,12 +447,17 @@ case class FilterEstimation(plan: Filter) extends Logging {
       literal: Literal,
       update: Boolean): Option[Double] = {
 
+    if (!colStatsMap.hasMinMaxStats(attr) || !colStatsMap.hasDistinctCount(attr)) {
+      logDebug("[CBO] No statistics for " + attr)
+      return None
+    }
+
     val colStat = colStatsMap(attr)
     val statsInterval =
       ValueInterval(colStat.min, colStat.max, attr.dataType).asInstanceOf[NumericValueInterval]
     val max = statsInterval.max
     val min = statsInterval.min
-    val ndv = colStat.distinctCount.toDouble
+    val ndv = colStat.distinctCount.get.toDouble
 
     // determine the overlapping degree between predicate interval and column's interval
     val numericLiteral = EstimationUtils.toDouble(literal.value, literal.dataType)
@@ -520,8 +529,8 @@ case class FilterEstimation(plan: Filter) extends Logging {
             newMax = newValue
         }
 
-        val newStats = colStat.copy(distinctCount = ceil(ndv * percent),
-          min = newMin, max = newMax, nullCount = 0)
+        val newStats = colStat.copy(distinctCount = Some(ceil(ndv * percent)),
+          min = newMin, max = newMax, nullCount = Some(0))
 
         colStatsMap.update(attr, newStats)
       }
@@ -637,11 +646,11 @@ case class FilterEstimation(plan: Filter) extends Logging {
       attrRight: Attribute,
       update: Boolean): Option[Double] = {
 
-    if (!colStatsMap.contains(attrLeft)) {
+    if (!colStatsMap.hasCountStats(attrLeft)) {
       logDebug("[CBO] No statistics for " + attrLeft)
       return None
     }
-    if (!colStatsMap.contains(attrRight)) {
+    if (!colStatsMap.hasCountStats(attrRight)) {
       logDebug("[CBO] No statistics for " + attrRight)
       return None
     }
@@ -668,7 +677,7 @@ case class FilterEstimation(plan: Filter) extends Logging {
     val minRight = statsIntervalRight.min
 
     // determine the overlapping degree between predicate interval and column's interval
-    val allNotNull = (colStatLeft.nullCount == 0) && (colStatRight.nullCount == 0)
+    val allNotNull = (colStatLeft.nullCount.get == 0) && (colStatRight.nullCount.get == 0)
     val (noOverlap: Boolean, completeOverlap: Boolean) = op match {
       // Left < Right or Left <= Right
       // - no overlap:
@@ -707,14 +716,14 @@ case class FilterEstimation(plan: Filter) extends Logging {
       case _: EqualTo =>
         ((maxLeft < minRight) || (maxRight < minLeft),
           (minLeft == minRight) && (maxLeft == maxRight) && allNotNull
-          && (colStatLeft.distinctCount == colStatRight.distinctCount)
+          && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
         )
       case _: EqualNullSafe =>
         // For null-safe equality, we use a very restrictive condition to evaluate its overlap.
         // If null values exists, we set it to partial overlap.
         (((maxLeft < minRight) || (maxRight < minLeft)) && allNotNull,
           (minLeft == minRight) && (maxLeft == maxRight) && allNotNull
-            && (colStatLeft.distinctCount == colStatRight.distinctCount)
+            && (colStatLeft.distinctCount.get == colStatRight.distinctCount.get)
         )
     }
 
@@ -731,9 +740,9 @@ case class FilterEstimation(plan: Filter) extends Logging {
       if (update) {
         // Need to adjust new min/max after the filter condition is applied
 
-        val ndvLeft = BigDecimal(colStatLeft.distinctCount)
+        val ndvLeft = BigDecimal(colStatLeft.distinctCount.get)
         val newNdvLeft = ceil(ndvLeft * percent)
-        val ndvRight = BigDecimal(colStatRight.distinctCount)
+        val ndvRight = BigDecimal(colStatRight.distinctCount.get)
         val newNdvRight = ceil(ndvRight * percent)
 
         var newMaxLeft = colStatLeft.max
@@ -817,10 +826,10 @@ case class FilterEstimation(plan: Filter) extends Logging {
           }
         }
 
-        val newStatsLeft = colStatLeft.copy(distinctCount = newNdvLeft, min = newMinLeft,
+        val newStatsLeft = colStatLeft.copy(distinctCount = Some(newNdvLeft), min = newMinLeft,
           max = newMaxLeft)
         colStatsMap(attrLeft) = newStatsLeft
-        val newStatsRight = colStatRight.copy(distinctCount = newNdvRight, min = newMinRight,
+        val newStatsRight = colStatRight.copy(distinctCount = Some(newNdvRight), min = newMinRight,
           max = newMaxRight)
         colStatsMap(attrRight) = newStatsRight
       }
@@ -849,17 +858,35 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
   def contains(a: Attribute): Boolean = updatedMap.contains(a.exprId) || originalMap.contains(a)
 
   /**
-   * Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
-   * originalMap, because updatedMap has the latest (updated) column stats.
+   * Gets an Option of column stat for the given attribute.
+   * Prefer the column stat in updatedMap than that in originalMap,
+   * because updatedMap has the latest (updated) column stats.
    */
-  def apply(a: Attribute): ColumnStat = {
+  def get(a: Attribute): Option[ColumnStat] = {
     if (updatedMap.contains(a.exprId)) {
-      updatedMap(a.exprId)._2
+      updatedMap.get(a.exprId).map(_._2)
     } else {
-      originalMap(a)
+      originalMap.get(a)
     }
   }
 
+  def hasCountStats(a: Attribute): Boolean =
+    get(a).map(_.hasCountStats).getOrElse(false)
+
+  def hasDistinctCount(a: Attribute): Boolean =
+    get(a).map(_.distinctCount.isDefined).getOrElse(false)
+
+  def hasMinMaxStats(a: Attribute): Boolean =
+    get(a).map(_.hasCountStats).getOrElse(false)
+
+  /**
+   * Gets column stat for the given attribute. Prefer the column stat in updatedMap than that in
+   * originalMap, because updatedMap has the latest (updated) column stats.
+   */
+  def apply(a: Attribute): ColumnStat = {
+    get(a).get
+  }
+
   /** Updates column stats in updatedMap. */
   def update(a: Attribute, stats: ColumnStat): Unit = updatedMap.update(a.exprId, a -> stats)
 
@@ -871,11 +898,14 @@ case class ColumnStatsMap(originalMap: AttributeMap[ColumnStat]) {
     : AttributeMap[ColumnStat] = {
     val newColumnStats = originalMap.map { case (attr, oriColStat) =>
       val colStat = updatedMap.get(attr.exprId).map(_._2).getOrElse(oriColStat)
-      val newNdv = if (colStat.distinctCount > 1) {
+      val newNdv = if (colStat.distinctCount.isEmpty) {
+        // No NDV in the original stats.
+        None
+      } else if (colStat.distinctCount.get > 1) {
         // Update ndv based on the overall filter selectivity: scale down ndv if the number of rows
         // decreases; otherwise keep it unchanged.
-        EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
-          newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount)
+        Some(EstimationUtils.updateNdv(oldNumRows = rowsBeforeFilter,
+          newNumRows = rowsAfterFilter, oldNdv = oriColStat.distinctCount.get))
       } else {
         // no need to scale down since it is already down to 1 (for skewed distribution case)
         colStat.distinctCount

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
index f0294a4..2543e38 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/statsEstimation/JoinEstimation.scala
@@ -85,7 +85,8 @@ case class JoinEstimation(join: Join) extends Logging {
       // 3. Update statistics based on the output of join
       val inputAttrStats = AttributeMap(
         leftStats.attributeStats.toSeq ++ rightStats.attributeStats.toSeq)
-      val attributesWithStat = join.output.filter(a => inputAttrStats.contains(a))
+      val attributesWithStat = join.output.filter(a =>
+        inputAttrStats.get(a).map(_.hasCountStats).getOrElse(false))
       val (fromLeft, fromRight) = attributesWithStat.partition(join.left.outputSet.contains(_))
 
       val outputStats: Seq[(Attribute, ColumnStat)] = if (outputRows == 0) {
@@ -106,10 +107,10 @@ case class JoinEstimation(join: Join) extends Logging {
           case FullOuter =>
             fromLeft.map { a =>
               val oriColStat = inputAttrStats(a)
-              (a, oriColStat.copy(nullCount = oriColStat.nullCount + rightRows))
+              (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + rightRows)))
             } ++ fromRight.map { a =>
               val oriColStat = inputAttrStats(a)
-              (a, oriColStat.copy(nullCount = oriColStat.nullCount + leftRows))
+              (a, oriColStat.copy(nullCount = Some(oriColStat.nullCount.get + leftRows)))
             }
           case _ =>
             assert(joinType == Inner || joinType == Cross)
@@ -219,19 +220,27 @@ case class JoinEstimation(join: Join) extends Logging {
   private def computeByNdv(
       leftKey: AttributeReference,
       rightKey: AttributeReference,
-      newMin: Option[Any],
-      newMax: Option[Any]): (BigInt, ColumnStat) = {
+      min: Option[Any],
+      max: Option[Any]): (BigInt, ColumnStat) = {
     val leftKeyStat = leftStats.attributeStats(leftKey)
     val rightKeyStat = rightStats.attributeStats(rightKey)
-    val maxNdv = leftKeyStat.distinctCount.max(rightKeyStat.distinctCount)
+    val maxNdv = leftKeyStat.distinctCount.get.max(rightKeyStat.distinctCount.get)
     // Compute cardinality by the basic formula.
     val card = BigDecimal(leftStats.rowCount.get * rightStats.rowCount.get) / BigDecimal(maxNdv)
 
     // Get the intersected column stat.
-    val newNdv = leftKeyStat.distinctCount.min(rightKeyStat.distinctCount)
-    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
-    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
-    val newStats = ColumnStat(newNdv, newMin, newMax, 0, newAvgLen, newMaxLen)
+    val newNdv = Some(leftKeyStat.distinctCount.get.min(rightKeyStat.distinctCount.get))
+    val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+      Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+    } else {
+      None
+    }
+    val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+      Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+    } else {
+      None
+    }
+    val newStats = ColumnStat(newNdv, min, max, Some(0), newAvgLen, newMaxLen)
 
     (ceil(card), newStats)
   }
@@ -267,9 +276,17 @@ case class JoinEstimation(join: Join) extends Logging {
 
     val leftKeyStat = leftStats.attributeStats(leftKey)
     val rightKeyStat = rightStats.attributeStats(rightKey)
-    val newMaxLen = math.min(leftKeyStat.maxLen, rightKeyStat.maxLen)
-    val newAvgLen = (leftKeyStat.avgLen + rightKeyStat.avgLen) / 2
-    val newStats = ColumnStat(ceil(totalNdv), newMin, newMax, 0, newAvgLen, newMaxLen)
+    val newMaxLen = if (leftKeyStat.maxLen.isDefined && rightKeyStat.maxLen.isDefined) {
+      Some(math.min(leftKeyStat.maxLen.get, rightKeyStat.maxLen.get))
+    } else {
+      None
+    }
+    val newAvgLen = if (leftKeyStat.avgLen.isDefined && rightKeyStat.avgLen.isDefined) {
+      Some((leftKeyStat.avgLen.get + rightKeyStat.avgLen.get) / 2)
+    } else {
+      None
+    }
+    val newStats = ColumnStat(Some(ceil(totalNdv)), newMin, newMax, Some(0), newAvgLen, newMaxLen)
     (ceil(card), newStats)
   }
 
@@ -292,10 +309,14 @@ case class JoinEstimation(join: Join) extends Logging {
       } else {
         val oldColStat = oldAttrStats(a)
         val oldNdv = oldColStat.distinctCount
-        val newNdv = if (join.left.outputSet.contains(a)) {
-          updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv)
+        val newNdv = if (oldNdv.isDefined) {
+          Some(if (join.left.outputSet.contains(a)) {
+            updateNdv(oldNumRows = leftRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+          } else {
+            updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv.get)
+          })
         } else {
-          updateNdv(oldNumRows = rightRows, newNumRows = outputRows, oldNdv = oldNdv)
+          None
         }
         val newColStat = oldColStat.copy(distinctCount = newNdv)
         // TODO: support nullCount updates for specific outer joins
@@ -313,7 +334,7 @@ case class JoinEstimation(join: Join) extends Logging {
       // Note: join keys from EqualNullSafe also fall into this case (Coalesce), consider to
       // support it in the future by using `nullCount` in column stats.
       case (lk: AttributeReference, rk: AttributeReference)
-        if columnStatsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
+        if columnStatsWithCountsExist((leftStats, lk), (rightStats, rk)) => (lk, rk)
     }
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
index 2fb587d..565b0a1 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/JoinReorderSuite.scala
@@ -62,24 +62,15 @@ class JoinReorderSuite extends PlanTest with StatsEstimationTestBase {
     }
   }
 
-  /** Set up tables and columns for testing */
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
-    attr("t1.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t1.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t2.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t3.v-1-100") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t4.k-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t4.v-1-10") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t5.k-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("t5.v-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4)
+    attr("t1.k-1-2") -> rangeColumnStat(2, 0),
+    attr("t1.v-1-10") -> rangeColumnStat(10, 0),
+    attr("t2.k-1-5") -> rangeColumnStat(5, 0),
+    attr("t3.v-1-100") -> rangeColumnStat(100, 0),
+    attr("t4.k-1-2") -> rangeColumnStat(2, 0),
+    attr("t4.v-1-10") -> rangeColumnStat(10, 0),
+    attr("t5.k-1-5") -> rangeColumnStat(5, 0),
+    attr("t5.v-1-5") -> rangeColumnStat(5, 0)
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
index ada6e2a..d4d23ad 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinCostBasedReorderSuite.scala
@@ -68,88 +68,56 @@ class StarJoinCostBasedReorderSuite extends PlanTest with StatsEstimationTestBas
 
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
     // F1 (fact table)
-    attr("f1_fk1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk3") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_c1") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_c2") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk1") -> rangeColumnStat(100, 0),
+    attr("f1_fk2") -> rangeColumnStat(100, 0),
+    attr("f1_fk3") -> rangeColumnStat(100, 0),
+    attr("f1_c1") -> rangeColumnStat(100, 0),
+    attr("f1_c2") -> rangeColumnStat(100, 0),
 
     // D1 (dimension)
-    attr("d1_pk") -> ColumnStat(distinctCount = 100, min = Some(1), max = Some(100),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c2") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c3") -> ColumnStat(distinctCount = 50, min = Some(1), max = Some(50),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_pk") -> rangeColumnStat(100, 0),
+    attr("d1_c2") -> rangeColumnStat(50, 0),
+    attr("d1_c3") -> rangeColumnStat(50, 0),
 
     // D2 (dimension)
-    attr("d2_pk") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_pk") -> rangeColumnStat(20, 0),
+    attr("d2_c2") -> rangeColumnStat(10, 0),
+    attr("d2_c3") -> rangeColumnStat(10, 0),
 
     // D3 (dimension)
-    attr("d3_pk") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_pk") -> rangeColumnStat(10, 0),
+    attr("d3_c2") -> rangeColumnStat(5, 0),
+    attr("d3_c3") -> rangeColumnStat(5, 0),
 
     // T1 (regular table i.e. outside star)
-    attr("t1_c1") -> ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t1_c2") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t1_c3") -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t1_c1") -> rangeColumnStat(20, 1),
+    attr("t1_c2") -> rangeColumnStat(10, 1),
+    attr("t1_c3") -> rangeColumnStat(10, 1),
 
     // T2 (regular table)
-    attr("t2_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t2_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t2_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t2_c1") -> rangeColumnStat(5, 1),
+    attr("t2_c2") -> rangeColumnStat(5, 1),
+    attr("t2_c3") -> rangeColumnStat(5, 1),
 
     // T3 (regular table)
-    attr("t3_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t3_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t3_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t3_c1") -> rangeColumnStat(5, 1),
+    attr("t3_c2") -> rangeColumnStat(5, 1),
+    attr("t3_c3") -> rangeColumnStat(5, 1),
 
     // T4 (regular table)
-    attr("t4_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t4_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t4_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t4_c1") -> rangeColumnStat(5, 1),
+    attr("t4_c2") -> rangeColumnStat(5, 1),
+    attr("t4_c3") -> rangeColumnStat(5, 1),
 
     // T5 (regular table)
-    attr("t5_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t5_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t5_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
+    attr("t5_c1") -> rangeColumnStat(5, 1),
+    attr("t5_c2") -> rangeColumnStat(5, 1),
+    attr("t5_c3") -> rangeColumnStat(5, 1),
 
     // T6 (regular table)
-    attr("t6_c1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t6_c2") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("t6_c3") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 1, avgLen = 4, maxLen = 4)
+    attr("t6_c1") -> rangeColumnStat(5, 1),
+    attr("t6_c2") -> rangeColumnStat(5, 1),
+    attr("t6_c3") -> rangeColumnStat(5, 1)
 
   ))
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
index 777c563..4e0883e 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/optimizer/StarJoinReorderSuite.scala
@@ -70,59 +70,40 @@ class StarJoinReorderSuite extends PlanTest with StatsEstimationTestBase {
   // Tables' cardinality: f1 > d3 > d1 > d2 > s3
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
     // F1
-    attr("f1_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f1_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("f1_fk1") -> rangeColumnStat(3, 0),
+    attr("f1_fk2") -> rangeColumnStat(3, 0),
+    attr("f1_fk3") -> rangeColumnStat(4, 0),
+    attr("f1_c4") -> rangeColumnStat(4, 0),
     // D1
-    attr("d1_pk1") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d1_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d1_pk1") -> rangeColumnStat(4, 0),
+    attr("d1_c2") -> rangeColumnStat(3, 0),
+    attr("d1_c3") -> rangeColumnStat(4, 0),
+    attr("d1_c4") -> ColumnStat(distinctCount = Some(2), min = Some("2"), max = Some("3"),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // D2
-    attr("d2_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 1, avgLen = 4, maxLen = 4),
-    attr("d2_pk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c3") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d2_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d2_c2") -> ColumnStat(distinctCount = Some(3), min = Some("1"), max = Some("3"),
+      nullCount = Some(1), avgLen = Some(4), maxLen = Some(4)),
+    attr("d2_pk1") -> rangeColumnStat(3, 0),
+    attr("d2_c3") -> rangeColumnStat(3, 0),
+    attr("d2_c4") -> ColumnStat(distinctCount = Some(2), min = Some("3"), max = Some("4"),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // D3
-    attr("d3_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_pk1") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("d3_c4") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("d3_fk1") -> rangeColumnStat(3, 0),
+    attr("d3_c2") -> rangeColumnStat(3, 0),
+    attr("d3_pk1") -> rangeColumnStat(5, 0),
+    attr("d3_c4") -> ColumnStat(distinctCount = Some(2), min = Some("2"), max = Some("3"),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // S3
-    attr("s3_pk1") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("s3_c2") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("s3_c3") -> ColumnStat(distinctCount = 1, min = Some(3), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("s3_c4") -> ColumnStat(distinctCount = 2, min = Some(3), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
+    attr("s3_pk1") -> rangeColumnStat(2, 0),
+    attr("s3_c2") -> rangeColumnStat(1, 0),
+    attr("s3_c3") -> rangeColumnStat(1, 0),
+    attr("s3_c4") -> ColumnStat(distinctCount = Some(2), min = Some("3"), max = Some("4"),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
     // F11
-    attr("f11_fk1") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f11_fk2") -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f11_fk3") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4),
-    attr("f11_c4") -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(4),
-      nullCount = 0, avgLen = 4, maxLen = 4)
+    attr("f11_fk1") -> rangeColumnStat(3, 0),
+    attr("f11_fk2") -> rangeColumnStat(3, 0),
+    attr("f11_fk3") -> rangeColumnStat(4, 0),
+    attr("f11_c4") -> rangeColumnStat(4, 0)
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
index 23f95a6..8213d56 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/AggregateEstimationSuite.scala
@@ -29,16 +29,16 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
 
   /** Columns for testing */
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
-    attr("key11") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key12") -> ColumnStat(distinctCount = 4, min = Some(10), max = Some(40), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key21") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key22") -> ColumnStat(distinctCount = 2, min = Some(10), max = Some(20), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key31") -> ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    attr("key11") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key12") -> ColumnStat(distinctCount = Some(4), min = Some(10), max = Some(40),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key21") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key22") -> ColumnStat(distinctCount = Some(2), min = Some(10), max = Some(20),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key31") -> ColumnStat(distinctCount = Some(0), min = None, max = None,
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -63,8 +63,8 @@ class AggregateEstimationSuite extends StatsEstimationTestBase with PlanTest {
       tableRowCount = 6,
       groupByColumns = Seq("key21", "key22"),
       // Row count = product of ndv
-      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount * nameToColInfo("key22")._2
-        .distinctCount)
+      expectedOutputRowCount = nameToColInfo("key21")._2.distinctCount.get *
+        nameToColInfo("key22")._2.distinctCount.get)
   }
 
   test("empty group-by column") {

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
index 7d532ff..953094c 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/BasicStatsEstimationSuite.scala
@@ -28,8 +28,8 @@ import org.apache.spark.sql.types.IntegerType
 
 class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
   val attribute = attr("key")
-  val colStat = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStat = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   val plan = StatsTestPlan(
     outputList = Seq(attribute),
@@ -116,13 +116,17 @@ class BasicStatsEstimationSuite extends PlanTest with StatsEstimationTestBase {
         sizeInBytes = 40,
         rowCount = Some(10),
         attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(10, Some(1), Some(10), 0, 4, 4))))
+          AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+            min = Some(1), max = Some(10),
+            nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
     val expectedCboStats =
       Statistics(
         sizeInBytes = 4,
         rowCount = Some(1),
         attributeStats = AttributeMap(Seq(
-          AttributeReference("c1", IntegerType)() -> ColumnStat(1, Some(5), Some(5), 0, 4, 4))))
+          AttributeReference("c1", IntegerType)() -> ColumnStat(distinctCount = Some(10),
+            min = Some(5), max = Some(5),
+            nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))))
 
     val plan = DummyLogicalPlan(defaultStats = expectedDefaultStats, cboStats = expectedCboStats)
     checkStats(


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/3] spark git commit: [SPARK-23445] ColumnStat refactoring

Posted by li...@apache.org.
http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
index 2b1fe98..43440d5 100755
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/FilterEstimationSuite.scala
@@ -37,59 +37,61 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   // column cint has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10
   // Hence, distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
   val attrInt = AttributeReference("cint", IntegerType)()
-  val colStatInt = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cbool has only 2 distinct values
   val attrBool = AttributeReference("cbool", BooleanType)()
-  val colStatBool = ColumnStat(distinctCount = 2, min = Some(false), max = Some(true),
-    nullCount = 0, avgLen = 1, maxLen = 1)
+  val colStatBool = ColumnStat(distinctCount = Some(2), min = Some(false), max = Some(true),
+    nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))
 
   // column cdate has 10 values from 2017-01-01 through 2017-01-10.
   val dMin = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))
   val dMax = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-10"))
   val attrDate = AttributeReference("cdate", DateType)()
-  val colStatDate = ColumnStat(distinctCount = 10, min = Some(dMin), max = Some(dMax),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatDate = ColumnStat(distinctCount = Some(10),
+    min = Some(dMin), max = Some(dMax),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cdecimal has 4 values from 0.20 through 0.80 at increment of 0.20.
   val decMin = Decimal("0.200000000000000000")
   val decMax = Decimal("0.800000000000000000")
   val attrDecimal = AttributeReference("cdecimal", DecimalType(18, 18))()
-  val colStatDecimal = ColumnStat(distinctCount = 4, min = Some(decMin), max = Some(decMax),
-    nullCount = 0, avgLen = 8, maxLen = 8)
+  val colStatDecimal = ColumnStat(distinctCount = Some(4),
+    min = Some(decMin), max = Some(decMax),
+    nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
 
   // column cdouble has 10 double values: 1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 7.0, 8.0, 9.0, 10.0
   val attrDouble = AttributeReference("cdouble", DoubleType)()
-  val colStatDouble = ColumnStat(distinctCount = 10, min = Some(1.0), max = Some(10.0),
-    nullCount = 0, avgLen = 8, maxLen = 8)
+  val colStatDouble = ColumnStat(distinctCount = Some(10), min = Some(1.0), max = Some(10.0),
+    nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
 
   // column cstring has 10 String values:
   // "A0", "A1", "A2", "A3", "A4", "A5", "A6", "A7", "A8", "A9"
   val attrString = AttributeReference("cstring", StringType)()
-  val colStatString = ColumnStat(distinctCount = 10, min = None, max = None,
-    nullCount = 0, avgLen = 2, maxLen = 2)
+  val colStatString = ColumnStat(distinctCount = Some(10), min = None, max = None,
+    nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))
 
   // column cint2 has values: 7, 8, 9, 10, 11, 12, 13, 14, 15, 16
   // Hence, distinctCount:10, min:7, max:16, nullCount:0, avgLen:4, maxLen:4
   // This column is created to test "cint < cint2
   val attrInt2 = AttributeReference("cint2", IntegerType)()
-  val colStatInt2 = ColumnStat(distinctCount = 10, min = Some(7), max = Some(16),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt2 = ColumnStat(distinctCount = Some(10), min = Some(7), max = Some(16),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cint3 has values: 30, 31, 32, 33, 34, 35, 36, 37, 38, 39
   // Hence, distinctCount:10, min:30, max:39, nullCount:0, avgLen:4, maxLen:4
   // This column is created to test "cint = cint3 without overlap at all.
   val attrInt3 = AttributeReference("cint3", IntegerType)()
-  val colStatInt3 = ColumnStat(distinctCount = 10, min = Some(30), max = Some(39),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt3 = ColumnStat(distinctCount = Some(10), min = Some(30), max = Some(39),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cint4 has values in the range from 1 to 10
   // distinctCount:10, min:1, max:10, nullCount:0, avgLen:4, maxLen:4
   // This column is created to test complete overlap
   val attrInt4 = AttributeReference("cint4", IntegerType)()
-  val colStatInt4 = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-    nullCount = 0, avgLen = 4, maxLen = 4)
+  val colStatInt4 = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
   // column cintHgm has values: 1, 2, 3, 4, 5, 6, 7, 8, 9, 10 with histogram.
   // Note that cintHgm has an even distribution with histogram information built.
@@ -98,8 +100,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   val hgmInt = Histogram(2.0, Array(HistogramBin(1.0, 2.0, 2),
     HistogramBin(2.0, 4.0, 2), HistogramBin(4.0, 6.0, 2),
     HistogramBin(6.0, 8.0, 2), HistogramBin(8.0, 10.0, 2)))
-  val colStatIntHgm = ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-    nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))
+  val colStatIntHgm = ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))
 
   // column cintSkewHgm has values: 1, 4, 4, 5, 5, 5, 5, 6, 6, 10 with histogram.
   // Note that cintSkewHgm has a skewed distribution with histogram information built.
@@ -108,8 +110,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   val hgmIntSkew = Histogram(2.0, Array(HistogramBin(1.0, 4.0, 2),
     HistogramBin(4.0, 5.0, 2), HistogramBin(5.0, 5.0, 1),
     HistogramBin(5.0, 6.0, 2), HistogramBin(6.0, 10.0, 2)))
-  val colStatIntSkewHgm = ColumnStat(distinctCount = 5, min = Some(1), max = Some(10),
-    nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))
+  val colStatIntSkewHgm = ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(10),
+    nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))
 
   val attributeMap = AttributeMap(Seq(
     attrInt -> colStatInt,
@@ -172,7 +174,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Or(LessThan(attrInt, Literal(3)), Literal(null, IntegerType))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 3)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(3))),
       expectedRowCount = 3)
   }
 
@@ -180,7 +182,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(And(LessThan(attrInt, Literal(3)), Literal(null, IntegerType)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(8))),
       expectedRowCount = 8)
   }
 
@@ -196,23 +198,23 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(And(LessThan(attrInt, Literal(3)), Not(Literal(null, IntegerType))))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 8)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(8))),
       expectedRowCount = 8)
   }
 
   test("cint = 2") {
     validateEstimatedStats(
       Filter(EqualTo(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(1), min = Some(2), max = Some(2),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 1)
   }
 
   test("cint <=> 2") {
     validateEstimatedStats(
       Filter(EqualNullSafe(attrInt, Literal(2)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 1, min = Some(2), max = Some(2),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(1), min = Some(2), max = Some(2),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 1)
   }
 
@@ -227,8 +229,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cint < 3") {
     validateEstimatedStats(
       Filter(LessThan(attrInt, Literal(3)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 3)
   }
 
@@ -243,16 +245,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cint <= 3") {
     validateEstimatedStats(
       Filter(LessThanOrEqual(attrInt, Literal(3)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 3)
   }
 
   test("cint > 6") {
     validateEstimatedStats(
       Filter(GreaterThan(attrInt, Literal(6)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(5), min = Some(6), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 5)
   }
 
@@ -267,8 +269,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cint >= 6") {
     validateEstimatedStats(
       Filter(GreaterThanOrEqual(attrInt, Literal(6)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(5), min = Some(6), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 5)
   }
 
@@ -282,8 +284,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cint IS NOT NULL") {
     validateEstimatedStats(
       Filter(IsNotNull(attrInt), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 10)
   }
 
@@ -301,8 +303,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(3), max = Some(6),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 4)
   }
 
@@ -310,7 +312,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Or(EqualTo(attrInt, Literal(3)), EqualTo(attrInt, Literal(6)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 2)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(2))),
       expectedRowCount = 2)
   }
 
@@ -318,7 +320,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(And(GreaterThan(attrInt, Literal(3)), LessThanOrEqual(attrInt, Literal(6))))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 6)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(6))),
       expectedRowCount = 6)
   }
 
@@ -326,7 +328,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(Or(LessThanOrEqual(attrInt, Literal(3)), GreaterThan(attrInt, Literal(6))))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 5)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(5))),
       expectedRowCount = 5)
   }
 
@@ -342,47 +344,47 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(Or(EqualTo(attrInt, Literal(3)), LessThan(attrString, Literal("A8"))))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt, attrString), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 9),
-        attrString -> colStatString.copy(distinctCount = 9)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(9)),
+        attrString -> colStatString.copy(distinctCount = Some(9))),
       expectedRowCount = 9)
   }
 
   test("cint IN (3, 4, 5)") {
     validateEstimatedStats(
       Filter(InSet(attrInt, Set(3, 4, 5)), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 3, min = Some(3), max = Some(5),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(3), min = Some(3), max = Some(5),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 3)
   }
 
   test("cint NOT IN (3, 4, 5)") {
     validateEstimatedStats(
       Filter(Not(InSet(attrInt, Set(3, 4, 5))), childStatsTestPlan(Seq(attrInt), 10L)),
-      Seq(attrInt -> colStatInt.copy(distinctCount = 7)),
+      Seq(attrInt -> colStatInt.copy(distinctCount = Some(7))),
       expectedRowCount = 7)
   }
 
   test("cbool IN (true)") {
     validateEstimatedStats(
       Filter(InSet(attrBool, Set(true)), childStatsTestPlan(Seq(attrBool), 10L)),
-      Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true),
-        nullCount = 0, avgLen = 1, maxLen = 1)),
+      Seq(attrBool -> ColumnStat(distinctCount = Some(1), min = Some(true), max = Some(true),
+        nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))),
       expectedRowCount = 5)
   }
 
   test("cbool = true") {
     validateEstimatedStats(
       Filter(EqualTo(attrBool, Literal(true)), childStatsTestPlan(Seq(attrBool), 10L)),
-      Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(true), max = Some(true),
-        nullCount = 0, avgLen = 1, maxLen = 1)),
+      Seq(attrBool -> ColumnStat(distinctCount = Some(1), min = Some(true), max = Some(true),
+        nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))),
       expectedRowCount = 5)
   }
 
   test("cbool > false") {
     validateEstimatedStats(
       Filter(GreaterThan(attrBool, Literal(false)), childStatsTestPlan(Seq(attrBool), 10L)),
-      Seq(attrBool -> ColumnStat(distinctCount = 1, min = Some(false), max = Some(true),
-        nullCount = 0, avgLen = 1, maxLen = 1)),
+      Seq(attrBool -> ColumnStat(distinctCount = Some(1), min = Some(false), max = Some(true),
+        nullCount = Some(0), avgLen = Some(1), maxLen = Some(1))),
       expectedRowCount = 5)
   }
 
@@ -391,18 +393,21 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     validateEstimatedStats(
       Filter(EqualTo(attrDate, Literal(d20170102, DateType)),
         childStatsTestPlan(Seq(attrDate), 10L)),
-      Seq(attrDate -> ColumnStat(distinctCount = 1, min = Some(d20170102), max = Some(d20170102),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrDate -> ColumnStat(distinctCount = Some(1),
+        min = Some(d20170102), max = Some(d20170102),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 1)
   }
 
   test("cdate < cast('2017-01-03' AS DATE)") {
+    val d20170101 = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-01"))
     val d20170103 = DateTimeUtils.fromJavaDate(Date.valueOf("2017-01-03"))
     validateEstimatedStats(
       Filter(LessThan(attrDate, Literal(d20170103, DateType)),
         childStatsTestPlan(Seq(attrDate), 10L)),
-      Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(dMin), max = Some(d20170103),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrDate -> ColumnStat(distinctCount = Some(3),
+        min = Some(d20170101), max = Some(d20170103),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 3)
   }
 
@@ -414,8 +419,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     validateEstimatedStats(
       Filter(In(attrDate, Seq(Literal(d20170103, DateType), Literal(d20170104, DateType),
         Literal(d20170105, DateType))), childStatsTestPlan(Seq(attrDate), 10L)),
-      Seq(attrDate -> ColumnStat(distinctCount = 3, min = Some(d20170103), max = Some(d20170105),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrDate -> ColumnStat(distinctCount = Some(3),
+        min = Some(d20170103), max = Some(d20170105),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 3)
   }
 
@@ -424,42 +430,45 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     validateEstimatedStats(
       Filter(EqualTo(attrDecimal, Literal(dec_0_40)),
         childStatsTestPlan(Seq(attrDecimal), 4L)),
-      Seq(attrDecimal -> ColumnStat(distinctCount = 1, min = Some(dec_0_40), max = Some(dec_0_40),
-        nullCount = 0, avgLen = 8, maxLen = 8)),
+      Seq(attrDecimal -> ColumnStat(distinctCount = Some(1),
+        min = Some(dec_0_40), max = Some(dec_0_40),
+        nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))),
       expectedRowCount = 1)
   }
 
   test("cdecimal < 0.60 ") {
+    val dec_0_20 = Decimal("0.200000000000000000")
     val dec_0_60 = Decimal("0.600000000000000000")
     validateEstimatedStats(
       Filter(LessThan(attrDecimal, Literal(dec_0_60)),
         childStatsTestPlan(Seq(attrDecimal), 4L)),
-      Seq(attrDecimal -> ColumnStat(distinctCount = 3, min = Some(decMin), max = Some(dec_0_60),
-        nullCount = 0, avgLen = 8, maxLen = 8)),
+      Seq(attrDecimal -> ColumnStat(distinctCount = Some(3),
+        min = Some(dec_0_20), max = Some(dec_0_60),
+        nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))),
       expectedRowCount = 3)
   }
 
   test("cdouble < 3.0") {
     validateEstimatedStats(
       Filter(LessThan(attrDouble, Literal(3.0)), childStatsTestPlan(Seq(attrDouble), 10L)),
-      Seq(attrDouble -> ColumnStat(distinctCount = 3, min = Some(1.0), max = Some(3.0),
-        nullCount = 0, avgLen = 8, maxLen = 8)),
+      Seq(attrDouble -> ColumnStat(distinctCount = Some(3), min = Some(1.0), max = Some(3.0),
+        nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))),
       expectedRowCount = 3)
   }
 
   test("cstring = 'A2'") {
     validateEstimatedStats(
       Filter(EqualTo(attrString, Literal("A2")), childStatsTestPlan(Seq(attrString), 10L)),
-      Seq(attrString -> ColumnStat(distinctCount = 1, min = None, max = None,
-        nullCount = 0, avgLen = 2, maxLen = 2)),
+      Seq(attrString -> ColumnStat(distinctCount = Some(1), min = None, max = None,
+        nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))),
       expectedRowCount = 1)
   }
 
   test("cstring < 'A2' - unsupported condition") {
     validateEstimatedStats(
       Filter(LessThan(attrString, Literal("A2")), childStatsTestPlan(Seq(attrString), 10L)),
-      Seq(attrString -> ColumnStat(distinctCount = 10, min = None, max = None,
-        nullCount = 0, avgLen = 2, maxLen = 2)),
+      Seq(attrString -> ColumnStat(distinctCount = Some(10), min = None, max = None,
+        nullCount = Some(0), avgLen = Some(2), maxLen = Some(2))),
       expectedRowCount = 10)
   }
 
@@ -468,8 +477,9 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // valid values in IN clause is greater than the number of distinct values for a given column.
     // For example, column has only 2 distinct values 1 and 6.
     // The predicate is: column IN (1, 2, 3, 4, 5).
-    val cornerChildColStatInt = ColumnStat(distinctCount = 2, min = Some(1), max = Some(6),
-      nullCount = 0, avgLen = 4, maxLen = 4)
+    val cornerChildColStatInt = ColumnStat(distinctCount = Some(2),
+      min = Some(1), max = Some(6),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
     val cornerChildStatsTestplan = StatsTestPlan(
       outputList = Seq(attrInt),
       rowCount = 2L,
@@ -477,16 +487,17 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     )
     validateEstimatedStats(
       Filter(InSet(attrInt, Set(1, 2, 3, 4, 5)), cornerChildStatsTestplan),
-      Seq(attrInt -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(5),
-        nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(5),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 2)
   }
 
   // This is a limitation test. We should remove it after the limitation is removed.
   test("don't estimate IsNull or IsNotNull if the child is a non-leaf node") {
     val attrIntLargerRange = AttributeReference("c1", IntegerType)()
-    val colStatIntLargerRange = ColumnStat(distinctCount = 20, min = Some(1), max = Some(20),
-      nullCount = 10, avgLen = 4, maxLen = 4)
+    val colStatIntLargerRange = ColumnStat(distinctCount = Some(20),
+      min = Some(1), max = Some(20),
+      nullCount = Some(10), avgLen = Some(4), maxLen = Some(4))
     val smallerTable = childStatsTestPlan(Seq(attrInt), 10L)
     val largerTable = StatsTestPlan(
       outputList = Seq(attrIntLargerRange),
@@ -508,10 +519,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // partial overlap case
     validateEstimatedStats(
       Filter(EqualTo(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
-          nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt2 -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 4)
   }
 
@@ -519,10 +530,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // partial overlap case
     validateEstimatedStats(
       Filter(GreaterThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(10),
-          nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt2 -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(10),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 4)
   }
 
@@ -530,10 +541,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // partial overlap case
     validateEstimatedStats(
       Filter(LessThan(attrInt, attrInt2), childStatsTestPlan(Seq(attrInt, attrInt2), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt2 -> ColumnStat(distinctCount = 4, min = Some(7), max = Some(16),
-          nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(1), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt2 -> ColumnStat(distinctCount = Some(4), min = Some(7), max = Some(16),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 4)
   }
 
@@ -541,10 +552,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // complete overlap case
     validateEstimatedStats(
       Filter(EqualTo(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt4 -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-          nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt4 -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 10)
   }
 
@@ -552,10 +563,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // partial overlap case
     validateEstimatedStats(
       Filter(LessThan(attrInt, attrInt4), childStatsTestPlan(Seq(attrInt, attrInt4), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt4 -> ColumnStat(distinctCount = 4, min = Some(1), max = Some(10),
-          nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(4), min = Some(1), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt4 -> ColumnStat(distinctCount = Some(4), min = Some(1), max = Some(10),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 4)
   }
 
@@ -571,10 +582,10 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     // all table records qualify.
     validateEstimatedStats(
       Filter(LessThan(attrInt, attrInt3), childStatsTestPlan(Seq(attrInt, attrInt3), 10L)),
-      Seq(attrInt -> ColumnStat(distinctCount = 10, min = Some(1), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt3 -> ColumnStat(distinctCount = 10, min = Some(30), max = Some(39),
-          nullCount = 0, avgLen = 4, maxLen = 4)),
+      Seq(attrInt -> ColumnStat(distinctCount = Some(10), min = Some(1), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt3 -> ColumnStat(distinctCount = Some(10), min = Some(30), max = Some(39),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))),
       expectedRowCount = 10)
   }
 
@@ -592,11 +603,11 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrInt, attrInt4, attrString), 10L)),
       Seq(
-        attrInt -> ColumnStat(distinctCount = 5, min = Some(3), max = Some(10),
-          nullCount = 0, avgLen = 4, maxLen = 4),
-        attrInt4 -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(6),
-          nullCount = 0, avgLen = 4, maxLen = 4),
-        attrString -> colStatString.copy(distinctCount = 5)),
+        attrInt -> ColumnStat(distinctCount = Some(5), min = Some(3), max = Some(10),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrInt4 -> ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(6),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        attrString -> colStatString.copy(distinctCount = Some(5))),
       expectedRowCount = 5)
   }
 
@@ -606,15 +617,15 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(And(LessThan(attrIntHgm, Literal(3)), Literal(null, IntegerType)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 7)),
+      Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = Some(7))),
       expectedRowCount = 7)
   }
 
   test("cintHgm = 5") {
     validateEstimatedStats(
       Filter(EqualTo(attrIntHgm, Literal(5)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = Some(1), min = Some(5), max = Some(5),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
       expectedRowCount = 1)
   }
 
@@ -629,8 +640,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cintHgm < 3") {
     validateEstimatedStats(
       Filter(LessThan(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
       expectedRowCount = 3)
   }
 
@@ -645,16 +656,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cintHgm <= 3") {
     validateEstimatedStats(
       Filter(LessThanOrEqual(attrIntHgm, Literal(3)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> ColumnStat(distinctCount = 3, min = Some(1), max = Some(3),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = Some(3), min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
       expectedRowCount = 3)
   }
 
   test("cintHgm > 6") {
     validateEstimatedStats(
       Filter(GreaterThan(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(6), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = Some(4), min = Some(6), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
       expectedRowCount = 4)
   }
 
@@ -669,8 +680,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cintHgm >= 6") {
     validateEstimatedStats(
       Filter(GreaterThanOrEqual(attrIntHgm, Literal(6)), childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> ColumnStat(distinctCount = 5, min = Some(6), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = Some(5), min = Some(6), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
       expectedRowCount = 5)
   }
 
@@ -679,8 +690,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
       Literal(3)), LessThanOrEqual(attrIntHgm, Literal(6)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmInt))),
+      Seq(attrIntHgm -> ColumnStat(distinctCount = Some(4), min = Some(3), max = Some(6),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmInt))),
       expectedRowCount = 4)
   }
 
@@ -688,7 +699,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Or(EqualTo(attrIntHgm, Literal(3)), EqualTo(attrIntHgm, Literal(6)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrIntHgm), 10L)),
-      Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = 3)),
+      Seq(attrIntHgm -> colStatIntHgm.copy(distinctCount = Some(3))),
       expectedRowCount = 3)
   }
 
@@ -698,15 +709,15 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Not(And(LessThan(attrIntSkewHgm, Literal(3)), Literal(null, IntegerType)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 5)),
+      Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = Some(5))),
       expectedRowCount = 9)
   }
 
   test("cintSkewHgm = 5") {
     validateEstimatedStats(
       Filter(EqualTo(attrIntSkewHgm, Literal(5)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(5), max = Some(5),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(5), max = Some(5),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
       expectedRowCount = 4)
   }
 
@@ -721,8 +732,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
   test("cintSkewHgm < 3") {
     validateEstimatedStats(
       Filter(LessThan(attrIntSkewHgm, Literal(3)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
       expectedRowCount = 2)
   }
 
@@ -738,16 +749,16 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     validateEstimatedStats(
       Filter(LessThanOrEqual(attrIntSkewHgm, Literal(3)),
         childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(1), max = Some(3),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
       expectedRowCount = 2)
   }
 
   test("cintSkewHgm > 6") {
     validateEstimatedStats(
       Filter(GreaterThan(attrIntSkewHgm, Literal(6)), childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 1, min = Some(6), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(1), min = Some(6), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
       expectedRowCount = 2)
   }
 
@@ -764,8 +775,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     validateEstimatedStats(
       Filter(GreaterThanOrEqual(attrIntSkewHgm, Literal(6)),
         childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 2, min = Some(6), max = Some(10),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(2), min = Some(6), max = Some(10),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
       expectedRowCount = 3)
   }
 
@@ -774,8 +785,8 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
       Literal(3)), LessThanOrEqual(attrIntSkewHgm, Literal(6)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = 4, min = Some(3), max = Some(6),
-        nullCount = 0, avgLen = 4, maxLen = 4, histogram = Some(hgmIntSkew))),
+      Seq(attrIntSkewHgm -> ColumnStat(distinctCount = Some(4), min = Some(3), max = Some(6),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4), histogram = Some(hgmIntSkew))),
       expectedRowCount = 8)
   }
 
@@ -783,7 +794,7 @@ class FilterEstimationSuite extends StatsEstimationTestBase {
     val condition = Or(EqualTo(attrIntSkewHgm, Literal(3)), EqualTo(attrIntSkewHgm, Literal(6)))
     validateEstimatedStats(
       Filter(condition, childStatsTestPlan(Seq(attrIntSkewHgm), 10L)),
-      Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = 2)),
+      Seq(attrIntSkewHgm -> colStatIntSkewHgm.copy(distinctCount = Some(2))),
       expectedRowCount = 3)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
index 26139d8..12c0a7b 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/JoinEstimationSuite.scala
@@ -33,16 +33,16 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
 
   /** Set up tables and its columns for testing */
   private val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
-    attr("key-1-5") -> ColumnStat(distinctCount = 5, min = Some(1), max = Some(5), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key-5-9") -> ColumnStat(distinctCount = 5, min = Some(5), max = Some(9), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key-1-2") -> ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key-2-4") -> ColumnStat(distinctCount = 3, min = Some(2), max = Some(4), nullCount = 0,
-      avgLen = 4, maxLen = 4),
-    attr("key-2-3") -> ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    attr("key-1-5") -> ColumnStat(distinctCount = Some(5), min = Some(1), max = Some(5),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key-5-9") -> ColumnStat(distinctCount = Some(5), min = Some(5), max = Some(9),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key-1-2") -> ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key-2-4") -> ColumnStat(distinctCount = Some(3), min = Some(2), max = Some(4),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+    attr("key-2-3") -> ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
   ))
 
   private val nameToAttr: Map[String, Attribute] = columnInfo.map(kv => kv._1.name -> kv._1)
@@ -70,8 +70,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
   private def estimateByHistogram(
       leftHistogram: Histogram,
       rightHistogram: Histogram,
-      expectedMin: Double,
-      expectedMax: Double,
+      expectedMin: Any,
+      expectedMax: Any,
       expectedNdv: Long,
       expectedRows: Long): Unit = {
     val col1 = attr("key1")
@@ -86,9 +86,11 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
       rowCount = Some(expectedRows),
       attributeStats = AttributeMap(Seq(
         col1 -> c1.stats.attributeStats(col1).copy(
-          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax)),
+          distinctCount = Some(expectedNdv),
+          min = Some(expectedMin), max = Some(expectedMax)),
         col2 -> c2.stats.attributeStats(col2).copy(
-          distinctCount = expectedNdv, min = Some(expectedMin), max = Some(expectedMax))))
+          distinctCount = Some(expectedNdv),
+          min = Some(expectedMin), max = Some(expectedMax))))
     )
 
     // Join order should not affect estimation result.
@@ -100,9 +102,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
   private def generateJoinChild(
       col: Attribute,
       histogram: Histogram,
-      expectedMin: Double,
-      expectedMax: Double): LogicalPlan = {
-    val colStat = inferColumnStat(histogram)
+      expectedMin: Any,
+      expectedMax: Any): LogicalPlan = {
+    val colStat = inferColumnStat(histogram, expectedMin, expectedMax)
     StatsTestPlan(
       outputList = Seq(col),
       rowCount = (histogram.height * histogram.bins.length).toLong,
@@ -110,7 +112,11 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
   }
 
   /** Column statistics should be consistent with histograms in tests. */
-  private def inferColumnStat(histogram: Histogram): ColumnStat = {
+  private def inferColumnStat(
+      histogram: Histogram,
+      expectedMin: Any,
+      expectedMax: Any): ColumnStat = {
+
     var ndv = 0L
     for (i <- histogram.bins.indices) {
       val bin = histogram.bins(i)
@@ -118,8 +124,9 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         ndv += bin.ndv
       }
     }
-    ColumnStat(distinctCount = ndv, min = Some(histogram.bins.head.lo),
-      max = Some(histogram.bins.last.hi), nullCount = 0, avgLen = 4, maxLen = 4,
+    ColumnStat(distinctCount = Some(ndv),
+      min = Some(expectedMin), max = Some(expectedMax),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4),
       histogram = Some(histogram))
   }
 
@@ -343,10 +350,10 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
       rowCount = Some(5 + 3),
       attributeStats = AttributeMap(
         // Update null count in column stats.
-        Seq(nameToAttr("key-1-5") -> columnInfo(nameToAttr("key-1-5")).copy(nullCount = 3),
-          nameToAttr("key-5-9") -> columnInfo(nameToAttr("key-5-9")).copy(nullCount = 3),
-          nameToAttr("key-1-2") -> columnInfo(nameToAttr("key-1-2")).copy(nullCount = 5),
-          nameToAttr("key-2-4") -> columnInfo(nameToAttr("key-2-4")).copy(nullCount = 5))))
+        Seq(nameToAttr("key-1-5") -> columnInfo(nameToAttr("key-1-5")).copy(nullCount = Some(3)),
+          nameToAttr("key-5-9") -> columnInfo(nameToAttr("key-5-9")).copy(nullCount = Some(3)),
+          nameToAttr("key-1-2") -> columnInfo(nameToAttr("key-1-2")).copy(nullCount = Some(5)),
+          nameToAttr("key-2-4") -> columnInfo(nameToAttr("key-2-4")).copy(nullCount = Some(5)))))
     assert(join.stats == expectedStats)
   }
 
@@ -356,11 +363,11 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
     val join = Join(table1, table2, Inner,
       Some(EqualTo(nameToAttr("key-1-5"), nameToAttr("key-1-2"))))
     // Update column stats for equi-join keys (key-1-5 and key-1-2).
-    val joinedColStat = ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    val joinedColStat = ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
     // Update column stat for other column if #outputRow / #sideRow < 1 (key-5-9), or keep it
     // unchanged (key-2-4).
-    val colStatForkey59 = nameToColInfo("key-5-9")._2.copy(distinctCount = 5 * 3 / 5)
+    val colStatForkey59 = nameToColInfo("key-5-9")._2.copy(distinctCount = Some(5 * 3 / 5))
 
     val expectedStats = Statistics(
       sizeInBytes = 3 * (8 + 4 * 4),
@@ -379,10 +386,10 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
         EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3")))))
 
     // Update column stats for join keys.
-    val joinedColStat1 = ColumnStat(distinctCount = 2, min = Some(1), max = Some(2), nullCount = 0,
-        avgLen = 4, maxLen = 4)
-    val joinedColStat2 = ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    val joinedColStat1 = ColumnStat(distinctCount = Some(2), min = Some(1), max = Some(2),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
+    val joinedColStat2 = ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
     val expectedStats = Statistics(
       sizeInBytes = 2 * (8 + 4 * 4),
@@ -398,8 +405,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
     // table3 (key-1-2 int, key-2-3 int): (1, 2), (2, 3)
     val join = Join(table3, table2, LeftOuter,
       Some(EqualTo(nameToAttr("key-2-3"), nameToAttr("key-2-4"))))
-    val joinedColStat = ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    val joinedColStat = ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
     val expectedStats = Statistics(
       sizeInBytes = 2 * (8 + 4 * 4),
@@ -416,8 +423,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
     // table3 (key-1-2 int, key-2-3 int): (1, 2), (2, 3)
     val join = Join(table2, table3, RightOuter,
       Some(EqualTo(nameToAttr("key-2-4"), nameToAttr("key-2-3"))))
-    val joinedColStat = ColumnStat(distinctCount = 2, min = Some(2), max = Some(3), nullCount = 0,
-      avgLen = 4, maxLen = 4)
+    val joinedColStat = ColumnStat(distinctCount = Some(2), min = Some(2), max = Some(3),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 
     val expectedStats = Statistics(
       sizeInBytes = 2 * (8 + 4 * 4),
@@ -466,30 +473,40 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
       val date = DateTimeUtils.fromJavaDate(Date.valueOf("2016-05-08"))
       val timestamp = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-05-08 00:00:01"))
       mutable.LinkedHashMap[Attribute, ColumnStat](
-        AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = 1,
-          min = Some(false), max = Some(false), nullCount = 0, avgLen = 1, maxLen = 1),
-        AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = 1,
-          min = Some(1.toByte), max = Some(1.toByte), nullCount = 0, avgLen = 1, maxLen = 1),
-        AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = 1,
-          min = Some(1.toShort), max = Some(1.toShort), nullCount = 0, avgLen = 2, maxLen = 2),
-        AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = 1,
-          min = Some(1), max = Some(1), nullCount = 0, avgLen = 4, maxLen = 4),
-        AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = 1,
-          min = Some(1L), max = Some(1L), nullCount = 0, avgLen = 8, maxLen = 8),
-        AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = 1,
-          min = Some(1.0), max = Some(1.0), nullCount = 0, avgLen = 8, maxLen = 8),
-        AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = 1,
-          min = Some(1.0f), max = Some(1.0f), nullCount = 0, avgLen = 4, maxLen = 4),
-        AttributeReference("cdec", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(distinctCount = 1,
-          min = Some(dec), max = Some(dec), nullCount = 0, avgLen = 16, maxLen = 16),
-        AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = 1,
-          min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
-        AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = 1,
-          min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
-        AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = 1,
-          min = Some(date), max = Some(date), nullCount = 0, avgLen = 4, maxLen = 4),
-        AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = 1,
-          min = Some(timestamp), max = Some(timestamp), nullCount = 0, avgLen = 8, maxLen = 8)
+        AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(false), max = Some(false),
+          nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+        AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(1.toByte), max = Some(1.toByte),
+          nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+        AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(1.toShort), max = Some(1.toShort),
+          nullCount = Some(0), avgLen = Some(2), maxLen = Some(2)),
+        AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(1), max = Some(1),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(1L), max = Some(1L),
+          nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+        AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(1.0), max = Some(1.0),
+          nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+        AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(1.0f), max = Some(1.0f),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        AttributeReference("cdec", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(
+          distinctCount = Some(1), min = Some(dec), max = Some(dec),
+          nullCount = Some(0), avgLen = Some(16), maxLen = Some(16)),
+        AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = Some(1),
+          min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+        AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = Some(1),
+          min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+        AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(date), max = Some(date),
+          nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+        AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = Some(1),
+          min = Some(timestamp), max = Some(timestamp),
+          nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
       )
     }
 
@@ -520,7 +537,8 @@ class JoinEstimationSuite extends StatsEstimationTestBase {
 
   test("join with null column") {
     val (nullColumn, nullColStat) = (attr("cnull"),
-      ColumnStat(distinctCount = 0, min = None, max = None, nullCount = 1, avgLen = 4, maxLen = 4))
+      ColumnStat(distinctCount = Some(0), min = None, max = None,
+        nullCount = Some(1), avgLen = Some(4), maxLen = Some(4)))
     val nullTable = StatsTestPlan(
       outputList = Seq(nullColumn),
       rowCount = 1,

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
index cda54fa..dcb3701 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/ProjectEstimationSuite.scala
@@ -28,10 +28,10 @@ import org.apache.spark.sql.types._
 class ProjectEstimationSuite extends StatsEstimationTestBase {
 
   test("project with alias") {
-    val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = 2, min = Some(1),
-      max = Some(2), nullCount = 0, avgLen = 4, maxLen = 4))
-    val (ar2, colStat2) = (attr("key2"), ColumnStat(distinctCount = 1, min = Some(10),
-      max = Some(10), nullCount = 0, avgLen = 4, maxLen = 4))
+    val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = Some(2), min = Some(1),
+      max = Some(2), nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))
+    val (ar2, colStat2) = (attr("key2"), ColumnStat(distinctCount = Some(1), min = Some(10),
+      max = Some(10), nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))
 
     val child = StatsTestPlan(
       outputList = Seq(ar1, ar2),
@@ -49,8 +49,8 @@ class ProjectEstimationSuite extends StatsEstimationTestBase {
   }
 
   test("project on empty table") {
-    val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = 0, min = None, max = None,
-      nullCount = 0, avgLen = 4, maxLen = 4))
+    val (ar1, colStat1) = (attr("key1"), ColumnStat(distinctCount = Some(0), min = None, max = None,
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)))
     val child = StatsTestPlan(
       outputList = Seq(ar1),
       rowCount = 0,
@@ -71,30 +71,40 @@ class ProjectEstimationSuite extends StatsEstimationTestBase {
     val t2 = DateTimeUtils.fromJavaTimestamp(Timestamp.valueOf("2016-05-09 00:00:02"))
 
     val columnInfo: AttributeMap[ColumnStat] = AttributeMap(Seq(
-      AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = 2,
-        min = Some(false), max = Some(true), nullCount = 0, avgLen = 1, maxLen = 1),
-      AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = 2,
-        min = Some(1.toByte), max = Some(2.toByte), nullCount = 0, avgLen = 1, maxLen = 1),
-      AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = 2,
-        min = Some(1.toShort), max = Some(3.toShort), nullCount = 0, avgLen = 2, maxLen = 2),
-      AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = 2,
-        min = Some(1), max = Some(4), nullCount = 0, avgLen = 4, maxLen = 4),
-      AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = 2,
-        min = Some(1L), max = Some(5L), nullCount = 0, avgLen = 8, maxLen = 8),
-      AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = 2,
-        min = Some(1.0), max = Some(6.0), nullCount = 0, avgLen = 8, maxLen = 8),
-      AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = 2,
-        min = Some(1.0f), max = Some(7.0f), nullCount = 0, avgLen = 4, maxLen = 4),
-      AttributeReference("cdecimal", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(distinctCount = 2,
-        min = Some(dec1), max = Some(dec2), nullCount = 0, avgLen = 16, maxLen = 16),
-      AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = 2,
-        min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
-      AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = 2,
-        min = None, max = None, nullCount = 0, avgLen = 3, maxLen = 3),
-      AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = 2,
-        min = Some(d1), max = Some(d2), nullCount = 0, avgLen = 4, maxLen = 4),
-      AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = 2,
-        min = Some(t1), max = Some(t2), nullCount = 0, avgLen = 8, maxLen = 8)
+      AttributeReference("cbool", BooleanType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(false), max = Some(true),
+        nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+      AttributeReference("cbyte", ByteType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(1), max = Some(2),
+        nullCount = Some(0), avgLen = Some(1), maxLen = Some(1)),
+      AttributeReference("cshort", ShortType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(1), max = Some(3),
+        nullCount = Some(0), avgLen = Some(2), maxLen = Some(2)),
+      AttributeReference("cint", IntegerType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(1), max = Some(4),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+      AttributeReference("clong", LongType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(1), max = Some(5),
+        nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+      AttributeReference("cdouble", DoubleType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(1.0), max = Some(6.0),
+        nullCount = Some(0), avgLen = Some(8), maxLen = Some(8)),
+      AttributeReference("cfloat", FloatType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(1.0), max = Some(7.0),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+      AttributeReference("cdecimal", DecimalType.SYSTEM_DEFAULT)() -> ColumnStat(
+        distinctCount = Some(2), min = Some(dec1), max = Some(dec2),
+        nullCount = Some(0), avgLen = Some(16), maxLen = Some(16)),
+      AttributeReference("cstring", StringType)() -> ColumnStat(distinctCount = Some(2),
+        min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+      AttributeReference("cbinary", BinaryType)() -> ColumnStat(distinctCount = Some(2),
+        min = None, max = None, nullCount = Some(0), avgLen = Some(3), maxLen = Some(3)),
+      AttributeReference("cdate", DateType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(d1), max = Some(d2),
+        nullCount = Some(0), avgLen = Some(4), maxLen = Some(4)),
+      AttributeReference("ctimestamp", TimestampType)() -> ColumnStat(distinctCount = Some(2),
+        min = Some(t1), max = Some(t2),
+        nullCount = Some(0), avgLen = Some(8), maxLen = Some(8))
     ))
     val columnSizes: Map[Attribute, Long] = columnInfo.map(kv => (kv._1, getColSize(kv._1, kv._2)))
     val child = StatsTestPlan(

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
index 31dea2e..9dceca5 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/statsEstimation/StatsEstimationTestBase.scala
@@ -42,8 +42,8 @@ trait StatsEstimationTestBase extends SparkFunSuite {
 
   def getColSize(attribute: Attribute, colStat: ColumnStat): Long = attribute.dataType match {
     // For UTF8String: base + offset + numBytes
-    case StringType => colStat.avgLen + 8 + 4
-    case _ => colStat.avgLen
+    case StringType => colStat.avgLen.getOrElse(attribute.dataType.defaultSize.toLong) + 8 + 4
+    case _ => colStat.avgLen.getOrElse(attribute.dataType.defaultSize)
   }
 
   def attr(colName: String): AttributeReference = AttributeReference(colName, IntegerType)()
@@ -54,6 +54,12 @@ trait StatsEstimationTestBase extends SparkFunSuite {
     val nameToAttr: Map[String, Attribute] = plan.output.map(a => (a.name, a)).toMap
     AttributeMap(colStats.map(kv => nameToAttr(kv._1) -> kv._2))
   }
+
+  /** Get a test ColumnStat with given distinctCount and nullCount */
+  def rangeColumnStat(distinctCount: Int, nullCount: Int): ColumnStat =
+    ColumnStat(distinctCount = Some(distinctCount),
+      min = Some(1), max = Some(distinctCount),
+      nullCount = Some(0), avgLen = Some(4), maxLen = Some(4))
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
index 1122522..640e013 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/AnalyzeColumnCommand.scala
@@ -20,13 +20,15 @@ package org.apache.spark.sql.execution.command
 import scala.collection.mutable
 
 import org.apache.spark.sql._
-import org.apache.spark.sql.catalyst.TableIdentifier
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTableType}
+import org.apache.spark.sql.catalyst.{InternalRow, TableIdentifier}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTableType}
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.expressions.aggregate._
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.catalyst.util.ArrayData
 import org.apache.spark.sql.execution.QueryExecution
+import org.apache.spark.sql.internal.SQLConf
+import org.apache.spark.sql.types._
 
 
 /**
@@ -64,12 +66,12 @@ case class AnalyzeColumnCommand(
 
   /**
    * Compute stats for the given columns.
-   * @return (row count, map from column name to ColumnStats)
+   * @return (row count, map from column name to CatalogColumnStats)
    */
   private def computeColumnStats(
       sparkSession: SparkSession,
       tableIdent: TableIdentifier,
-      columnNames: Seq[String]): (Long, Map[String, ColumnStat]) = {
+      columnNames: Seq[String]): (Long, Map[String, CatalogColumnStat]) = {
 
     val conf = sparkSession.sessionState.conf
     val relation = sparkSession.table(tableIdent).logicalPlan
@@ -81,7 +83,7 @@ case class AnalyzeColumnCommand(
 
     // Make sure the column types are supported for stats gathering.
     attributesToAnalyze.foreach { attr =>
-      if (!ColumnStat.supportsType(attr.dataType)) {
+      if (!supportsType(attr.dataType)) {
         throw new AnalysisException(
           s"Column ${attr.name} in table $tableIdent is of type ${attr.dataType}, " +
             "and Spark does not support statistics collection on this column type.")
@@ -103,7 +105,7 @@ case class AnalyzeColumnCommand(
     // will be structs containing all column stats.
     // The layout of each struct follows the layout of the ColumnStats.
     val expressions = Count(Literal(1)).toAggregateExpression() +:
-      attributesToAnalyze.map(ColumnStat.statExprs(_, conf, attributePercentiles))
+      attributesToAnalyze.map(statExprs(_, conf, attributePercentiles))
 
     val namedExpressions = expressions.map(e => Alias(e, e.toString)())
     val statsRow = new QueryExecution(sparkSession, Aggregate(Nil, namedExpressions, relation))
@@ -111,9 +113,9 @@ case class AnalyzeColumnCommand(
 
     val rowCount = statsRow.getLong(0)
     val columnStats = attributesToAnalyze.zipWithIndex.map { case (attr, i) =>
-      // according to `ColumnStat.statExprs`, the stats struct always have 7 fields.
-      (attr.name, ColumnStat.rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
-        attributePercentiles.get(attr)))
+      // according to `statExprs`, the stats struct always have 7 fields.
+      (attr.name, rowToColumnStat(statsRow.getStruct(i + 1, 7), attr, rowCount,
+        attributePercentiles.get(attr)).toCatalogColumnStat(attr.name, attr.dataType))
     }.toMap
     (rowCount, columnStats)
   }
@@ -124,7 +126,7 @@ case class AnalyzeColumnCommand(
       sparkSession: SparkSession,
       relation: LogicalPlan): AttributeMap[ArrayData] = {
     val attrsToGenHistogram = if (conf.histogramEnabled) {
-      attributesToAnalyze.filter(a => ColumnStat.supportsHistogram(a.dataType))
+      attributesToAnalyze.filter(a => supportsHistogram(a.dataType))
     } else {
       Nil
     }
@@ -154,4 +156,120 @@ case class AnalyzeColumnCommand(
     AttributeMap(attributePercentiles.toSeq)
   }
 
+  /** Returns true iff the we support gathering column statistics on column of the given type. */
+  private def supportsType(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case BooleanType => true
+    case DateType => true
+    case TimestampType => true
+    case BinaryType | StringType => true
+    case _ => false
+  }
+
+  /** Returns true iff the we support gathering histogram on column of the given type. */
+  private def supportsHistogram(dataType: DataType): Boolean = dataType match {
+    case _: IntegralType => true
+    case _: DecimalType => true
+    case DoubleType | FloatType => true
+    case DateType => true
+    case TimestampType => true
+    case _ => false
+  }
+
+  /**
+   * Constructs an expression to compute column statistics for a given column.
+   *
+   * The expression should create a single struct column with the following schema:
+   * distinctCount: Long, min: T, max: T, nullCount: Long, avgLen: Long, maxLen: Long,
+   * distinctCountsForIntervals: Array[Long]
+   *
+   * Together with [[rowToColumnStat]], this function is used to create [[ColumnStat]] and
+   * as a result should stay in sync with it.
+   */
+  private def statExprs(
+    col: Attribute,
+    conf: SQLConf,
+    colPercentiles: AttributeMap[ArrayData]): CreateNamedStruct = {
+    def struct(exprs: Expression*): CreateNamedStruct = CreateStruct(exprs.map { expr =>
+      expr.transformUp { case af: AggregateFunction => af.toAggregateExpression() }
+    })
+    val one = Literal(1, LongType)
+
+    // the approximate ndv (num distinct value) should never be larger than the number of rows
+    val numNonNulls = if (col.nullable) Count(col) else Count(one)
+    val ndv = Least(Seq(HyperLogLogPlusPlus(col, conf.ndvMaxError), numNonNulls))
+    val numNulls = Subtract(Count(one), numNonNulls)
+    val defaultSize = Literal(col.dataType.defaultSize, LongType)
+    val nullArray = Literal(null, ArrayType(LongType))
+
+    def fixedLenTypeStruct: CreateNamedStruct = {
+      val genHistogram =
+        supportsHistogram(col.dataType) && colPercentiles.contains(col)
+      val intervalNdvsExpr = if (genHistogram) {
+        ApproxCountDistinctForIntervals(col,
+          Literal(colPercentiles(col), ArrayType(col.dataType)), conf.ndvMaxError)
+      } else {
+        nullArray
+      }
+      // For fixed width types, avg size should be the same as max size.
+      struct(ndv, Cast(Min(col), col.dataType), Cast(Max(col), col.dataType), numNulls,
+        defaultSize, defaultSize, intervalNdvsExpr)
+    }
+
+    col.dataType match {
+      case _: IntegralType => fixedLenTypeStruct
+      case _: DecimalType => fixedLenTypeStruct
+      case DoubleType | FloatType => fixedLenTypeStruct
+      case BooleanType => fixedLenTypeStruct
+      case DateType => fixedLenTypeStruct
+      case TimestampType => fixedLenTypeStruct
+      case BinaryType | StringType =>
+        // For string and binary type, we don't compute min, max or histogram
+        val nullLit = Literal(null, col.dataType)
+        struct(
+          ndv, nullLit, nullLit, numNulls,
+          // Set avg/max size to default size if all the values are null or there is no value.
+          Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
+          Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)),
+          nullArray)
+      case _ =>
+        throw new AnalysisException("Analyzing column statistics is not supported for column " +
+          s"${col.name} of data type: ${col.dataType}.")
+    }
+  }
+
+  /** Convert a struct for column stats (defined in `statExprs`) into [[ColumnStat]]. */
+  private def rowToColumnStat(
+    row: InternalRow,
+    attr: Attribute,
+    rowCount: Long,
+    percentiles: Option[ArrayData]): ColumnStat = {
+    // The first 6 fields are basic column stats, the 7th is ndvs for histogram bins.
+    val cs = ColumnStat(
+      distinctCount = Option(BigInt(row.getLong(0))),
+      // for string/binary min/max, get should return null
+      min = Option(row.get(1, attr.dataType)),
+      max = Option(row.get(2, attr.dataType)),
+      nullCount = Option(BigInt(row.getLong(3))),
+      avgLen = Option(row.getLong(4)),
+      maxLen = Option(row.getLong(5))
+    )
+    if (row.isNullAt(6) || cs.nullCount.isEmpty) {
+      cs
+    } else {
+      val ndvs = row.getArray(6).toLongArray()
+      assert(percentiles.get.numElements() == ndvs.length + 1)
+      val endpoints = percentiles.get.toArray[Any](attr.dataType).map(_.toString.toDouble)
+      // Construct equi-height histogram
+      val bins = ndvs.zipWithIndex.map { case (ndv, i) =>
+        HistogramBin(endpoints(i), endpoints(i + 1), ndv)
+      }
+      val nonNullRows = rowCount - cs.nullCount.get
+      val histogram = Histogram(nonNullRows.toDouble / ndvs.length, bins)
+      cs.copy(histogram = Some(histogram))
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
index e400975..4474919 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/command/tables.scala
@@ -695,10 +695,11 @@ case class DescribeColumnCommand(
       // Show column stats when EXTENDED or FORMATTED is specified.
       buffer += Row("min", cs.flatMap(_.min.map(_.toString)).getOrElse("NULL"))
       buffer += Row("max", cs.flatMap(_.max.map(_.toString)).getOrElse("NULL"))
-      buffer += Row("num_nulls", cs.map(_.nullCount.toString).getOrElse("NULL"))
-      buffer += Row("distinct_count", cs.map(_.distinctCount.toString).getOrElse("NULL"))
-      buffer += Row("avg_col_len", cs.map(_.avgLen.toString).getOrElse("NULL"))
-      buffer += Row("max_col_len", cs.map(_.maxLen.toString).getOrElse("NULL"))
+      buffer += Row("num_nulls", cs.flatMap(_.nullCount.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("distinct_count",
+        cs.flatMap(_.distinctCount.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("avg_col_len", cs.flatMap(_.avgLen.map(_.toString)).getOrElse("NULL"))
+      buffer += Row("max_col_len", cs.flatMap(_.maxLen.map(_.toString)).getOrElse("NULL"))
       val histDesc = for {
         c <- cs
         hist <- c.histogram

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index b11e798..ed4ea02 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -20,6 +20,7 @@ package org.apache.spark.sql
 import scala.collection.mutable
 
 import org.apache.spark.sql.catalyst.TableIdentifier
+import org.apache.spark.sql.catalyst.catalog.CatalogColumnStat
 import org.apache.spark.sql.catalyst.plans.logical._
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.test.SharedSQLContext
@@ -95,7 +96,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
       assert(fetchedStats2.get.sizeInBytes == 0)
 
       val expectedColStat =
-        "key" -> ColumnStat(0, None, None, 0, IntegerType.defaultSize, IntegerType.defaultSize)
+        "key" -> CatalogColumnStat(Some(0), None, None, Some(0),
+          Some(IntegerType.defaultSize), Some(IntegerType.defaultSize))
 
       // There won't be histogram for empty column.
       Seq("true", "false").foreach { histogramEnabled =>
@@ -156,7 +158,7 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     Seq(stats, statsWithHgms).foreach { s =>
       s.zip(df.schema).foreach { case ((k, v), field) =>
         withClue(s"column $k with type ${field.dataType}") {
-          val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap(k, field.dataType))
+          val roundtrip = CatalogColumnStat.fromMap("table_is_foo", field.name, v.toMap(k))
           assert(roundtrip == Some(v))
         }
       }
@@ -187,7 +189,8 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }.mkString(", "))
 
     val expectedColStats = dataTypes.map { case (tpe, idx) =>
-      (s"col$idx", ColumnStat(0, None, None, 1, tpe.defaultSize.toLong, tpe.defaultSize.toLong))
+      (s"col$idx", CatalogColumnStat(Some(0), None, None, Some(1),
+        Some(tpe.defaultSize.toLong), Some(tpe.defaultSize.toLong)))
     }
 
     // There won't be histograms for null columns.

http://git-wip-us.apache.org/repos/asf/spark/blob/8077bb04/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
index 65ccc19..bf4abb6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionTestBase.scala
@@ -24,8 +24,8 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.{QualifiedTableName, TableIdentifier}
-import org.apache.spark.sql.catalyst.catalog.{CatalogStatistics, CatalogTable, HiveTableRelation}
-import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, HistogramBin, LogicalPlan}
+import org.apache.spark.sql.catalyst.catalog.{CatalogColumnStat, CatalogStatistics, CatalogTable, HiveTableRelation}
+import org.apache.spark.sql.catalyst.plans.logical.{ColumnStat, Histogram, HistogramBin, HistogramSerializer, LogicalPlan}
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 import org.apache.spark.sql.execution.datasources.LogicalRelation
 import org.apache.spark.sql.internal.{SQLConf, StaticSQLConf}
@@ -67,18 +67,21 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
 
   /** A mapping from column to the stats collected. */
   protected val stats = mutable.LinkedHashMap(
-    "cbool" -> ColumnStat(2, Some(false), Some(true), 1, 1, 1),
-    "cbyte" -> ColumnStat(2, Some(1.toByte), Some(2.toByte), 1, 1, 1),
-    "cshort" -> ColumnStat(2, Some(1.toShort), Some(3.toShort), 1, 2, 2),
-    "cint" -> ColumnStat(2, Some(1), Some(4), 1, 4, 4),
-    "clong" -> ColumnStat(2, Some(1L), Some(5L), 1, 8, 8),
-    "cdouble" -> ColumnStat(2, Some(1.0), Some(6.0), 1, 8, 8),
-    "cfloat" -> ColumnStat(2, Some(1.0f), Some(7.0f), 1, 4, 4),
-    "cdecimal" -> ColumnStat(2, Some(Decimal(dec1)), Some(Decimal(dec2)), 1, 16, 16),
-    "cstring" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cbinary" -> ColumnStat(2, None, None, 1, 3, 3),
-    "cdate" -> ColumnStat(2, Some(d1Internal), Some(d2Internal), 1, 4, 4),
-    "ctimestamp" -> ColumnStat(2, Some(t1Internal), Some(t2Internal), 1, 8, 8)
+    "cbool" -> CatalogColumnStat(Some(2), Some("false"), Some("true"), Some(1), Some(1), Some(1)),
+    "cbyte" -> CatalogColumnStat(Some(2), Some("1"), Some("2"), Some(1), Some(1), Some(1)),
+    "cshort" -> CatalogColumnStat(Some(2), Some("1"), Some("3"), Some(1), Some(2), Some(2)),
+    "cint" -> CatalogColumnStat(Some(2), Some("1"), Some("4"), Some(1), Some(4), Some(4)),
+    "clong" -> CatalogColumnStat(Some(2), Some("1"), Some("5"), Some(1), Some(8), Some(8)),
+    "cdouble" -> CatalogColumnStat(Some(2), Some("1.0"), Some("6.0"), Some(1), Some(8), Some(8)),
+    "cfloat" -> CatalogColumnStat(Some(2), Some("1.0"), Some("7.0"), Some(1), Some(4), Some(4)),
+    "cdecimal" -> CatalogColumnStat(Some(2), Some(dec1.toString), Some(dec2.toString), Some(1),
+      Some(16), Some(16)),
+    "cstring" -> CatalogColumnStat(Some(2), None, None, Some(1), Some(3), Some(3)),
+    "cbinary" -> CatalogColumnStat(Some(2), None, None, Some(1), Some(3), Some(3)),
+    "cdate" -> CatalogColumnStat(Some(2), Some(d1.toString), Some(d2.toString), Some(1), Some(4),
+      Some(4)),
+    "ctimestamp" -> CatalogColumnStat(Some(2), Some(t1.toString), Some(t2.toString), Some(1),
+      Some(8), Some(8))
   )
 
   /**
@@ -110,6 +113,110 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     colStats
   }
 
+  val expectedSerializedColStats = Map(
+    "spark.sql.statistics.colStats.cbinary.avgLen" -> "3",
+    "spark.sql.statistics.colStats.cbinary.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cbinary.maxLen" -> "3",
+    "spark.sql.statistics.colStats.cbinary.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cbinary.version" -> "1",
+    "spark.sql.statistics.colStats.cbool.avgLen" -> "1",
+    "spark.sql.statistics.colStats.cbool.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cbool.max" -> "true",
+    "spark.sql.statistics.colStats.cbool.maxLen" -> "1",
+    "spark.sql.statistics.colStats.cbool.min" -> "false",
+    "spark.sql.statistics.colStats.cbool.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cbool.version" -> "1",
+    "spark.sql.statistics.colStats.cbyte.avgLen" -> "1",
+    "spark.sql.statistics.colStats.cbyte.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cbyte.max" -> "2",
+    "spark.sql.statistics.colStats.cbyte.maxLen" -> "1",
+    "spark.sql.statistics.colStats.cbyte.min" -> "1",
+    "spark.sql.statistics.colStats.cbyte.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cbyte.version" -> "1",
+    "spark.sql.statistics.colStats.cdate.avgLen" -> "4",
+    "spark.sql.statistics.colStats.cdate.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cdate.max" -> "2016-05-09",
+    "spark.sql.statistics.colStats.cdate.maxLen" -> "4",
+    "spark.sql.statistics.colStats.cdate.min" -> "2016-05-08",
+    "spark.sql.statistics.colStats.cdate.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cdate.version" -> "1",
+    "spark.sql.statistics.colStats.cdecimal.avgLen" -> "16",
+    "spark.sql.statistics.colStats.cdecimal.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cdecimal.max" -> "8.000000000000000000",
+    "spark.sql.statistics.colStats.cdecimal.maxLen" -> "16",
+    "spark.sql.statistics.colStats.cdecimal.min" -> "1.000000000000000000",
+    "spark.sql.statistics.colStats.cdecimal.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cdecimal.version" -> "1",
+    "spark.sql.statistics.colStats.cdouble.avgLen" -> "8",
+    "spark.sql.statistics.colStats.cdouble.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cdouble.max" -> "6.0",
+    "spark.sql.statistics.colStats.cdouble.maxLen" -> "8",
+    "spark.sql.statistics.colStats.cdouble.min" -> "1.0",
+    "spark.sql.statistics.colStats.cdouble.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cdouble.version" -> "1",
+    "spark.sql.statistics.colStats.cfloat.avgLen" -> "4",
+    "spark.sql.statistics.colStats.cfloat.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cfloat.max" -> "7.0",
+    "spark.sql.statistics.colStats.cfloat.maxLen" -> "4",
+    "spark.sql.statistics.colStats.cfloat.min" -> "1.0",
+    "spark.sql.statistics.colStats.cfloat.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cfloat.version" -> "1",
+    "spark.sql.statistics.colStats.cint.avgLen" -> "4",
+    "spark.sql.statistics.colStats.cint.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cint.max" -> "4",
+    "spark.sql.statistics.colStats.cint.maxLen" -> "4",
+    "spark.sql.statistics.colStats.cint.min" -> "1",
+    "spark.sql.statistics.colStats.cint.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cint.version" -> "1",
+    "spark.sql.statistics.colStats.clong.avgLen" -> "8",
+    "spark.sql.statistics.colStats.clong.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.clong.max" -> "5",
+    "spark.sql.statistics.colStats.clong.maxLen" -> "8",
+    "spark.sql.statistics.colStats.clong.min" -> "1",
+    "spark.sql.statistics.colStats.clong.nullCount" -> "1",
+    "spark.sql.statistics.colStats.clong.version" -> "1",
+    "spark.sql.statistics.colStats.cshort.avgLen" -> "2",
+    "spark.sql.statistics.colStats.cshort.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cshort.max" -> "3",
+    "spark.sql.statistics.colStats.cshort.maxLen" -> "2",
+    "spark.sql.statistics.colStats.cshort.min" -> "1",
+    "spark.sql.statistics.colStats.cshort.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cshort.version" -> "1",
+    "spark.sql.statistics.colStats.cstring.avgLen" -> "3",
+    "spark.sql.statistics.colStats.cstring.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.cstring.maxLen" -> "3",
+    "spark.sql.statistics.colStats.cstring.nullCount" -> "1",
+    "spark.sql.statistics.colStats.cstring.version" -> "1",
+    "spark.sql.statistics.colStats.ctimestamp.avgLen" -> "8",
+    "spark.sql.statistics.colStats.ctimestamp.distinctCount" -> "2",
+    "spark.sql.statistics.colStats.ctimestamp.max" -> "2016-05-09 00:00:02.0",
+    "spark.sql.statistics.colStats.ctimestamp.maxLen" -> "8",
+    "spark.sql.statistics.colStats.ctimestamp.min" -> "2016-05-08 00:00:01.0",
+    "spark.sql.statistics.colStats.ctimestamp.nullCount" -> "1",
+    "spark.sql.statistics.colStats.ctimestamp.version" -> "1"
+  )
+
+  val expectedSerializedHistograms = Map(
+    "spark.sql.statistics.colStats.cbyte.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cbyte").histogram.get),
+    "spark.sql.statistics.colStats.cshort.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cshort").histogram.get),
+    "spark.sql.statistics.colStats.cint.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cint").histogram.get),
+    "spark.sql.statistics.colStats.clong.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("clong").histogram.get),
+    "spark.sql.statistics.colStats.cdouble.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cdouble").histogram.get),
+    "spark.sql.statistics.colStats.cfloat.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cfloat").histogram.get),
+    "spark.sql.statistics.colStats.cdecimal.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cdecimal").histogram.get),
+    "spark.sql.statistics.colStats.cdate.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("cdate").histogram.get),
+    "spark.sql.statistics.colStats.ctimestamp.histogram" ->
+      HistogramSerializer.serialize(statsWithHgms("ctimestamp").histogram.get)
+  )
+
   private val randomName = new Random(31)
 
   def getCatalogTable(tableName: String): CatalogTable = {
@@ -151,7 +258,7 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
    */
   def checkColStats(
       df: DataFrame,
-      colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+      colStats: mutable.LinkedHashMap[String, CatalogColumnStat]): Unit = {
     val tableName = "column_stats_test_" + randomName.nextInt(1000)
     withTable(tableName) {
       df.write.saveAsTable(tableName)
@@ -161,14 +268,24 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
         colStats.keys.mkString(", "))
 
       // Validate statistics
-      val table = getCatalogTable(tableName)
-      assert(table.stats.isDefined)
-      assert(table.stats.get.colStats.size == colStats.size)
-
-      colStats.foreach { case (k, v) =>
-        withClue(s"column $k") {
-          assert(table.stats.get.colStats(k) == v)
-        }
+      validateColStats(tableName, colStats)
+    }
+  }
+
+  /**
+   * Validate if the given catalog table has the provided statistics.
+   */
+  def validateColStats(
+      tableName: String,
+      colStats: mutable.LinkedHashMap[String, CatalogColumnStat]): Unit = {
+
+    val table = getCatalogTable(tableName)
+    assert(table.stats.isDefined)
+    assert(table.stats.get.colStats.size == colStats.size)
+
+    colStats.foreach { case (k, v) =>
+      withClue(s"column $k") {
+        assert(table.stats.get.colStats(k) == v)
       }
     }
   }
@@ -215,12 +332,13 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
       case catalogRel: HiveTableRelation => (catalogRel, catalogRel.tableMeta)
       case logicalRel: LogicalRelation => (logicalRel, logicalRel.catalogTable.get)
     }.head
-    val emptyColStat = ColumnStat(0, None, None, 0, 4, 4)
+    val emptyColStat = ColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))
+    val emptyCatalogColStat = CatalogColumnStat(Some(0), None, None, Some(0), Some(4), Some(4))
     // Check catalog statistics
     assert(catalogTable.stats.isDefined)
     assert(catalogTable.stats.get.sizeInBytes == 0)
     assert(catalogTable.stats.get.rowCount == Some(0))
-    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyColStat))
+    assert(catalogTable.stats.get.colStats == Map("c1" -> emptyCatalogColStat))
 
     // Check relation statistics
     withSQLConf(SQLConf.CBO_ENABLED.key -> "true") {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org