Posted to commits@spark.apache.org by ma...@apache.org on 2014/11/03 00:14:48 UTC

git commit: [SPARK-4182][SQL] Fixes ColumnStats classes for boolean, binary and complex data types

Repository: spark
Updated Branches:
  refs/heads/master 9c0eb57c7 -> e4b80894b


[SPARK-4182][SQL] Fixes ColumnStats classes for boolean, binary and complex data types

`NoopColumnStats` was previously used for binary, boolean and complex data types. This `ColumnStats` implementation doesn't return properly shaped column statistics, which causes caching to fail whenever a table contains columns of these types.
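
A minimal reproduction sketch (illustrative only, not part of this patch), modeled on the `ComplexData` fixture and regression test added in the diff below:

```scala
import org.apache.spark.sql.test.TestSQLContext._

// Mirrors the ComplexData fixture added to TestData.scala by this patch.
case class ComplexData(m: Map[Int, String], a: Seq[Int], b: Boolean)

val complexData =
  sparkContext.parallelize(
    ComplexData(Map(1 -> "1"), Seq(1), true) ::
    ComplexData(Map(2 -> "2"), Seq(2), false) :: Nil).toSchemaRDD
complexData.registerTempTable("complexData")

// Before this fix, materializing the cached columns failed because
// NoopColumnStats returned an empty statistics row.
complexData.cache().count()
```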

This PR adds `BooleanColumnStats`, `BinaryColumnStats` and `GenericColumnStats`, used for boolean, binary and all complex data types respectively. In addition, `NoopColumnStats` now returns properly shaped column statistics containing the null count and row count, but this class is retained for testing purposes only.
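
As a rough sketch of the intended contract (illustrative only; the size value assumes `BOOLEAN.defaultSize == 1`), each `ColumnStats` gathers per-value statistics and reports a five-field row of lower bound, upper bound, null count, row count and size in bytes:

```scala
import org.apache.spark.sql.catalyst.expressions.Row

val stats = new BooleanColumnStats
Seq(true, false, true).foreach(b => stats.gatherStats(Row(b), 0))

// A properly shaped statistics row: Row(lower, upper, nullCount, count, sizeInBytes),
// here Row(false, true, 0, 3, 3).
println(stats.collectedStatistics)
```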

Author: Cheng Lian <li...@databricks.com>

Closes #3059 from liancheng/spark-4182 and squashes the following commits:

b398cfd [Cheng Lian] Fixes failed test case
fb3ee85 [Cheng Lian] Fixes SPARK-4182


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/e4b80894
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/e4b80894
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/e4b80894

Branch: refs/heads/master
Commit: e4b80894bdb72c0acf8832fd48421c546fbc37e6
Parents: 9c0eb57
Author: Cheng Lian <li...@databricks.com>
Authored: Sun Nov 2 15:14:44 2014 -0800
Committer: Michael Armbrust <mi...@databricks.com>
Committed: Sun Nov 2 15:14:44 2014 -0800

----------------------------------------------------------------------
 .../spark/sql/columnar/ColumnBuilder.scala      | 10 +++--
 .../apache/spark/sql/columnar/ColumnStats.scala | 45 +++++++++++++++++++-
 .../columnar/InMemoryColumnarTableScan.scala    |  3 ++
 .../org/apache/spark/sql/SQLQuerySuite.scala    |  7 +--
 .../scala/org/apache/spark/sql/TestData.scala   |  8 ++++
 .../columnar/InMemoryColumnarQuerySuite.scala   | 28 +++++++-----
 6 files changed, 82 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/e4b80894/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 300cef1..c68dcee 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -79,8 +79,9 @@ private[sql] class BasicColumnBuilder[T <: DataType, JvmType](
 }
 
 private[sql] abstract class ComplexColumnBuilder[T <: DataType, JvmType](
+    columnStats: ColumnStats,
     columnType: ColumnType[T, JvmType])
-  extends BasicColumnBuilder[T, JvmType](new NoopColumnStats, columnType)
+  extends BasicColumnBuilder[T, JvmType](columnStats, columnType)
   with NullableColumnBuilder
 
 private[sql] abstract class NativeColumnBuilder[T <: NativeType](
@@ -91,7 +92,7 @@ private[sql] abstract class NativeColumnBuilder[T <: NativeType](
   with AllCompressionSchemes
   with CompressibleColumnBuilder[T]
 
-private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new NoopColumnStats, BOOLEAN)
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(new BooleanColumnStats, BOOLEAN)
 
 private[sql] class IntColumnBuilder extends NativeColumnBuilder(new IntColumnStats, INT)
 
@@ -112,10 +113,11 @@ private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnS
 private[sql] class TimestampColumnBuilder
   extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
 
-private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(BINARY)
+private[sql] class BinaryColumnBuilder extends ComplexColumnBuilder(new BinaryColumnStats, BINARY)
 
 // TODO (lian) Add support for array, struct and map
-private[sql] class GenericColumnBuilder extends ComplexColumnBuilder(GENERIC)
+private[sql] class GenericColumnBuilder
+  extends ComplexColumnBuilder(new GenericColumnStats, GENERIC)
 
 private[sql] object ColumnBuilder {
   val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024

http://git-wip-us.apache.org/repos/asf/spark/blob/e4b80894/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index b9f9f82..668efe4 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -70,11 +70,30 @@ private[sql] sealed trait ColumnStats extends Serializable {
   def collectedStatistics: Row
 }
 
+/**
+ * A no-op ColumnStats only used for testing purposes.
+ */
 private[sql] class NoopColumnStats extends ColumnStats {
+  override def gatherStats(row: Row, ordinal: Int): Unit = super.gatherStats(row, ordinal)
+
+  def collectedStatistics = Row(null, null, nullCount, count, 0L)
+}
 
-  override def gatherStats(row: Row, ordinal: Int): Unit = {}
+private[sql] class BooleanColumnStats extends ColumnStats {
+  protected var upper = false
+  protected var lower = true
 
-  override def collectedStatistics = Row()
+  override def gatherStats(row: Row, ordinal: Int): Unit = {
+    super.gatherStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      val value = row.getBoolean(ordinal)
+      if (value > upper) upper = value
+      if (value < lower) lower = value
+      sizeInBytes += BOOLEAN.defaultSize
+    }
+  }
+
+  def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
 }
 
 private[sql] class ByteColumnStats extends ColumnStats {
@@ -229,3 +248,25 @@ private[sql] class TimestampColumnStats extends ColumnStats {
 
   def collectedStatistics = Row(lower, upper, nullCount, count, sizeInBytes)
 }
+
+private[sql] class BinaryColumnStats extends ColumnStats {
+  override def gatherStats(row: Row, ordinal: Int): Unit = {
+    super.gatherStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      sizeInBytes += BINARY.actualSize(row, ordinal)
+    }
+  }
+
+  def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes)
+}
+
+private[sql] class GenericColumnStats extends ColumnStats {
+  override def gatherStats(row: Row, ordinal: Int): Unit = {
+    super.gatherStats(row, ordinal)
+    if (!row.isNullAt(ordinal)) {
+      sizeInBytes += GENERIC.actualSize(row, ordinal)
+    }
+  }
+
+  def collectedStatistics = Row(null, null, nullCount, count, sizeInBytes)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/e4b80894/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
index ee63134..455b415 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/InMemoryColumnarTableScan.scala
@@ -161,6 +161,9 @@ private[sql] case class InMemoryRelation(
   }
 
   def cachedColumnBuffers = _cachedColumnBuffers
+
+  override protected def otherCopyArgs: Seq[AnyRef] =
+    Seq(_cachedColumnBuffers, statisticsToBePropagated)
 }
 
 private[sql] case class InMemoryColumnarTableScan(

http://git-wip-us.apache.org/repos/asf/spark/blob/e4b80894/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
index 6befe1b..6bf4393 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/SQLQuerySuite.scala
@@ -21,11 +21,12 @@ import java.util.TimeZone
 
 import org.scalatest.BeforeAndAfterAll
 
-import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.errors.TreeNodeException
 import org.apache.spark.sql.catalyst.expressions._
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 
+/* Implicits */
+import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.test.TestSQLContext._
 
 class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
@@ -719,7 +720,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
     validateMetadata(sql("SELECT * FROM personWithMeta JOIN salary ON id = personId"))
     validateMetadata(sql("SELECT name, salary FROM personWithMeta JOIN salary ON id = personId"))
   }
-  
+
   test("SPARK-3371 Renaming a function expression with group by gives error") {
     registerFunction("len", (s: String) => s.length)
     checkAnswer(
@@ -934,7 +935,7 @@ class SQLQuerySuite extends QueryTest with BeforeAndAfterAll {
   }
 
   test("SPARK-4154 Query does not work if it has 'not between' in Spark SQL and HQL") {
-    checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"), 
+    checkAnswer(sql("SELECT key FROM testData WHERE key not between 0 and 10 order by key"),
         (11 to 100).map(i => Seq(i)))
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e4b80894/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
index 836dd17..ef87a23 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/TestData.scala
@@ -177,4 +177,12 @@ object TestData {
     Salary(0, 2000.0) ::
     Salary(1, 1000.0) :: Nil)
   salary.registerTempTable("salary")
+
+  case class ComplexData(m: Map[Int, String], s: TestData, a: Seq[Int], b: Boolean)
+  val complexData =
+    TestSQLContext.sparkContext.parallelize(
+      ComplexData(Map(1 -> "1"), TestData(1, "1"), Seq(1), true)
+        :: ComplexData(Map(2 -> "2"), TestData(2, "2"), Seq(2), false)
+        :: Nil).toSchemaRDD
+  complexData.registerTempTable("complexData")
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/e4b80894/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
index 9775dd2..15903d0 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/InMemoryColumnarQuerySuite.scala
@@ -17,17 +17,18 @@
 
 package org.apache.spark.sql.columnar
 
+import org.apache.spark.sql.TestData._
 import org.apache.spark.sql.catalyst.expressions.Row
-import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.test.TestSQLContext._
 import org.apache.spark.sql.{QueryTest, TestData}
 import org.apache.spark.storage.StorageLevel.MEMORY_ONLY
 
 class InMemoryColumnarQuerySuite extends QueryTest {
-  import org.apache.spark.sql.TestData._
-  import org.apache.spark.sql.test.TestSQLContext._
+  // Make sure the tables are loaded.
+  TestData
 
   test("simple columnar query") {
-    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val plan = executePlan(testData.logicalPlan).executedPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
 
     checkAnswer(scan, testData.collect().toSeq)
@@ -42,7 +43,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   }
 
   test("projection") {
-    val plan = TestSQLContext.executePlan(testData.select('value, 'key).logicalPlan).executedPlan
+    val plan = executePlan(testData.select('value, 'key).logicalPlan).executedPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
 
     checkAnswer(scan, testData.collect().map {
@@ -51,7 +52,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
   }
 
   test("SPARK-1436 regression: in-memory columns must be able to be accessed multiple times") {
-    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val plan = executePlan(testData.logicalPlan).executedPlan
     val scan = InMemoryRelation(useCompression = true, 5, MEMORY_ONLY, plan)
 
     checkAnswer(scan, testData.collect().toSeq)
@@ -63,7 +64,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
       sql("SELECT * FROM repeatedData"),
       repeatedData.collect().toSeq)
 
-    TestSQLContext.cacheTable("repeatedData")
+    cacheTable("repeatedData")
 
     checkAnswer(
       sql("SELECT * FROM repeatedData"),
@@ -75,7 +76,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
       sql("SELECT * FROM nullableRepeatedData"),
       nullableRepeatedData.collect().toSeq)
 
-    TestSQLContext.cacheTable("nullableRepeatedData")
+    cacheTable("nullableRepeatedData")
 
     checkAnswer(
       sql("SELECT * FROM nullableRepeatedData"),
@@ -87,7 +88,7 @@ class InMemoryColumnarQuerySuite extends QueryTest {
       sql("SELECT time FROM timestamps"),
       timestamps.collect().toSeq)
 
-    TestSQLContext.cacheTable("timestamps")
+    cacheTable("timestamps")
 
     checkAnswer(
       sql("SELECT time FROM timestamps"),
@@ -99,10 +100,17 @@ class InMemoryColumnarQuerySuite extends QueryTest {
       sql("SELECT * FROM withEmptyParts"),
       withEmptyParts.collect().toSeq)
 
-    TestSQLContext.cacheTable("withEmptyParts")
+    cacheTable("withEmptyParts")
 
     checkAnswer(
       sql("SELECT * FROM withEmptyParts"),
       withEmptyParts.collect().toSeq)
   }
+
+  test("SPARK-4182 Caching complex types") {
+    complexData.cache().count()
+    // Shouldn't throw
+    complexData.count()
+    complexData.unpersist()
+  }
 }

