Posted to commits@spark.apache.org by rx...@apache.org on 2016/12/11 05:25:31 UTC
spark git commit: [SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary column having only null values
Repository: spark
Updated Branches:
refs/heads/master e094d0115 -> a29ee55aa
[SPARK-18815][SQL] Fix NPE when collecting column stats for string/binary column having only null values
## What changes were proposed in this pull request?
During column stats collection, the average and max length of a string/binary column will be null if the column contains only null values, and dereferencing those null results causes an NPE. To fix this, I use the data type's default size when the avg/max length is null.
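A minimal sketch of the failure scenario (the table and column names below are illustrative, not from the patch), mirroring what the new test does: an all-null string column makes `avg(length(col))` and `max(length(col))` evaluate to null.

```scala
// Hypothetical repro, assuming a Spark session of this era in spark-shell.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("SPARK-18815-repro").getOrCreate()

// A one-row table whose string column holds only null.
spark.sql("select cast(null as string) as s").write.saveAsTable("null_strings")

// Before this patch the command below hit an NPE while building the
// ColumnStat for s, because avg(length(s)) and max(length(s)) are null;
// with the fix, avg/max length fall back to StringType's default size.
spark.sql("analyze table null_strings compute statistics for columns s")
```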
## How was this patch tested?
Add a test for collecting stats on columns that contain only null values.
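The test builds a single-row table with an all-null column for each supported type and checks that the collected stats fall back to the type's default size. A condensed sketch of the expectation for the string column (field order follows `ColumnStat` in this patch: distinct count, min, max, null count, avg length, max length):

```scala
import org.apache.spark.sql.catalyst.plans.logical.ColumnStat
import org.apache.spark.sql.types.StringType

// For "cast(null as string)": zero distinct values, no min/max, one null,
// and avg/max length equal to StringType.defaultSize (20 in this version).
val expected = ColumnStat(0, None, None, 1,
  StringType.defaultSize.toLong, StringType.defaultSize.toLong)
```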
Author: wangzhenhua <wa...@huawei.com>
Closes #16243 from wzhfy/nullStats.
Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/a29ee55a
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/a29ee55a
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/a29ee55a
Branch: refs/heads/master
Commit: a29ee55aaadfe43ac9abb0eaf8b022b1e6d7babb
Parents: e094d01
Author: wangzhenhua <wa...@huawei.com>
Authored: Sat Dec 10 21:25:29 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Sat Dec 10 21:25:29 2016 -0800
----------------------------------------------------------------------
.../sql/catalyst/plans/logical/Statistics.scala | 9 ++-
.../spark/sql/StatisticsCollectionSuite.scala | 67 ++++++++++++++------
2 files changed, 53 insertions(+), 23 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/spark/blob/a29ee55a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
index 7986560..465fbab 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/plans/logical/Statistics.scala
@@ -194,11 +194,12 @@ object ColumnStat extends Logging {
val numNonNulls = if (col.nullable) Count(col) else Count(one)
val ndv = Least(Seq(HyperLogLogPlusPlus(col, relativeSD), numNonNulls))
val numNulls = Subtract(Count(one), numNonNulls)
+ val defaultSize = Literal(col.dataType.defaultSize, LongType)
def fixedLenTypeStruct(castType: DataType) = {
// For fixed width types, avg size should be the same as max size.
- val avgSize = Literal(col.dataType.defaultSize, LongType)
- struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, avgSize, avgSize)
+ struct(ndv, Cast(Min(col), castType), Cast(Max(col), castType), numNulls, defaultSize,
+ defaultSize)
}
col.dataType match {
@@ -213,7 +214,9 @@ object ColumnStat extends Logging {
val nullLit = Literal(null, col.dataType)
struct(
ndv, nullLit, nullLit, numNulls,
- Ceil(Average(Length(col))), Cast(Max(Length(col)), LongType))
+ // Set avg/max size to default size if all the values are null or there is no value.
+ Coalesce(Seq(Ceil(Average(Length(col))), defaultSize)),
+ Coalesce(Seq(Cast(Max(Length(col)), LongType), defaultSize)))
case _ =>
throw new AnalysisException("Analyzing column statistics is not supported for column " +
s"${col.name} of data type: ${col.dataType}.")
http://git-wip-us.apache.org/repos/asf/spark/blob/a29ee55a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
index 1fcccd0..0740849 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/StatisticsCollectionSuite.scala
@@ -21,6 +21,7 @@ import java.{lang => jl}
import java.sql.{Date, Timestamp}
import scala.collection.mutable
+import scala.util.Random
import org.apache.spark.sql.catalyst.TableIdentifier
import org.apache.spark.sql.catalyst.plans.logical._
@@ -133,6 +134,40 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
}
}
+ test("column stats round trip serialization") {
+ // Make sure we serialize and then deserialize and we will get the result data
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+ stats.zip(df.schema).foreach { case ((k, v), field) =>
+ withClue(s"column $k with type ${field.dataType}") {
+ val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
+ assert(roundtrip == Some(v))
+ }
+ }
+ }
+
+ test("analyze column command - result verification") {
+ // (data.head.productArity - 1) because the last column does not support stats collection.
+ assert(stats.size == data.head.productArity - 1)
+ val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+ checkColStats(df, stats)
+ }
+
+ test("column stats collection for null columns") {
+ val dataTypes: Seq[(DataType, Int)] = Seq(
+ BooleanType, ByteType, ShortType, IntegerType, LongType,
+ DoubleType, FloatType, DecimalType.SYSTEM_DEFAULT,
+ StringType, BinaryType, DateType, TimestampType
+ ).zipWithIndex
+
+ val df = sql("select " + dataTypes.map { case (tpe, idx) =>
+ s"cast(null as ${tpe.sql}) as col$idx"
+ }.mkString(", "))
+
+ val expectedColStats = dataTypes.map { case (tpe, idx) =>
+ (s"col$idx", ColumnStat(0, None, None, 1, tpe.defaultSize.toLong, tpe.defaultSize.toLong))
+ }
+ checkColStats(df, mutable.LinkedHashMap(expectedColStats: _*))
+ }
}
@@ -141,7 +176,6 @@ class StatisticsCollectionSuite extends StatisticsCollectionTestBase with Shared
* when using the Hive external catalog) as well as in the sql/core module.
*/
abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils {
- import testImplicits._
private val dec1 = new java.math.BigDecimal("1.000000000000000000")
private val dec2 = new java.math.BigDecimal("8.000000000000000000")
@@ -180,35 +214,28 @@ abstract class StatisticsCollectionTestBase extends QueryTest with SQLTestUtils
"ctimestamp" -> ColumnStat(2, Some(t1), Some(t2), 1, 8, 8)
)
- test("column stats round trip serialization") {
- // Make sure we serialize and then deserialize and we will get the result data
- val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
- stats.zip(df.schema).foreach { case ((k, v), field) =>
- withClue(s"column $k with type ${field.dataType}") {
- val roundtrip = ColumnStat.fromMap("table_is_foo", field, v.toMap)
- assert(roundtrip == Some(v))
- }
- }
- }
-
- test("analyze column command - result verification") {
- val tableName = "column_stats_test2"
- // (data.head.productArity - 1) because the last column does not support stats collection.
- assert(stats.size == data.head.productArity - 1)
- val df = data.toDF(stats.keys.toSeq :+ "carray" : _*)
+ private val randomName = new Random(31)
+ /**
+ * Compute column stats for the given DataFrame and compare it with colStats.
+ */
+ def checkColStats(
+ df: DataFrame,
+ colStats: mutable.LinkedHashMap[String, ColumnStat]): Unit = {
+ val tableName = "column_stats_test_" + randomName.nextInt(1000)
withTable(tableName) {
df.write.saveAsTable(tableName)
// Collect statistics
- sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " + stats.keys.mkString(", "))
+ sql(s"analyze table $tableName compute STATISTICS FOR COLUMNS " +
+ colStats.keys.mkString(", "))
// Validate statistics
val table = spark.sessionState.catalog.getTableMetadata(TableIdentifier(tableName))
assert(table.stats.isDefined)
- assert(table.stats.get.colStats.size == stats.size)
+ assert(table.stats.get.colStats.size == colStats.size)
- stats.foreach { case (k, v) =>
+ colStats.foreach { case (k, v) =>
withClue(s"column $k") {
assert(table.stats.get.colStats(k) == v)
}