You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by pw...@apache.org on 2014/03/23 20:09:21 UTC

git commit: [SPARK-1292] In-memory columnar representation for Spark SQL

Repository: spark
Updated Branches:
  refs/heads/master abf6714e2 -> 57a4379c0


[SPARK-1292] In-memory columnar representation for Spark SQL

This PR is rebased from the Catalyst repository, and contains the first version of in-memory columnar representation for Spark SQL. Compression support is not included yet and will be added later in a separate PR.

Author: Cheng Lian <li...@databricks.com>
Author: Cheng Lian <li...@gmail.com>

Closes #205 from liancheng/memColumnarSupport and squashes the following commits:

99dba41 [Cheng Lian] Restricted new objects/classes to `private[sql]'
0892ad8 [Cheng Lian] Addressed ScalaStyle issues
af1ad5e [Cheng Lian] Fixed some minor issues introduced during rebasing
0dbf2fb [Cheng Lian] Make necessary renaming due to rebase
a162d4d [Cheng Lian] Removed the unnecessary InMemoryColumnarRelation class
9bcae4b [Cheng Lian] Added Apache license
220ee1e [Cheng Lian] Added table scan operator for in-memory columnar support.
c701c7a [Cheng Lian] Using SparkSqlSerializer for generic object SerDe causes error, made a workaround
ed8608e [Cheng Lian] Added implicit conversion from DataType to ColumnType
b8a645a [Cheng Lian] Replaced KryoSerializer with an updated SparkSqlSerializer
b6c0a49 [Cheng Lian] Minor test suite refactoring
214be73 [Cheng Lian] Refactored BINARY and GENERIC to reduce duplicate code
da2f4d5 [Cheng Lian] Added Apache license
dbf7a38 [Cheng Lian] Added ColumnAccessor and test suite, refactored ColumnBuilder
c01a177 [Cheng Lian] Added column builder classes and test suite
f18ddc6 [Cheng Lian] Added ColumnTypes and test suite
2d09066 [Cheng Lian] Added KryoSerializer
34f3c19 [Cheng Lian] Added TypeTag field to all NativeTypes
acc5c48 [Cheng Lian] Added Hive test files to .gitignore


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/57a4379c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/57a4379c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/57a4379c

Branch: refs/heads/master
Commit: 57a4379c031e5d5901ba580422207d6aa2f19749
Parents: abf6714
Author: Cheng Lian <li...@databricks.com>
Authored: Sun Mar 23 12:08:55 2014 -0700
Committer: Patrick Wendell <pw...@gmail.com>
Committed: Sun Mar 23 12:08:55 2014 -0700

----------------------------------------------------------------------
 .../spark/sql/catalyst/types/dataTypes.scala    |  14 +-
 .../spark/sql/columnar/ColumnAccessor.scala     | 175 ++++++++++++++++
 .../spark/sql/columnar/ColumnBuilder.scala      | 187 +++++++++++++++++
 .../apache/spark/sql/columnar/ColumnType.scala  | 198 ++++++++++++++++++
 .../sql/columnar/NullableColumnAccessor.scala   |  57 ++++++
 .../sql/columnar/NullableColumnBuilder.scala    |  83 ++++++++
 .../columnar/inMemoryColumnarOperators.scala    |  80 ++++++++
 .../apache/spark/sql/execution/Exchange.scala   |  33 ---
 .../sql/execution/SparkSqlSerializer.scala      |  73 +++++++
 .../scala/org/apache/spark/sql/QueryTest.scala  |   2 +-
 .../spark/sql/columnar/ColumnTypeSuite.scala    | 204 +++++++++++++++++++
 .../spark/sql/columnar/ColumnarQuerySuite.scala |  34 ++++
 .../spark/sql/columnar/ColumnarTestData.scala   |  55 +++++
 .../columnar/NullableColumnAccessorSuite.scala  |  61 ++++++
 .../columnar/NullableColumnBuilderSuite.scala   |  94 +++++++++
 15 files changed, 1315 insertions(+), 35 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
index 6eb2b62..90a9f9f 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/types/dataTypes.scala
@@ -19,7 +19,9 @@ package org.apache.spark.sql
 package catalyst
 package types
 
-import expressions.Expression
+import scala.reflect.runtime.universe.{typeTag, TypeTag}
+
+import org.apache.spark.sql.catalyst.expressions.Expression
 
 abstract class DataType {
   /** Matches any expression that evaluates to this DataType */
@@ -33,11 +35,13 @@ case object NullType extends DataType
 
 abstract class NativeType extends DataType {
   type JvmType
+  @transient val tag: TypeTag[JvmType]
   val ordering: Ordering[JvmType]
 }
 
 case object StringType extends NativeType {
   type JvmType = String
+  @transient lazy val tag = typeTag[JvmType]
   val ordering = implicitly[Ordering[JvmType]]
 }
 case object BinaryType extends DataType {
@@ -45,6 +49,7 @@ case object BinaryType extends DataType {
 }
 case object BooleanType extends NativeType {
   type JvmType = Boolean
+  @transient lazy val tag = typeTag[JvmType]
   val ordering = implicitly[Ordering[JvmType]]
 }
 
@@ -71,6 +76,7 @@ abstract class IntegralType extends NumericType {
 
 case object LongType extends IntegralType {
   type JvmType = Long
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[Long]]
   val integral = implicitly[Integral[Long]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -78,6 +84,7 @@ case object LongType extends IntegralType {
 
 case object IntegerType extends IntegralType {
   type JvmType = Int
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[Int]]
   val integral = implicitly[Integral[Int]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -85,6 +92,7 @@ case object IntegerType extends IntegralType {
 
 case object ShortType extends IntegralType {
   type JvmType = Short
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[Short]]
   val integral = implicitly[Integral[Short]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -92,6 +100,7 @@ case object ShortType extends IntegralType {
 
 case object ByteType extends IntegralType {
   type JvmType = Byte
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[Byte]]
   val integral = implicitly[Integral[Byte]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -110,6 +119,7 @@ abstract class FractionalType extends NumericType {
 
 case object DecimalType extends FractionalType {
   type JvmType = BigDecimal
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[BigDecimal]]
   val fractional = implicitly[Fractional[BigDecimal]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -117,6 +127,7 @@ case object DecimalType extends FractionalType {
 
 case object DoubleType extends FractionalType {
   type JvmType = Double
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[Double]]
   val fractional = implicitly[Fractional[Double]]
   val ordering = implicitly[Ordering[JvmType]]
@@ -124,6 +135,7 @@ case object DoubleType extends FractionalType {
 
 case object FloatType extends FractionalType {
   type JvmType = Float
+  @transient lazy val tag = typeTag[JvmType]
   val numeric = implicitly[Numeric[Float]]
   val fractional = implicitly[Fractional[Float]]
   val ordering = implicitly[Ordering[JvmType]]

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
new file mode 100644
index 0000000..ddbeba6
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnAccessor.scala
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.types.{BinaryType, NativeType, DataType}
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+/**
+ * An `Iterator` like trait used to extract values from columnar byte buffer. When a value is
+ * extracted from the buffer, instead of directly returning it, the value is set into some field of
+ * a [[MutableRow]]. In this way, boxing cost can be avoided by leveraging the setter methods
+ * for primitive values provided by [[MutableRow]].
+ */
+private[sql] trait ColumnAccessor {
+  initialize()
+
+  protected def initialize()
+
+  def hasNext: Boolean
+
+  def extractTo(row: MutableRow, ordinal: Int)
+
+  protected def underlyingBuffer: ByteBuffer
+}
+
+private[sql] abstract class BasicColumnAccessor[T <: DataType, JvmType](buffer: ByteBuffer)
+  extends ColumnAccessor {
+
+  protected def initialize() {}
+
+  def columnType: ColumnType[T, JvmType]
+
+  def hasNext = buffer.hasRemaining
+
+  def extractTo(row: MutableRow, ordinal: Int) {
+    doExtractTo(row, ordinal)
+  }
+
+  protected def doExtractTo(row: MutableRow, ordinal: Int)
+
+  protected def underlyingBuffer = buffer
+}
+
+private[sql] abstract class NativeColumnAccessor[T <: NativeType](
+    buffer: ByteBuffer,
+    val columnType: NativeColumnType[T])
+  extends BasicColumnAccessor[T, T#JvmType](buffer)
+  with NullableColumnAccessor
+
+private[sql] class BooleanColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, BOOLEAN) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setBoolean(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class IntColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, INT) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setInt(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class ShortColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, SHORT) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setShort(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class LongColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, LONG) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setLong(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class ByteColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, BYTE) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setByte(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class DoubleColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, DOUBLE) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setDouble(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class FloatColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, FLOAT) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setFloat(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class StringColumnAccessor(buffer: ByteBuffer)
+  extends NativeColumnAccessor(buffer, STRING) {
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row.setString(ordinal, columnType.extract(buffer))
+  }
+}
+
+private[sql] class BinaryColumnAccessor(buffer: ByteBuffer)
+  extends BasicColumnAccessor[BinaryType.type, Array[Byte]](buffer)
+  with NullableColumnAccessor {
+
+  def columnType = BINARY
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    row(ordinal) = columnType.extract(buffer)
+  }
+}
+
+private[sql] class GenericColumnAccessor(buffer: ByteBuffer)
+  extends BasicColumnAccessor[DataType, Array[Byte]](buffer)
+  with NullableColumnAccessor {
+
+  def columnType = GENERIC
+
+  override protected def doExtractTo(row: MutableRow, ordinal: Int) {
+    val serialized = columnType.extract(buffer)
+    row(ordinal) = SparkSqlSerializer.deserialize[Any](serialized)
+  }
+}
+
+private[sql] object ColumnAccessor {
+  def apply(b: ByteBuffer): ColumnAccessor = {
+    // The first 4 bytes in the buffer indicates the column type.
+    val buffer = b.duplicate().order(ByteOrder.nativeOrder())
+    val columnTypeId = buffer.getInt()
+
+    columnTypeId match {
+      case INT.typeId     => new IntColumnAccessor(buffer)
+      case LONG.typeId    => new LongColumnAccessor(buffer)
+      case FLOAT.typeId   => new FloatColumnAccessor(buffer)
+      case DOUBLE.typeId  => new DoubleColumnAccessor(buffer)
+      case BOOLEAN.typeId => new BooleanColumnAccessor(buffer)
+      case BYTE.typeId    => new ByteColumnAccessor(buffer)
+      case SHORT.typeId   => new ShortColumnAccessor(buffer)
+      case STRING.typeId  => new StringColumnAccessor(buffer)
+      case BINARY.typeId  => new BinaryColumnAccessor(buffer)
+      case GENERIC.typeId => new GenericColumnAccessor(buffer)
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
new file mode 100644
index 0000000..6bd1841
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnBuilder.scala
@@ -0,0 +1,187 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+private[sql] trait ColumnBuilder {
+  /**
+   * Initializes with an approximate lower bound on the expected number of elements in this column.
+   */
+  def initialize(initialSize: Int, columnName: String = "")
+
+  def appendFrom(row: Row, ordinal: Int)
+
+  def build(): ByteBuffer
+}
+
+private[sql] abstract class BasicColumnBuilder[T <: DataType, JvmType] extends ColumnBuilder {
+  import ColumnBuilder._
+
+  private var columnName: String = _
+  protected var buffer: ByteBuffer = _
+
+  def columnType: ColumnType[T, JvmType]
+
+  override def initialize(initialSize: Int, columnName: String = "") = {
+    val size = if (initialSize == 0) DEFAULT_INITIAL_BUFFER_SIZE else initialSize
+    this.columnName = columnName
+    buffer = ByteBuffer.allocate(4 + 4 + size * columnType.defaultSize)
+    buffer.order(ByteOrder.nativeOrder()).putInt(columnType.typeId)
+  }
+
+  // Have to give a concrete implementation to make mixin possible
+  override def appendFrom(row: Row, ordinal: Int) {
+    doAppendFrom(row, ordinal)
+  }
+
+  // Concrete `ColumnBuilder`s can override this method to append values
+  protected def doAppendFrom(row: Row, ordinal: Int)
+
+  // Helper method to append primitive values (to avoid boxing cost)
+  protected def appendValue(v: JvmType) {
+    buffer = ensureFreeSpace(buffer, columnType.actualSize(v))
+    columnType.append(v, buffer)
+  }
+
+  override def build() = {
+    buffer.limit(buffer.position()).rewind()
+    buffer
+  }
+}
+
+private[sql] abstract class NativeColumnBuilder[T <: NativeType](
+    val columnType: NativeColumnType[T])
+  extends BasicColumnBuilder[T, T#JvmType]
+  with NullableColumnBuilder
+
+private[sql] class BooleanColumnBuilder extends NativeColumnBuilder(BOOLEAN) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getBoolean(ordinal))
+  }
+}
+
+private[sql] class IntColumnBuilder extends NativeColumnBuilder(INT) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getInt(ordinal))
+  }
+}
+
+private[sql] class ShortColumnBuilder extends NativeColumnBuilder(SHORT) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getShort(ordinal))
+  }
+}
+
+private[sql] class LongColumnBuilder extends NativeColumnBuilder(LONG) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getLong(ordinal))
+  }
+}
+
+private[sql] class ByteColumnBuilder extends NativeColumnBuilder(BYTE) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getByte(ordinal))
+  }
+}
+
+private[sql] class DoubleColumnBuilder extends NativeColumnBuilder(DOUBLE) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getDouble(ordinal))
+  }
+}
+
+private[sql] class FloatColumnBuilder extends NativeColumnBuilder(FLOAT) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getFloat(ordinal))
+  }
+}
+
+private[sql] class StringColumnBuilder extends NativeColumnBuilder(STRING) {
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row.getString(ordinal))
+  }
+}
+
+private[sql] class BinaryColumnBuilder
+  extends BasicColumnBuilder[BinaryType.type, Array[Byte]]
+  with NullableColumnBuilder {
+
+  def columnType = BINARY
+
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    appendValue(row(ordinal).asInstanceOf[Array[Byte]])
+  }
+}
+
+// TODO (lian) Add support for array, struct and map
+private[sql] class GenericColumnBuilder
+  extends BasicColumnBuilder[DataType, Array[Byte]]
+  with NullableColumnBuilder {
+
+  def columnType = GENERIC
+
+  override def doAppendFrom(row: Row, ordinal: Int) {
+    val serialized = SparkSqlSerializer.serialize(row(ordinal))
+    buffer = ColumnBuilder.ensureFreeSpace(buffer, columnType.actualSize(serialized))
+    columnType.append(serialized, buffer)
+  }
+}
+
+private[sql] object ColumnBuilder {
+  val DEFAULT_INITIAL_BUFFER_SIZE = 10 * 1024 * 104
+
+  private[columnar] def ensureFreeSpace(orig: ByteBuffer, size: Int) = {
+    if (orig.remaining >= size) {
+      orig
+    } else {
+      // grow in steps of initial size
+      val capacity = orig.capacity()
+      val newSize = capacity + size.max(capacity / 8 + 1)
+      val pos = orig.position()
+
+      orig.clear()
+      ByteBuffer
+        .allocate(newSize)
+        .order(ByteOrder.nativeOrder())
+        .put(orig.array(), 0, pos)
+    }
+  }
+
+  def apply(typeId: Int, initialSize: Int = 0, columnName: String = ""): ColumnBuilder = {
+    val builder = (typeId match {
+      case INT.typeId     => new IntColumnBuilder
+      case LONG.typeId    => new LongColumnBuilder
+      case FLOAT.typeId   => new FloatColumnBuilder
+      case DOUBLE.typeId  => new DoubleColumnBuilder
+      case BOOLEAN.typeId => new BooleanColumnBuilder
+      case BYTE.typeId    => new ByteColumnBuilder
+      case SHORT.typeId   => new ShortColumnBuilder
+      case STRING.typeId  => new StringColumnBuilder
+      case BINARY.typeId  => new BinaryColumnBuilder
+      case GENERIC.typeId => new GenericColumnBuilder
+    }).asInstanceOf[ColumnBuilder]
+
+    builder.initialize(initialSize, columnName)
+    builder
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
new file mode 100644
index 0000000..3b759a5
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/ColumnType.scala
@@ -0,0 +1,198 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql
+package columnar
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.types._
+
+/**
+ * An abstract class that represents type of a column. Used to append/extract Java objects into/from
+ * the underlying [[ByteBuffer]] of a column.
+ *
+ * @param typeId A unique ID representing the type.
+ * @param defaultSize Default size in bytes for one element of type T (e.g. 4 for `Int`).
+ * @tparam T Scala data type for the column.
+ * @tparam JvmType Underlying Java type to represent the elements.
+ */
+private[sql] sealed abstract class ColumnType[T <: DataType, JvmType](
+    val typeId: Int,
+    val defaultSize: Int) {
+
+  /**
+   * Extracts a value out of the buffer at the buffer's current position.
+   */
+  def extract(buffer: ByteBuffer): JvmType
+
+  /**
+   * Appends the given value v of type T into the given ByteBuffer.
+   */
+  def append(v: JvmType, buffer: ByteBuffer)
+
+  /**
+   * Returns the size of the value. This is used to calculate the size of variable length types
+   * such as byte arrays and strings.
+   */
+  def actualSize(v: JvmType): Int = defaultSize
+
+  /**
+   * Creates a duplicated copy of the value.
+   */
+  def clone(v: JvmType): JvmType = v
+}
+
+private[sql] abstract class NativeColumnType[T <: NativeType](
+    val dataType: T,
+    typeId: Int,
+    defaultSize: Int)
+  extends ColumnType[T, T#JvmType](typeId, defaultSize) {
+
+  /**
+   * Scala TypeTag. Can be used to create primitive arrays and hash tables.
+   */
+  def scalaTag = dataType.tag
+}
+
+private[sql] object INT extends NativeColumnType(IntegerType, 0, 4) {
+  def append(v: Int, buffer: ByteBuffer) {
+    buffer.putInt(v)
+  }
+
+  def extract(buffer: ByteBuffer) = {
+    buffer.getInt()
+  }
+}
+
+private[sql] object LONG extends NativeColumnType(LongType, 1, 8) {
+  override def append(v: Long, buffer: ByteBuffer) {
+    buffer.putLong(v)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    buffer.getLong()
+  }
+}
+
+private[sql] object FLOAT extends NativeColumnType(FloatType, 2, 4) {
+  override def append(v: Float, buffer: ByteBuffer) {
+    buffer.putFloat(v)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    buffer.getFloat()
+  }
+}
+
+private[sql] object DOUBLE extends NativeColumnType(DoubleType, 3, 8) {
+  override def append(v: Double, buffer: ByteBuffer) {
+    buffer.putDouble(v)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    buffer.getDouble()
+  }
+}
+
+private[sql] object BOOLEAN extends NativeColumnType(BooleanType, 4, 1) {
+  override def append(v: Boolean, buffer: ByteBuffer) {
+    buffer.put(if (v) 1.toByte else 0.toByte)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    if (buffer.get() == 1) true else false
+  }
+}
+
+private[sql] object BYTE extends NativeColumnType(ByteType, 5, 1) {
+  override def append(v: Byte, buffer: ByteBuffer) {
+    buffer.put(v)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    buffer.get()
+  }
+}
+
+private[sql] object SHORT extends NativeColumnType(ShortType, 6, 2) {
+  override def append(v: Short, buffer: ByteBuffer) {
+    buffer.putShort(v)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    buffer.getShort()
+  }
+}
+
+private[sql] object STRING extends NativeColumnType(StringType, 7, 8) {
+  override def actualSize(v: String): Int = v.getBytes.length + 4
+
+  override def append(v: String, buffer: ByteBuffer) {
+    val stringBytes = v.getBytes()
+    buffer.putInt(stringBytes.length).put(stringBytes, 0, stringBytes.length)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    val length = buffer.getInt()
+    val stringBytes = new Array[Byte](length)
+    buffer.get(stringBytes, 0, length)
+    new String(stringBytes)
+  }
+}
+
+private[sql] sealed abstract class ByteArrayColumnType[T <: DataType](
+    typeId: Int,
+    defaultSize: Int)
+  extends ColumnType[T, Array[Byte]](typeId, defaultSize) {
+
+  override def actualSize(v: Array[Byte]) = v.length + 4
+
+  override def append(v: Array[Byte], buffer: ByteBuffer) {
+    buffer.putInt(v.length).put(v, 0, v.length)
+  }
+
+  override def extract(buffer: ByteBuffer) = {
+    val length = buffer.getInt()
+    val bytes = new Array[Byte](length)
+    buffer.get(bytes, 0, length)
+    bytes
+  }
+}
+
+private[sql] object BINARY extends ByteArrayColumnType[BinaryType.type](8, 16)
+
+// Used to process generic objects (all types other than those listed above). Objects should be
+// serialized first before appending to the column `ByteBuffer`, and is also extracted as serialized
+// byte array.
+private[sql] object GENERIC extends ByteArrayColumnType[DataType](9, 16)
+
+private[sql] object ColumnType {
+  implicit def dataTypeToColumnType(dataType: DataType): ColumnType[_, _] = {
+    dataType match {
+      case IntegerType => INT
+      case LongType    => LONG
+      case FloatType   => FLOAT
+      case DoubleType  => DOUBLE
+      case BooleanType => BOOLEAN
+      case ByteType    => BYTE
+      case ShortType   => SHORT
+      case StringType  => STRING
+      case BinaryType  => BINARY
+      case _           => GENERIC
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
new file mode 100644
index 0000000..2970c60
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnAccessor.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+import org.apache.spark.sql.catalyst.expressions.MutableRow
+
+private[sql] trait NullableColumnAccessor extends ColumnAccessor {
+  private var nullsBuffer: ByteBuffer = _
+  private var nullCount: Int = _
+  private var seenNulls: Int = 0
+
+  private var nextNullIndex: Int = _
+  private var pos: Int = 0
+
+  abstract override def initialize() {
+    nullsBuffer = underlyingBuffer.duplicate().order(ByteOrder.nativeOrder())
+    nullCount = nullsBuffer.getInt()
+    nextNullIndex = if (nullCount > 0) nullsBuffer.getInt() else -1
+    pos = 0
+
+    underlyingBuffer.position(underlyingBuffer.position + 4 + nullCount * 4)
+    super.initialize()
+  }
+
+  abstract override def extractTo(row: MutableRow, ordinal: Int) {
+    if (pos == nextNullIndex) {
+      seenNulls += 1
+
+      if (seenNulls < nullCount) {
+        nextNullIndex = nullsBuffer.getInt()
+      }
+
+      row.setNullAt(ordinal)
+    } else {
+      super.extractTo(row, ordinal)
+    }
+
+    pos += 1
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
new file mode 100644
index 0000000..1661c3f
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/NullableColumnBuilder.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import java.nio.{ByteOrder, ByteBuffer}
+
+/**
+ * Builds a nullable column. The byte buffer of a nullable column contains:
+ * - 4 bytes for the null count (number of nulls)
+ * - positions for each null, in ascending order
+ * - the non-null data (column data type, compression type, data...)
+ */
+private[sql] trait NullableColumnBuilder extends ColumnBuilder {
+  private var nulls: ByteBuffer = _
+  private var pos: Int = _
+  private var nullCount: Int = _
+
+  abstract override def initialize(initialSize: Int, columnName: String) {
+    nulls = ByteBuffer.allocate(1024)
+    nulls.order(ByteOrder.nativeOrder())
+    pos = 0
+    nullCount = 0
+    super.initialize(initialSize, columnName)
+  }
+
+  abstract override def appendFrom(row: Row, ordinal: Int) {
+    if (row.isNullAt(ordinal)) {
+      nulls = ColumnBuilder.ensureFreeSpace(nulls, 4)
+      nulls.putInt(pos)
+      nullCount += 1
+    } else {
+      super.appendFrom(row, ordinal)
+    }
+    pos += 1
+  }
+
+  abstract override def build(): ByteBuffer = {
+    val nonNulls = super.build()
+    val typeId = nonNulls.getInt()
+    val nullDataLen = nulls.position()
+
+    nulls.limit(nullDataLen)
+    nulls.rewind()
+
+    // Column type ID is moved to the front, follows the null count, then non-null data
+    //
+    //      +---------+
+    //      | 4 bytes | Column type ID
+    //      +---------+
+    //      | 4 bytes | Null count
+    //      +---------+
+    //      |   ...   | Null positions (if null count is not zero)
+    //      +---------+
+    //      |   ...   | Non-null part (without column type ID)
+    //      +---------+
+    val buffer = ByteBuffer
+      .allocate(4 + nullDataLen + nonNulls.limit)
+      .order(ByteOrder.nativeOrder())
+      .putInt(typeId)
+      .putInt(nullCount)
+      .put(nulls)
+      .put(nonNulls)
+
+    buffer.rewind()
+    buffer
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala
new file mode 100644
index 0000000..c7efd30
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/columnar/inMemoryColumnarOperators.scala
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import org.apache.spark.sql.catalyst.expressions.{GenericMutableRow, Attribute}
+import org.apache.spark.sql.execution.{SparkPlan, LeafNode}
+
+private[sql] case class InMemoryColumnarTableScan(attributes: Seq[Attribute], child: SparkPlan)
+  extends LeafNode {
+
+  // For implicit conversion from `DataType` to `ColumnType`
+  import ColumnType._
+
+  override def output: Seq[Attribute] = attributes
+
+  lazy val cachedColumnBuffers = {
+    val output = child.output
+    val cached = child.execute().mapPartitions { iterator =>
+      val columnBuilders = output.map { a =>
+        ColumnBuilder(a.dataType.typeId, 0, a.name)
+      }.toArray
+
+      var row: Row = null
+      while (iterator.hasNext) {
+        row = iterator.next()
+        var i = 0
+        while (i < row.length) {
+          columnBuilders(i).appendFrom(row, i)
+          i += 1
+        }
+      }
+
+      Iterator.single(columnBuilders.map(_.build()))
+    }.cache()
+
+    cached.setName(child.toString)
+    // Force the materialization of the cached RDD.
+    cached.count()
+    cached
+  }
+
+  override def execute() = {
+    cachedColumnBuffers.mapPartitions { iterator =>
+      val columnBuffers = iterator.next()
+      assert(!iterator.hasNext)
+
+      new Iterator[Row] {
+        val columnAccessors = columnBuffers.map(ColumnAccessor(_))
+        val nextRow = new GenericMutableRow(columnAccessors.length)
+
+        override def next() = {
+          var i = 0
+          while (i < nextRow.length) {
+            columnAccessors(i).extractTo(nextRow, i)
+            i += 1
+          }
+          nextRow
+        }
+
+        override def hasNext = columnAccessors.head.hasNext
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
index 72dc5ec..e934c4c 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/Exchange.scala
@@ -18,14 +18,8 @@
 package org.apache.spark.sql
 package execution
 
-import java.nio.ByteBuffer
-
-import com.esotericsoftware.kryo.{Kryo, Serializer}
-import com.esotericsoftware.kryo.io.{Output, Input}
-
 import org.apache.spark.{SparkConf, RangePartitioner, HashPartitioner}
 import org.apache.spark.rdd.ShuffledRDD
-import org.apache.spark.serializer.KryoSerializer
 import org.apache.spark.util.MutablePair
 
 import catalyst.rules.Rule
@@ -33,33 +27,6 @@ import catalyst.errors._
 import catalyst.expressions._
 import catalyst.plans.physical._
 
-private class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
-  override def newKryo(): Kryo = {
-    val kryo = new Kryo
-    kryo.setRegistrationRequired(true)
-    kryo.register(classOf[MutablePair[_,_]])
-    kryo.register(classOf[Array[Any]])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
-    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
-    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
-    kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
-    kryo.setReferences(false)
-    kryo.setClassLoader(this.getClass.getClassLoader)
-    kryo
-  }
-}
-
-private class BigDecimalSerializer extends Serializer[BigDecimal] {
-  def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
-    // TODO: There are probably more efficient representations than strings...
-    output.writeString(bd.toString)
-  }
-
-  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
-    BigDecimal(input.readString())
-  }
-}
-
 case class Exchange(newPartitioning: Partitioning, child: SparkPlan) extends UnaryNode {
 
   override def outputPartitioning = newPartitioning

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
new file mode 100644
index 0000000..ad7cd58
--- /dev/null
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/SparkSqlSerializer.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package execution
+
+import java.nio.ByteBuffer
+
+import com.esotericsoftware.kryo.io.{Input, Output}
+import com.esotericsoftware.kryo.{Serializer, Kryo}
+
+import org.apache.spark.{SparkEnv, SparkConf}
+import org.apache.spark.serializer.KryoSerializer
+import org.apache.spark.util.MutablePair
+
+class SparkSqlSerializer(conf: SparkConf) extends KryoSerializer(conf) {
+  override def newKryo(): Kryo = {
+    val kryo = new Kryo()
+    kryo.setRegistrationRequired(false)
+    kryo.register(classOf[MutablePair[_, _]])
+    kryo.register(classOf[Array[Any]])
+    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericRow])
+    kryo.register(classOf[org.apache.spark.sql.catalyst.expressions.GenericMutableRow])
+    kryo.register(classOf[scala.collection.mutable.ArrayBuffer[_]])
+    kryo.register(classOf[scala.math.BigDecimal], new BigDecimalSerializer)
+    kryo.setReferences(false)
+    kryo.setClassLoader(this.getClass.getClassLoader)
+    kryo
+  }
+}
+
+object SparkSqlSerializer {
+  // TODO (lian) Using KryoSerializer here is workaround, needs further investigation
+  // Using SparkSqlSerializer here makes BasicQuerySuite to fail because of Kryo serialization
+  // related error.
+  @transient lazy val ser: KryoSerializer = {
+    val sparkConf = Option(SparkEnv.get).map(_.conf).getOrElse(new SparkConf())
+    new KryoSerializer(sparkConf)
+  }
+
+  def serialize[T](o: T): Array[Byte] = {
+    ser.newInstance().serialize(o).array()
+  }
+
+  def deserialize[T](bytes: Array[Byte]): T  = {
+    ser.newInstance().deserialize[T](ByteBuffer.wrap(bytes))
+  }
+}
+
+class BigDecimalSerializer extends Serializer[BigDecimal] {
+  def write(kryo: Kryo, output: Output, bd: math.BigDecimal) {
+    // TODO: There are probably more efficient representations than strings...
+    output.writeString(bd.toString())
+  }
+
+  def read(kryo: Kryo, input: Input, tpe: Class[BigDecimal]): BigDecimal = {
+    BigDecimal(input.readString())
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
index 728fece..aa84211 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/QueryTest.scala
@@ -33,7 +33,7 @@ import TestSQLContext._
 class QueryTest extends FunSuite {
   /**
    * Runs the plan and makes sure the answer matches the expected result.
-   * @param plan the query to be executed
+   * @param rdd the [[SchemaRDD]] to be executed
    * @param expectedAnswer the expected result, can either be an Any, Seq[Product], or Seq[ Seq[Any] ].
    */
   protected def checkAnswer(rdd: SchemaRDD, expectedAnswer: Any): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
new file mode 100644
index 0000000..c7aaaae
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnTypeSuite.scala
@@ -0,0 +1,204 @@
+package org.apache.spark.sql
+package columnar
+
+import java.nio.ByteBuffer
+
+import scala.util.Random
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types._
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+class ColumnTypeSuite extends FunSuite {
+  val columnTypes = Seq(INT, SHORT, LONG, BYTE, DOUBLE, FLOAT, STRING, BINARY, GENERIC)
+
+  test("defaultSize") {
+    val defaultSize = Seq(4, 2, 8, 1, 8, 4, 8, 16, 16)
+
+    columnTypes.zip(defaultSize).foreach { case (columnType, size) =>
+      assert(columnType.defaultSize === size)
+    }
+  }
+
+  test("actualSize") {
+    val expectedSizes = Seq(4, 2, 8, 1, 8, 4, 4 + 5, 4 + 4, 4 + 11)
+    val actualSizes = Seq(
+      INT.actualSize(Int.MaxValue),
+      SHORT.actualSize(Short.MaxValue),
+      LONG.actualSize(Long.MaxValue),
+      BYTE.actualSize(Byte.MaxValue),
+      DOUBLE.actualSize(Double.MaxValue),
+      FLOAT.actualSize(Float.MaxValue),
+      STRING.actualSize("hello"),
+      BINARY.actualSize(new Array[Byte](4)),
+      GENERIC.actualSize(SparkSqlSerializer.serialize(Map(1 -> "a"))))
+
+    expectedSizes.zip(actualSizes).foreach { case (expected, actual) =>
+      assert(expected === actual)
+    }
+  }
+
+  testNumericColumnType[BooleanType.type, Boolean](
+    BOOLEAN,
+    Array.fill(4)(Random.nextBoolean()),
+    ByteBuffer.allocate(32),
+    (buffer: ByteBuffer, v: Boolean) => {
+      buffer.put((if (v) 1 else 0).toByte)
+    },
+    (buffer: ByteBuffer) => {
+      buffer.get() == 1
+    })
+
+  testNumericColumnType[IntegerType.type, Int](
+    INT,
+    Array.fill(4)(Random.nextInt()),
+    ByteBuffer.allocate(32),
+    (_: ByteBuffer).putInt(_),
+    (_: ByteBuffer).getInt)
+
+  testNumericColumnType[ShortType.type, Short](
+    SHORT,
+    Array.fill(4)(Random.nextInt(Short.MaxValue).asInstanceOf[Short]),
+    ByteBuffer.allocate(32),
+    (_: ByteBuffer).putShort(_),
+    (_: ByteBuffer).getShort)
+
+  testNumericColumnType[LongType.type, Long](
+    LONG,
+    Array.fill(4)(Random.nextLong()),
+    ByteBuffer.allocate(64),
+    (_: ByteBuffer).putLong(_),
+    (_: ByteBuffer).getLong)
+
+  testNumericColumnType[ByteType.type, Byte](
+    BYTE,
+    Array.fill(4)(Random.nextInt(Byte.MaxValue).asInstanceOf[Byte]),
+    ByteBuffer.allocate(64),
+    (_: ByteBuffer).put(_),
+    (_: ByteBuffer).get)
+
+  testNumericColumnType[DoubleType.type, Double](
+    DOUBLE,
+    Array.fill(4)(Random.nextDouble()),
+    ByteBuffer.allocate(64),
+    (_: ByteBuffer).putDouble(_),
+    (_: ByteBuffer).getDouble)
+
+  testNumericColumnType[FloatType.type, Float](
+    FLOAT,
+    Array.fill(4)(Random.nextFloat()),
+    ByteBuffer.allocate(64),
+    (_: ByteBuffer).putFloat(_),
+    (_: ByteBuffer).getFloat)
+
+  test("STRING") {
+    val buffer = ByteBuffer.allocate(128)
+    val seq = Array("hello", "world", "spark", "sql")
+
+    seq.map(_.getBytes).foreach { bytes: Array[Byte] =>
+      buffer.putInt(bytes.length).put(bytes)
+    }
+
+    buffer.rewind()
+    seq.foreach { s =>
+      assert(s === STRING.extract(buffer))
+    }
+
+    buffer.rewind()
+    seq.foreach(STRING.append(_, buffer))
+
+    buffer.rewind()
+    seq.foreach { s =>
+      val length = buffer.getInt
+      assert(length === s.getBytes.length)
+
+      val bytes = new Array[Byte](length)
+      buffer.get(bytes, 0, length)
+      assert(s === new String(bytes))
+    }
+  }
+
+  test("BINARY") {
+    val buffer = ByteBuffer.allocate(128)
+    val seq = Array.fill(4) {
+      val bytes = new Array[Byte](4)
+      Random.nextBytes(bytes)
+      bytes
+    }
+
+    seq.foreach { bytes =>
+      buffer.putInt(bytes.length).put(bytes)
+    }
+
+    buffer.rewind()
+    seq.foreach { b =>
+      assert(b === BINARY.extract(buffer))
+    }
+
+    buffer.rewind()
+    seq.foreach(BINARY.append(_, buffer))
+
+    buffer.rewind()
+    seq.foreach { b =>
+      val length = buffer.getInt
+      assert(length === b.length)
+
+      val bytes = new Array[Byte](length)
+      buffer.get(bytes, 0, length)
+      assert(b === bytes)
+    }
+  }
+
+  test("GENERIC") {
+    val buffer = ByteBuffer.allocate(512)
+    val obj = Map(1 -> "spark", 2 -> "sql")
+    val serializedObj = SparkSqlSerializer.serialize(obj)
+
+    GENERIC.append(SparkSqlSerializer.serialize(obj), buffer)
+    buffer.rewind()
+
+    val length = buffer.getInt()
+    assert(length === serializedObj.length)
+
+    val bytes = new Array[Byte](length)
+    buffer.get(bytes, 0, length)
+    assert(obj === SparkSqlSerializer.deserialize(bytes))
+
+    buffer.rewind()
+    buffer.putInt(serializedObj.length).put(serializedObj)
+
+    buffer.rewind()
+    assert(obj === SparkSqlSerializer.deserialize(GENERIC.extract(buffer)))
+  }
+
+  def testNumericColumnType[T <: DataType, JvmType](
+      columnType: ColumnType[T, JvmType],
+      seq: Seq[JvmType],
+      buffer: ByteBuffer,
+      putter: (ByteBuffer, JvmType) => Unit,
+      getter: (ByteBuffer) => JvmType) {
+
+    val columnTypeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    test(s"$columnTypeName.extract") {
+      buffer.rewind()
+      seq.foreach(putter(buffer, _))
+
+      buffer.rewind()
+      seq.foreach { i =>
+        assert(i === columnType.extract(buffer))
+      }
+    }
+
+    test(s"$columnTypeName.append") {
+      buffer.rewind()
+      seq.foreach(columnType.append(_, buffer))
+
+      buffer.rewind()
+      seq.foreach { i =>
+        assert(i === getter(buffer))
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
new file mode 100644
index 0000000..928851a
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarQuerySuite.scala
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import org.apache.spark.sql.execution.SparkLogicalPlan
+import org.apache.spark.sql.test.TestSQLContext
+import org.apache.spark.sql.{TestData, DslQuerySuite}
+
+class ColumnarQuerySuite extends DslQuerySuite {
+  import TestData._
+  import TestSQLContext._
+
+  test("simple columnar query") {
+    val plan = TestSQLContext.executePlan(testData.logicalPlan).executedPlan
+    val scan = SparkLogicalPlan(InMemoryColumnarTableScan(plan.output, plan))
+
+    checkAnswer(scan, testData.collect().toSeq)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala
new file mode 100644
index 0000000..ddcdede
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/ColumnarTestData.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.columnar
+
+import scala.util.Random
+
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+
+// TODO Enrich test data
+object ColumnarTestData {
+  object GenericMutableRow {
+    def apply(values: Any*) = {
+      val row = new GenericMutableRow(values.length)
+      row.indices.foreach { i =>
+        row(i) = values(i)
+      }
+      row
+    }
+  }
+
+  def randomBytes(length: Int) = {
+    val bytes = new Array[Byte](length)
+    Random.nextBytes(bytes)
+    bytes
+  }
+
+  val nonNullRandomRow = GenericMutableRow(
+    Random.nextInt(),
+    Random.nextLong(),
+    Random.nextFloat(),
+    Random.nextDouble(),
+    Random.nextBoolean(),
+    Random.nextInt(Byte.MaxValue).asInstanceOf[Byte],
+    Random.nextInt(Short.MaxValue).asInstanceOf[Short],
+    Random.nextString(Random.nextInt(64)),
+    randomBytes(Random.nextInt(64)),
+    Map(Random.nextInt() -> Random.nextString(4)))
+
+  val nullRow = GenericMutableRow(Seq.fill(10)(null): _*)
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
new file mode 100644
index 0000000..279607c
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnAccessorSuite.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import org.scalatest.FunSuite
+import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.sql.catalyst.expressions.GenericMutableRow
+
+class NullableColumnAccessorSuite extends FunSuite {
+  import ColumnarTestData._
+
+  Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
+    testNullableColumnAccessor(_)
+  }
+
+  def testNullableColumnAccessor[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    test(s"$typeName accessor: empty column") {
+      val builder = ColumnBuilder(columnType.typeId, 4)
+      val accessor = ColumnAccessor(builder.build())
+      assert(!accessor.hasNext)
+    }
+
+    test(s"$typeName accessor: access null values") {
+      val builder = ColumnBuilder(columnType.typeId, 4)
+
+      (0 until 4).foreach { _ =>
+        builder.appendFrom(nonNullRandomRow, columnType.typeId)
+        builder.appendFrom(nullRow, columnType.typeId)
+      }
+
+      val accessor = ColumnAccessor(builder.build())
+      val row = new GenericMutableRow(1)
+
+      (0 until 4).foreach { _ =>
+        accessor.extractTo(row, 0)
+        assert(row(0) === nonNullRandomRow(columnType.typeId))
+
+        accessor.extractTo(row, 0)
+        assert(row(0) === null)
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/57a4379c/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
new file mode 100644
index 0000000..3354da3
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/columnar/NullableColumnBuilderSuite.scala
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql
+package columnar
+
+import org.scalatest.FunSuite
+
+import org.apache.spark.sql.catalyst.types.DataType
+import org.apache.spark.sql.execution.SparkSqlSerializer
+
+class NullableColumnBuilderSuite extends FunSuite {
+  import ColumnarTestData._
+
+  Seq(INT, LONG, SHORT, BOOLEAN, BYTE, STRING, DOUBLE, FLOAT, BINARY, GENERIC).foreach {
+    testNullableColumnBuilder(_)
+  }
+
+  def testNullableColumnBuilder[T <: DataType, JvmType](columnType: ColumnType[T, JvmType]) {
+    val columnBuilder = ColumnBuilder(columnType.typeId)
+    val typeName = columnType.getClass.getSimpleName.stripSuffix("$")
+
+    test(s"$typeName column builder: empty column") {
+      columnBuilder.initialize(4)
+
+      val buffer = columnBuilder.build()
+
+      // For column type ID
+      assert(buffer.getInt() === columnType.typeId)
+      // For null count
+      assert(buffer.getInt === 0)
+      assert(!buffer.hasRemaining)
+    }
+
+    test(s"$typeName column builder: buffer size auto growth") {
+      columnBuilder.initialize(4)
+
+      (0 until 4) foreach { _ =>
+        columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
+      }
+
+      val buffer = columnBuilder.build()
+
+      // For column type ID
+      assert(buffer.getInt() === columnType.typeId)
+      // For null count
+      assert(buffer.getInt() === 0)
+    }
+
+    test(s"$typeName column builder: null values") {
+      columnBuilder.initialize(4)
+
+      (0 until 4) foreach { _ =>
+        columnBuilder.appendFrom(nonNullRandomRow, columnType.typeId)
+        columnBuilder.appendFrom(nullRow, columnType.typeId)
+      }
+
+      val buffer = columnBuilder.build()
+
+      // For column type ID
+      assert(buffer.getInt() === columnType.typeId)
+      // For null count
+      assert(buffer.getInt() === 4)
+      // For null positions
+      (1 to 7 by 2).foreach(i => assert(buffer.getInt() === i))
+
+      // For non-null values
+      (0 until 4).foreach { _ =>
+        val actual = if (columnType == GENERIC) {
+          SparkSqlSerializer.deserialize[Any](GENERIC.extract(buffer))
+        } else {
+          columnType.extract(buffer)
+        }
+        assert(actual === nonNullRandomRow(columnType.typeId))
+      }
+
+      assert(!buffer.hasRemaining)
+    }
+  }
+}