You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/12/11 05:25:31 UTC

spark git commit: [SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary column having only null values

Repository: spark
Updated Branches:
  refs/heads/master e094d0115 -> a29ee55aa


[SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary column having only null values

## What changes were proposed in this pull request?

During column stats collection, the average and max length are null if a column of string/binary type has only null values, which leads to an NPE. To fix this, the column's data type default size is used whenever the avg/max length is null.

## How was this patch tested?

Added a test that collects column stats for columns of every supported type containing only null values.

Author: wangzhenhua <wa...@huawei.com>

Closes #16243 from wzhfy/nullStats.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a29ee55a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a29ee55a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a29ee55a

Branch: refs/heads/master
Commit: a29ee55aaadfe43ac9abb0eaf8b022b1e6d7babb
Parents: e094d01
Author: wangzhenhua <wa...@huawei.com>
Authored: Sat Dec 10 21:25:29 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Dec 10 21:25:29 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/plans/logical/Statistics.scala |  9 ++-
 .../spark/sql/StatisticsCollectionSuite.scala   | 67 ++++++++++++++------
 2 files changed, 53 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/a29ee55a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 7986560..465fbab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -194,11 +194,12 @@ object ColumnStat extends Logging {
     val numNonNulls = if (col.nullable) Count(col) else Count(one)
     val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
     val numNulls = Subtract(Count(one), numNonNulls)
+    val defaultSize = Literal(col.dataType.defaultSize, LongType)
 
     def fixedLenTypeStruct(castType: DataType) = {
       // For fixed width types, avg size should be the same as max size.
-      val avgSize = Literal(col.dataType.defaultSize, LongType)
-      struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, avgSize, avgSize)
+      struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize,
+        defaultSize)
     }
 
     col.dataType match {
@@ -213,7 +214,9 @@ object ColumnStat extends Logging {
         val nullLit = Literal(null, col.dataType)
         struct(
           ndv, nullLit, nullLit, numNulls,
-          Ceil(Average(Length(col))), Cast(Max(Length(col)), LongType))
+          // Set avg/max size to default size if all the values are null or there is no value.
+          Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
+          Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)))
       case _ =>
         throw new AnalysisException("Analyzing column statistics is not supported for column " +
             s"${col.name} of data type: ${col.dataType}.")

http://git-wip-us.apache.org/repos/asf/spark/blob/a29ee55a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 1fcccd0..0740849 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -21,6 +21,7 @@ import java.{lang => jl}
 import java.sql.{Date, Timestamp}
 
 import scala.collection.mutable
+import scala.util.Random
 
 import org.apache.spark.sql.catalyst.TableIdentifier
 import org.apache.spark.sql.catalyst.plans.logical._
@@ -133,6 +134,40 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
     }
   }
 
+  test("column stats round trip serialization") {
+    // Make sure we serialize and then deserialize and we will get the result data
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+    stats.zip(df.schema).foreach { case ((k, v), field) =>
+      withClue(s"column $k with type ${field.dataType}") {
+        val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
+        assert(roundtrip == Some(v))
+      }
+    }
+  }
+
+  test("analyze column command - result verification") {
+    // (data.head.productArity - 1) because the last column does not support stats collection.
+    assert(stats.size == data.head.productArity - 1)
+    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+    checkColStats(df, stats)
+  }
+
+  test("column stats collection for null columns") {
+    val dataTypes: Seq[(DataType, Int)] = Seq(
+      BooleanType, ByteType, ShortType, IntegerType, LongType,
+      DoubleType, FloatType, DecimalType.SYSTEM_DEFAULT,
+      StringType, BinaryType, DateType, TimestampType
+    ).zipWithIndex
+
+    val df = sql("select " + dataTypes.map { case (tpe, idx) =>
+      s"cast(null as ${tpe.sql}) as col$idx"
+    }.mkString(", "))
+
+    val expectedColStats = dataTypes.map { case (tpe, idx) =>
+      (s"col$idx", ColumnStat(0, None, None, 1, tpe.defaultSize.toLong, tpe.defaultSize.toLong))
+    }
+    checkColStats(df, mutable.LinkedHashMap(expectedColStats: _*))
+  }
 }
 
 
@@ -141,7 +176,6 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
  * when using the Hive external catalog) as well as in the sql/core module.
  */
 abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
-  import testImplicits._
 
   private val dec1 = new java.math.BigDecimal("1.000000000000000000")
   private val dec2 = new java.math.BigDecimal("8.000000000000000000")
@@ -180,35 +214,28 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
     "ctimestamp" -> ColumnStat(2, Some(t1), Some(t2), 1, 8, 8)
   )
 
-  test("column stats round trip serialization") {
-    // Make sure we serialize and then deserialize and we will get the result data
-    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
-    stats.zip(df.schema).foreach { case ((k, v), field) =>
-      withClue(s"column $k with type ${field.dataType}") {
-        val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
-        assert(roundtrip == Some(v))
-      }
-    }
-  }
-
-  test("analyze column command - result verification") {
-    val tableName = "column_stats_test2"
-    // (data.head.productArity - 1) because the last column does not support stats collection.
-    assert(stats.size == data.head.productArity - 1)
-    val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+  private val randomName = new Random(31)
 
+  /**
+   * Compute column stats for the given DataFrame and compare it with colStats.
+   */
+  def checkColStats(
+      df: DataFrame,
+      colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+    val tableName = "column_stats_test_" + randomName.nextInt(1000)
     withTable(tableName) {
       df.write.saveAsTable(tableName)
 
       // Collect statistics
-      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+      sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
+        colStats.keys.mkString(", "))
 
       // Validate statistics
       val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
       assert(table.stats.isDefined)
-      assert(table.stats.get.colStats.size == stats.size)
+      assert(table.stats.get.colStats.size == colStats.size)
 
-      stats.foreach { case (k, v) =>
+      colStats.foreach { case (k, v) =>
         withClue(s"column $k") {
           assert(table.stats.get.colStats(k) == v)
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org