Posted to commits@spark.apache.org by da...@apache.org on 2015/10/06 17:45:37 UTC

spark git commit: [SPARK-10938] [SQL] remove typeId in columnar cache

Repository: spark
Updated Branches:
  refs/heads/master 4e0027fea -> 27ecfe61f


[SPARK-10938] [SQL] remove typeId in columnar cache

This PR removes the typeId from the columnar cache, since it is no longer needed. It also removes the dedicated DATE and TIMESTAMP column types (INT and LONG are used instead).

Author: Davies Liu <da...@databricks.com>

Closes #8989 from davies/refactor_cache.
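
This works because Catalyst already stores DateType values as an Int
(days since the Unix epoch) and TimestampType values as a Long
(microseconds since the epoch), so the plain INT and LONG column types
can carry both. A minimal sketch of that representation (the values are
hypothetical, not taken from this patch):

    import java.nio.{ByteBuffer, ByteOrder}

    // DateType is an Int count of days since 1970-01-01; TimestampType is
    // a Long count of microseconds since the epoch, so no dedicated column
    // types are needed to store either one.
    val buf = ByteBuffer.allocate(12).order(ByteOrder.nativeOrder())
    buf.putInt(16714)               // a date, as days since the epoch
    buf.putLong(1444146337000000L)  // a timestamp, as microseconds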


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/27ecfe61
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/27ecfe61
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/27ecfe61

Branch: refs/heads/master
Commit: 27ecfe61f07c8413a7b8b9fbdf36ed99cf05227d
Parents: 4e0027f
Author: Davies Liu <da...@databricks.com>
Authored: Tue Oct 6 08:45:31 2015 -0700
Committer: Davies Liu <da...@gmail.com>
Committed: Tue Oct 6 08:45:31 2015 -0700

----------------------------------------------------------------------
 project/MimaExcludes.scala                      |  4 +-
 .../spark/sql/columnar/ColumnAccessor.scala     | 36 ++++------
 .../spark/sql/columnar/ColumnBuilder.scala      | 16 ++---
 .../apache/spark/sql/columnar/ColumnStats.scala |  4 --
 .../apache/spark/sql/columnar/ColumnType.scala  | 71 ++++----------------
 .../sql/columnar/NullableColumnBuilder.scala    | 19 +++---
 .../compression/CompressibleColumnBuilder.scala | 27 ++++----
 .../compression/CompressionScheme.scala         |  6 +-
 .../spark/sql/columnar/ColumnStatsSuite.scala   |  3 -
 .../spark/sql/columnar/ColumnTypeSuite.scala    | 10 +--
 .../spark/sql/columnar/ColumnarTestUtils.scala  |  5 +-
 .../columnar/NullableColumnAccessorSuite.scala  |  8 +--
 .../columnar/NullableColumnBuilderSuite.scala   |  5 +-
 13 files changed, 63 insertions(+), 151 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/project/MimaExcludes.scala
----------------------------------------------------------------------
diff --git a/project/MimaExcludes.scala b/project/MimaExcludes.scala
index b2e6be7..2d4d146 100644
--- a/project/MimaExcludes.scala
+++ b/project/MimaExcludes.scala
@@ -42,7 +42,9 @@ object MimaExcludes {
         excludePackage("org.spark-project.jetty"),
         MimaBuild.excludeSparkPackage("unused"),
         // SQL execution is considered private.
-        excludePackage("org.apache.spark.sql.execution")
+        excludePackage("org.apache.spark.sql.execution"),
+        // SQL columnar is considered private.
+        excludePackage("org.apache.spark.sql.columnar")
       ) ++
       MimaBuild.excludeSparkClass("streaming.flume.FlumeTestUtils") ++
       MimaBuild.excludeSparkClass("streaming.flume.PollingFlumeTestUtils") ++

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
index 4c29a09..2b1d700 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -103,35 +103,23 @@ private[sql] class GenericColumnAccessor(buffer: ByteBuffer, dataType: DataType)
   extends BasicColumnAccessor[Array[Byte]](buffer, GENERIC(dataType))
   with NullableColumnAccessor
 
-private[sql] class DateColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, DATE)
-
-private[sql] class TimestampColumnAccessor(buffer: ByteBuffer)
-  extends NativeColumnAccessor(buffer, TIMESTAMP)
-
 private[sql] object ColumnAccessor {
   def apply(dataType: DataType, buffer: ByteBuffer): ColumnAccessor = {
-    val dup = buffer.duplicate().order(ByteOrder.nativeOrder)
-
-    // The first 4 bytes in the buffer indicate the column type.  This field is not used now,
-    // because we always know the data type of the column ahead of time.
-    dup.getInt()
+    val buf = buffer.order(ByteOrder.nativeOrder)
 
     dataType match {
-      case BooleanType => new BooleanColumnAccessor(dup)
-      case ByteType => new ByteColumnAccessor(dup)
-      case ShortType => new ShortColumnAccessor(dup)
-      case IntegerType => new IntColumnAccessor(dup)
-      case DateType => new DateColumnAccessor(dup)
-      case LongType => new LongColumnAccessor(dup)
-      case TimestampType => new TimestampColumnAccessor(dup)
-      case FloatType => new FloatColumnAccessor(dup)
-      case DoubleType => new DoubleColumnAccessor(dup)
-      case StringType => new StringColumnAccessor(dup)
-      case BinaryType => new BinaryColumnAccessor(dup)
+      case BooleanType => new BooleanColumnAccessor(buf)
+      case ByteType => new ByteColumnAccessor(buf)
+      case ShortType => new ShortColumnAccessor(buf)
+      case IntegerType | DateType => new IntColumnAccessor(buf)
+      case LongType | TimestampType => new LongColumnAccessor(buf)
+      case FloatType => new FloatColumnAccessor(buf)
+      case DoubleType => new DoubleColumnAccessor(buf)
+      case StringType => new StringColumnAccessor(buf)
+      case BinaryType => new BinaryColumnAccessor(buf)
       case DecimalType.Fixed(precision, scale) if precision < 19 =>
-        new FixedDecimalColumnAccessor(dup, precision, scale)
-      case other => new GenericColumnAccessor(dup, other)
+        new FixedDecimalColumnAccessor(buf, precision, scale)
+      case other => new GenericColumnAccessor(buf, other)
     }
   }
 }
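
A rough illustration of the new entry point (a sketch, assuming it runs
inside org.apache.spark.sql since these classes are private[sql], and
given a buffer produced by the matching ColumnBuilder):

    import java.nio.ByteBuffer

    // No leading 4-byte type ID to skip: the caller's DataType alone picks
    // the accessor, and date/timestamp columns reuse the int/long accessors.
    def accessorsFor(buffer: ByteBuffer) = (
      ColumnAccessor(DateType, buffer.duplicate()),      // IntColumnAccessor
      ColumnAccessor(TimestampType, buffer.duplicate())  // LongColumnAccessor
    )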

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
index 1620fc4..2e60564 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -63,9 +63,8 @@ private[sql] class BasicColumnBuilder[JvmType](
     val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
     this.columnName = columnName
 
-    // Reserves 4 bytes for column type ID
-    buffer = ByteBuffer.allocate(4 + size * columnType.defaultSize)
-    buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
+    buffer = ByteBuffer.allocate(size * columnType.defaultSize)
+    buffer.order(ByteOrder.nativeOrder())
   }
 
   override def appendFrom(row: InternalRow, ordinal: Int): Unit = {
@@ -121,11 +120,6 @@ private[sql] class FixedDecimalColumnBuilder(
 private[sql] class GenericColumnBuilder(dataType: DataType)
   extends ComplexColumnBuilder(new GenericColumnStats(dataType), GENERIC(dataType))
 
-private[sql] class DateColumnBuilder extends NativeColumnBuilder(new DateColumnStats, DATE)
-
-private[sql] class TimestampColumnBuilder
-  extends NativeColumnBuilder(new TimestampColumnStats, TIMESTAMP)
-
 private[sql] object ColumnBuilder {
   val DEFAULT_INITIAL_BUFFER_SIZE = 1024 * 1024
 
@@ -154,10 +148,8 @@ private[sql] object ColumnBuilder {
       case BooleanType => new BooleanColumnBuilder
       case ByteType => new ByteColumnBuilder
       case ShortType => new ShortColumnBuilder
-      case IntegerType => new IntColumnBuilder
-      case DateType => new DateColumnBuilder
-      case LongType => new LongColumnBuilder
-      case TimestampType => new TimestampColumnBuilder
+      case IntegerType | DateType => new IntColumnBuilder
+      case LongType | TimestampType => new LongColumnBuilder
       case FloatType => new FloatColumnBuilder
       case DoubleType => new DoubleColumnBuilder
       case StringType => new StringColumnBuilder
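
The builder side mirrors the accessor change: initialize() now allocates
only the payload, with no 4-byte type-ID prefix. A sketch under the same
private[sql] assumption (the size and column name are hypothetical):

    // Previously: ByteBuffer.allocate(4 + size * defaultSize) followed by
    // putInt(typeId). Now the buffer holds nothing but column data.
    val builder = new LongColumnBuilder  // now also serves TimestampType
    builder.initialize(128, "ts")        // allocates 128 * 8 bytes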

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
index fbd51b7..3b5052b 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnStats.scala
@@ -266,7 +266,3 @@ private[sql] class GenericColumnStats(dataType: DataType) extends ColumnStats {
   override def collectedStatistics: GenericInternalRow =
     new GenericInternalRow(Array[Any](null, null, nullCount, count, sizeInBytes))
 }
-
-private[sql] class DateColumnStats extends IntColumnStats
-
-private[sql] class TimestampColumnStats extends LongColumnStats

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
index ab482a3..3a0cea8 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -38,9 +38,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
   // The catalyst data type of this column.
   def dataType: DataType
 
-  // A unique ID representing the type.
-  def typeId: Int
-
   // Default size in bytes for one element of type T (e.g. 4 for `Int`).
   def defaultSize: Int
 
@@ -107,7 +104,6 @@ private[sql] sealed abstract class ColumnType[JvmType] {
 
 private[sql] abstract class NativeColumnType[T <: AtomicType](
     val dataType: T,
-    val typeId: Int,
     val defaultSize: Int)
   extends ColumnType[T#InternalType] {
 
@@ -117,7 +113,7 @@ private[sql] abstract class NativeColumnType[T <: AtomicType](
   def scalaTag: TypeTag[dataType.InternalType] = dataType.tag
 }
 
-private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
+private[sql] object INT extends NativeColumnType(IntegerType, 4) {
   override def append(v: Int, buffer: ByteBuffer): Unit = {
     buffer.putInt(v)
   }
@@ -145,7 +141,7 @@ private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
   }
 }
 
-private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
+private[sql] object LONG extends NativeColumnType(LongType, 8) {
   override def append(v: Long, buffer: ByteBuffer): Unit = {
     buffer.putLong(v)
   }
@@ -173,7 +169,7 @@ private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
   }
 }
 
-private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
+private[sql] object FLOAT extends NativeColumnType(FloatType, 4) {
   override def append(v: Float, buffer: ByteBuffer): Unit = {
     buffer.putFloat(v)
   }
@@ -201,7 +197,7 @@ private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
   }
 }
 
-private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
+private[sql] object DOUBLE extends NativeColumnType(DoubleType, 8) {
   override def append(v: Double, buffer: ByteBuffer): Unit = {
     buffer.putDouble(v)
   }
@@ -229,7 +225,7 @@ private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
   }
 }
 
-private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
+private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 1) {
   override def append(v: Boolean, buffer: ByteBuffer): Unit = {
     buffer.put(if (v) 1: Byte else 0: Byte)
   }
@@ -255,7 +251,7 @@ private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
   }
 }
 
-private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
+private[sql] object BYTE extends NativeColumnType(ByteType, 1) {
   override def append(v: Byte, buffer: ByteBuffer): Unit = {
     buffer.put(v)
   }
@@ -283,7 +279,7 @@ private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
   }
 }
 
-private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
+private[sql] object SHORT extends NativeColumnType(ShortType, 2) {
   override def append(v: Short, buffer: ByteBuffer): Unit = {
     buffer.putShort(v)
   }
@@ -311,7 +307,7 @@ private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
   }
 }
 
-private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
+private[sql] object STRING extends NativeColumnType(StringType, 8) {
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
     row.getUTF8String(ordinal).numBytes() + 4
   }
@@ -343,46 +339,9 @@ private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
   override def clone(v: UTF8String): UTF8String = v.clone()
 }
 
-private[sql] object DATE extends NativeColumnType(DateType, 8, 4) {
-  override def extract(buffer: ByteBuffer): Int = {
-    buffer.getInt
-  }
-
-  override def append(v: Int, buffer: ByteBuffer): Unit = {
-    buffer.putInt(v)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Int = {
-    row.getInt(ordinal)
-  }
-
-  def setField(row: MutableRow, ordinal: Int, value: Int): Unit = {
-    row(ordinal) = value
-  }
-}
-
-private[sql] object TIMESTAMP extends NativeColumnType(TimestampType, 9, 8) {
-  override def extract(buffer: ByteBuffer): Long = {
-    buffer.getLong
-  }
-
-  override def append(v: Long, buffer: ByteBuffer): Unit = {
-    buffer.putLong(v)
-  }
-
-  override def getField(row: InternalRow, ordinal: Int): Long = {
-    row.getLong(ordinal)
-  }
-
-  override def setField(row: MutableRow, ordinal: Int, value: Long): Unit = {
-    row(ordinal) = value
-  }
-}
-
 private[sql] case class FIXED_DECIMAL(precision: Int, scale: Int)
   extends NativeColumnType(
     DecimalType(precision, scale),
-    10,
     FIXED_DECIMAL.defaultSize) {
 
   override def extract(buffer: ByteBuffer): Decimal = {
@@ -410,9 +369,7 @@ private[sql] object FIXED_DECIMAL {
   val defaultSize = 8
 }
 
-private[sql] sealed abstract class ByteArrayColumnType(
-    val typeId: Int,
-    val defaultSize: Int)
+private[sql] sealed abstract class ByteArrayColumnType(val defaultSize: Int)
   extends ColumnType[Array[Byte]] {
 
   override def actualSize(row: InternalRow, ordinal: Int): Int = {
@@ -431,7 +388,7 @@ private[sql] sealed abstract class ByteArrayColumnType(
   }
 }
 
-private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
+private[sql] object BINARY extends ByteArrayColumnType(16) {
 
   def dataType: DataType = BooleanType
 
@@ -447,7 +404,7 @@ private[sql] object BINARY extends ByteArrayColumnType(11, 16) {
 // Used to process generic objects (all types other than those listed above). Objects should be
 // serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
 // byte array.
-private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(12, 16) {
+private[sql] case class GENERIC(dataType: DataType) extends ByteArrayColumnType(16) {
   override def setField(row: MutableRow, ordinal: Int, value: Array[Byte]): Unit = {
     row.update(ordinal, SparkSqlSerializer.deserialize[Any](value))
   }
@@ -463,10 +420,8 @@ private[sql] object ColumnType {
       case BooleanType => BOOLEAN
       case ByteType => BYTE
       case ShortType => SHORT
-      case IntegerType => INT
-      case DateType => DATE
-      case LongType => LONG
-      case TimestampType => TIMESTAMP
+      case IntegerType | DateType => INT
+      case LongType | TimestampType => LONG
       case FloatType => FLOAT
       case DoubleType => DOUBLE
       case StringType => STRING
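
With the dedicated types gone, the DataType-to-ColumnType mapping
collapses accordingly; assuming the apply factory of this object (and
the same private[sql] scope):

    // DATE and TIMESTAMP no longer exist as column types.
    assert(ColumnType(DateType) == INT)
    assert(ColumnType(TimestampType) == LONG)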

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
index ba47bc7..76cfddf 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -25,14 +25,13 @@ import org.apache.spark.sql.catalyst.InternalRow
  * A stackable trait used for building byte buffer for a column containing null values.  Memory
  * layout of the final byte buffer is:
  * {{{
- *    .----------------------- Column type ID (4 bytes)
- *    |   .------------------- Null count N (4 bytes)
- *    |   |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |   |     .--------- Non-null elements
- *    V   V   V     V
- *   +---+---+-----+---------+
- *   |   |   | ... | ... ... |
- *   +---+---+-----+---------+
+ *    .------------------- Null count N (4 bytes)
+ *    |   .--------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .--------- Non-null elements
+ *    V   V     V
+ *   +---+-----+---------+
+ *   |   | ... | ... ... |
+ *   +---+-----+---------+
  * }}}
  */
 private[sql] trait NullableColumnBuilder extends ColumnBuilder {
@@ -66,16 +65,14 @@ private[sql] trait NullableColumnBuilder extends ColumnBuilder {
 
   abstract override def build(): ByteBuffer = {
     val nonNulls = super.build()
-    val typeId = nonNulls.getInt()
     val nullDataLen = nulls.position()
 
     nulls.limit(nullDataLen)
     nulls.rewind()
 
     val buffer = ByteBuffer
-      .allocate(4 + 4 + nullDataLen + nonNulls.remaining())
+      .allocate(4 + nullDataLen + nonNulls.remaining())
       .order(ByteOrder.nativeOrder())
-      .putInt(typeId)
       .putInt(nullCount)
       .put(nulls)
       .put(nonNulls)
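
To make the new layout concrete, here is a rough decoder for the header
that build() now writes (a sketch, not code from the patch):

    import java.nio.{ByteBuffer, ByteOrder}

    // New layout: [null count N: 4 bytes][null positions: 4*N bytes][elements].
    // The former leading 4-byte column type ID is simply gone.
    def readNullHeader(buffer: ByteBuffer): (Int, Array[Int]) = {
      val buf = buffer.duplicate().order(ByteOrder.nativeOrder())
      val nullCount = buf.getInt()
      val nullPositions = Array.fill(nullCount)(buf.getInt())
      (nullCount, nullPositions)
    }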

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
index 39b21dd..161021f 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressibleColumnBuilder.scala
@@ -28,17 +28,16 @@ import org.apache.spark.sql.types.AtomicType
  * A stackable trait that builds optionally compressed byte buffer for a column.  Memory layout of
  * the final byte buffer is:
  * {{{
- *    .--------------------------- Column type ID (4 bytes)
- *    |   .----------------------- Null count N (4 bytes)
- *    |   |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
- *    |   |   |     .------------- Compression scheme ID (4 bytes)
- *    |   |   |     |   .--------- Compressed non-null elements
- *    V   V   V     V   V
- *   +---+---+-----+---+---------+
- *   |   |   | ... |   | ... ... |
- *   +---+---+-----+---+---------+
- *    \-----------/ \-----------/
- *       header         body
+ *    .----------------------- Null count N (4 bytes)
+ *    |   .------------------- Null positions (4 x N bytes, empty if null count is zero)
+ *    |   |     .------------- Compression scheme ID (4 bytes)
+ *    |   |     |   .--------- Compressed non-null elements
+ *    V   V     V   V
+ *   +---+-----+---+---------+
+ *   |   | ... |   | ... ... |
+ *   +---+-----+---+---------+
+ *    \-------/ \-----------/
+ *     header         body
  * }}}
  */
 private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
@@ -83,14 +82,13 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
 
   override def build(): ByteBuffer = {
     val nonNullBuffer = buildNonNulls()
-    val typeId = nonNullBuffer.getInt()
     val encoder: Encoder[T] = {
       val candidate = compressionEncoders.minBy(_.compressionRatio)
       if (isWorthCompressing(candidate)) candidate else PassThrough.encoder(columnType)
     }
 
-    // Header = column type ID + null count + null positions
-    val headerSize = 4 + 4 + nulls.limit()
+    // Header = null count + null positions
+    val headerSize = 4 + nulls.limit()
     val compressedSize = if (encoder.compressedSize == 0) {
       nonNullBuffer.remaining()
     } else {
@@ -102,7 +100,6 @@ private[sql] trait CompressibleColumnBuilder[T <: AtomicType]
       .allocate(headerSize + 4 + compressedSize)
       .order(ByteOrder.nativeOrder)
       // Write the header
-      .putInt(typeId)
       .putInt(nullCount)
       .put(nulls)
 

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
index b1ef9b2..9322b77 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/compression/CompressionScheme.scala
@@ -74,8 +74,8 @@ private[sql] object CompressionScheme {
 
   def columnHeaderSize(columnBuffer: ByteBuffer): Int = {
     val header = columnBuffer.duplicate().order(ByteOrder.nativeOrder)
-    val nullCount = header.getInt(4)
-    // Column type ID + null count + null positions
-    4 + 4 + 4 * nullCount
+    val nullCount = header.getInt()
+    // null count + null positions
+    4 + 4 * nullCount
   }
 }
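
A worked example of the new header size, for a column with three nulls:

    val nullCount = 3
    val oldHeaderSize = 4 + 4 + 4 * nullCount  // type ID + null count + positions = 20
    val newHeaderSize = 4 + 4 * nullCount      // null count + positions = 16

so every cached column buffer shrinks by exactly the four type-ID bytes.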

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
index d0430d2..708fb4c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnStatsSuite.scala
@@ -27,10 +27,7 @@ class ColumnStatsSuite extends SparkFunSuite {
   testColumnStats(classOf[ByteColumnStats], BYTE, createRow(Byte.MaxValue, Byte.MinValue, 0))
   testColumnStats(classOf[ShortColumnStats], SHORT, createRow(Short.MaxValue, Short.MinValue, 0))
   testColumnStats(classOf[IntColumnStats], INT, createRow(Int.MaxValue, Int.MinValue, 0))
-  testColumnStats(classOf[DateColumnStats], DATE, createRow(Int.MaxValue, Int.MinValue, 0))
   testColumnStats(classOf[LongColumnStats], LONG, createRow(Long.MaxValue, Long.MinValue, 0))
-  testColumnStats(classOf[TimestampColumnStats], TIMESTAMP,
-    createRow(Long.MaxValue, Long.MinValue, 0))
   testColumnStats(classOf[FloatColumnStats], FLOAT, createRow(Float.MaxValue, Float.MinValue, 0))
   testColumnStats(classOf[DoubleColumnStats], DOUBLE,
     createRow(Double.MaxValue, Double.MinValue, 0))

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
index 8f02469..a4cbe35 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -37,8 +37,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
 
   test("defaultSize") {
     val checks = Map(
-      BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4, DATE -> 4,
-      LONG -> 8, TIMESTAMP -> 8, FLOAT -> 4, DOUBLE -> 8,
+      BOOLEAN -> 1, BYTE -> 1, SHORT -> 2, INT -> 4,
+      LONG -> 8, FLOAT -> 4, DOUBLE -> 8,
       STRING -> 8, BINARY -> 16, FIXED_DECIMAL(15, 10) -> 8,
       MAP_GENERIC -> 16)
 
@@ -66,9 +66,7 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
     checkActualSize(BYTE, Byte.MaxValue, 1)
     checkActualSize(SHORT, Short.MaxValue, 2)
     checkActualSize(INT, Int.MaxValue, 4)
-    checkActualSize(DATE, Int.MaxValue, 4)
     checkActualSize(LONG, Long.MaxValue, 8)
-    checkActualSize(TIMESTAMP, Long.MaxValue, 8)
     checkActualSize(FLOAT, Float.MaxValue, 4)
     checkActualSize(DOUBLE, Double.MaxValue, 8)
     checkActualSize(STRING, UTF8String.fromString("hello"), 4 + "hello".getBytes("utf-8").length)
@@ -93,12 +91,8 @@ class ColumnTypeSuite extends SparkFunSuite with Logging {
 
   testNativeColumnType(INT)(_.putInt(_), _.getInt)
 
-  testNativeColumnType(DATE)(_.putInt(_), _.getInt)
-
   testNativeColumnType(LONG)(_.putLong(_), _.getLong)
 
-  testNativeColumnType(TIMESTAMP)(_.putLong(_), _.getLong)
-
   testNativeColumnType(FLOAT)(_.putFloat(_), _.getFloat)
 
   testNativeColumnType(DOUBLE)(_.putDouble(_), _.getDouble)

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
index 79bb7d0..123a705 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestUtils.scala
@@ -19,9 +19,10 @@ package org.apache.spark.sql.columnar
 
 import scala.collection.immutable.HashSet
 import scala.util.Random
+
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{DataType, Decimal, AtomicType}
+import org.apache.spark.sql.types.{AtomicType, Decimal}
 import org.apache.spark.unsafe.types.UTF8String
 
 object ColumnarTestUtils {
@@ -43,9 +44,7 @@ object ColumnarTestUtils {
       case BYTE => (Random.nextInt(Byte.MaxValue * 2) - Byte.MaxValue).toByte
       case SHORT => (Random.nextInt(Short.MaxValue * 2) - Short.MaxValue).toShort
       case INT => Random.nextInt()
-      case DATE => Random.nextInt()
       case LONG => Random.nextLong()
-      case TIMESTAMP => Random.nextLong()
       case FLOAT => Random.nextFloat()
       case DOUBLE => Random.nextDouble()
       case STRING => UTF8String.fromString(Random.nextString(Random.nextInt(32)))

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
index f4f6c76..a3a23d3 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -21,7 +21,7 @@ import java.nio.ByteBuffer
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
-import org.apache.spark.sql.types.{StringType, ArrayType, DataType}
+import org.apache.spark.sql.types.{ArrayType, StringType}
 
 class TestNullableColumnAccessor[JvmType](
     buffer: ByteBuffer,
@@ -32,17 +32,15 @@ class TestNullableColumnAccessor[JvmType](
 object TestNullableColumnAccessor {
   def apply[JvmType](buffer: ByteBuffer, columnType: ColumnType[JvmType])
     : TestNullableColumnAccessor[JvmType] = {
-    // Skips the column type ID
-    buffer.getInt()
     new TestNullableColumnAccessor(buffer, columnType)
   }
 }
 
 class NullableColumnAccessorSuite extends SparkFunSuite {
-  import ColumnarTestUtils._
+  import org.apache.spark.sql.columnar.ColumnarTestUtils._
 
   Seq(
-    BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, FLOAT, DOUBLE,
+    BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
     STRING, BINARY, FIXED_DECIMAL(15, 10), GENERIC(ArrayType(StringType)))
     .foreach {
     testNullableColumnAccessor(_)

http://git-wip-us.apache.org/repos/asf/spark/blob/27ecfe61/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
index 241d09e..9557eea 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -38,7 +38,7 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
   import ColumnarTestUtils._
 
   Seq(
-    BOOLEAN, BYTE, SHORT, INT, DATE, LONG, TIMESTAMP, FLOAT, DOUBLE,
+    BOOLEAN, BYTE, SHORT, INT, LONG, FLOAT, DOUBLE,
     STRING, BINARY, FIXED_DECIMAL(15, 10), GENERIC(ArrayType(StringType)))
     .foreach {
     testNullableColumnBuilder(_)
@@ -53,7 +53,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
       val columnBuilder = TestNullableColumnBuilder(columnType)
       val buffer = columnBuilder.build()
 
-      assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
       assertResult(0, "Wrong null count")(buffer.getInt())
       assert(!buffer.hasRemaining)
     }
@@ -68,7 +67,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
 
       val buffer = columnBuilder.build()
 
-      assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
       assertResult(0, "Wrong null count")(buffer.getInt())
     }
 
@@ -84,7 +82,6 @@ class NullableColumnBuilderSuite extends SparkFunSuite {
 
       val buffer = columnBuilder.build()
 
-      assertResult(columnType.typeId, "Wrong column type ID")(buffer.getInt())
       assertResult(4, "Wrong null count")(buffer.getInt())
 
       // For null positions


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org