Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/27 02:34:04 UTC

[1/2] spark git commit: [SPARK-12854][SQL] Implement complex types support in ColumnarBatch

Repository: spark
Updated Branches:
  refs/heads/master 1dac964c1 -> 555127387


http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
index 3f9ecf6..1a4b3ec 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/sources/hadoopFsRelationSuites.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.sources
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.fs.Path
@@ -122,7 +123,7 @@ abstract class HadoopFsRelationTest extends QueryTest with SQLTestUtils with Tes
         val dataGenerator = RandomDataGenerator.forType(
           dataType = dataType,
           nullable = true,
-          seed = Some(System.nanoTime())
+          new Random(System.nanoTime())
         ).getOrElse {
           fail(s"Failed to create data generator for schema $dataType")
         }

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
----------------------------------------------------------------------
diff --git a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
index 0d6b215..b29bf6a 100644
--- a/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
+++ b/unsafe/src/main/java/org/apache/spark/unsafe/Platform.java
@@ -105,6 +105,17 @@ public final class Platform {
     _UNSAFE.freeMemory(address);
   }
 
+  public static long reallocateMemory(long address, long oldSize, long newSize) {
+    long newMemory = _UNSAFE.allocateMemory(newSize);
+    copyMemory(null, address, null, newMemory, oldSize);
+    freeMemory(address);
+    return newMemory;
+  }
+
+  public static void setMemory(long address, byte value, long size) {
+    _UNSAFE.setMemory(address, size, value);
+  }
+
   public static void copyMemory(
     Object src, long srcOffset, Object dst, long dstOffset, long length) {
     // Check if dstOffset is before or after srcOffset to determine if we should copy


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-12854][SQL] Implement complex types support in ColumnarBatch

Posted by rx...@apache.org.
[SPARK-12854][SQL] Implement complex types support in ColumnarBatch

This patch adds support for complex types in ColumnarBatch. ColumnarBatch supports structs
and arrays, and there is a simple mapping from the richer Catalyst types to these two. Strings
are treated as an array of bytes.

ColumnarBatch contains a column for each node of the schema. Non-complex schemas consist
of just leaf nodes. Structs are internal nodes with one child per field. Arrays are internal
nodes with one child. Structs only carry nullability; arrays carry offsets and lengths into
the child array. This structure handles arbitrary nesting and has the key property that the
layout stays columnar throughout: primitive values are only stored in the leaf nodes,
contiguous across rows. For example, if the schema is
```
array<array<int>>
```
there are three columns: the two internal nodes each have one child, and the leaf node holds all the int data stored consecutively (see the sketch below).
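A rough sketch of that layout, assuming only the API added in this patch (ColumnVector.allocate,
arrayData, appendArray, appendInt); the capacity and row values are illustrative, not taken from
the patch itself:
```
// Assumes org.apache.spark.sql.types.DataTypes and org.apache.spark.memory.MemoryMode.
ColumnVector outer = ColumnVector.allocate(
    1024,
    DataTypes.createArrayType(DataTypes.createArrayType(DataTypes.IntegerType)),
    MemoryMode.ON_HEAP);
ColumnVector inner = outer.arrayData();  // the array<int> node
ColumnVector ints = inner.arrayData();   // the int leaf, contiguous across all rows

// Append the single row [[1, 2], [3]].
outer.appendArray(2);    // outer entry covers the next 2 inner arrays
inner.appendArray(2);    // first inner array holds 2 ints
ints.appendInt(1);
ints.appendInt(2);
inner.appendArray(1);    // second inner array holds 1 int
ints.appendInt(3);
```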

As part of this, the patch adds append APIs in addition to the put APIs (e.g. putLong(rowid, v)
vs appendLong(v)). The append APIs are needed when the batch contains variable-length elements:
the vectors are not fixed length and grow as necessary, which makes usage much simpler for the
writer.
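A minimal sketch of the two write paths, again only illustrative (capacities and values are made
up; the API names come from this patch):
```
// Put path: row count known up front, so reserve once and write by rowId.
ColumnVector longs = ColumnVector.allocate(1024, DataTypes.LongType, MemoryMode.ON_HEAP);
longs.reserve(1024);
for (int rowId = 0; rowId < 1024; rowId++) {
  longs.putLong(rowId, rowId * 2L);
}

// Append path: sizes not known up front; the vector grows as needed.
ColumnVector strings = ColumnVector.allocate(16, DataTypes.StringType, MemoryMode.ON_HEAP);
byte[] bytes = "hello".getBytes();
int firstRow = strings.appendByteArray(bytes, 0, bytes.length);  // returns the appended rowId
```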

Author: Nong Li <no...@databricks.com>

Closes #10820 from nongli/spark-12854.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/55512738
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/55512738
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/55512738

Branch: refs/heads/master
Commit: 555127387accdd7c1cf236912941822ba8af0a52
Parents: 1dac964
Author: Nong Li <no...@databricks.com>
Authored: Tue Jan 26 17:34:01 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jan 26 17:34:01 2016 -0800

----------------------------------------------------------------------
 .../sql/catalyst/expressions/UnsafeRow.java     |   7 +-
 .../expressions/SpecificMutableRow.scala        |   3 +-
 .../apache/spark/sql/RandomDataGenerator.scala  |  94 ++-
 .../spark/sql/RandomDataGeneratorSuite.scala    |   4 +-
 .../codegen/GenerateUnsafeRowJoinerSuite.scala  |   5 +-
 .../sql/execution/vectorized/ColumnVector.java  | 630 ++++++++++++++++++-
 .../execution/vectorized/ColumnVectorUtils.java | 126 ++++
 .../sql/execution/vectorized/ColumnarBatch.java |  70 ++-
 .../vectorized/OffHeapColumnVector.java         | 157 ++++-
 .../vectorized/OnHeapColumnVector.java          | 169 ++++-
 .../execution/UnsafeKVExternalSorterSuite.scala |   4 +-
 .../vectorized/ColumnarBatchBenchmark.scala     |  78 ++-
 .../vectorized/ColumnarBatchSuite.scala         | 397 +++++++++++-
 .../hive/execution/AggregationQuerySuite.scala  |   3 +-
 .../sql/sources/hadoopFsRelationSuites.scala    |   3 +-
 .../java/org/apache/spark/unsafe/Platform.java  |  11 +
 16 files changed, 1671 insertions(+), 90 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
index 1a35193..a88bcbf 100644
--- a/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
+++ b/sql/catalyst/src/main/java/org/apache/spark/sql/catalyst/expressions/UnsafeRow.java
@@ -68,6 +68,10 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
     return ((numFields + 63)/ 64) * 8;
   }
 
+  public static int calculateFixedPortionByteSize(int numFields) {
+    return 8 * numFields + calculateBitSetWidthInBytes(numFields);
+  }
+
   /**
    * Field types that can be updated in place in UnsafeRows (e.g. we support set() for these types)
    */
@@ -596,10 +600,9 @@ public final class UnsafeRow extends MutableRow implements Externalizable, KryoS
   public String toString() {
     StringBuilder build = new StringBuilder("[");
     for (int i = 0; i < sizeInBytes; i += 8) {
+      if (i != 0) build.append(',');
       build.append(java.lang.Long.toHexString(Platform.getLong(baseObject, baseOffset + i)));
-      build.append(',');
     }
-    build.deleteCharAt(build.length() - 1);
     build.append(']');
     return build.toString();
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
index 475cbe0..4615c55 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/SpecificMutableRow.scala
@@ -19,7 +19,6 @@ package org.apache.spark.sql.catalyst.expressions
 
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types._
-import org.apache.spark.unsafe.types.UTF8String
 
 /**
  * A parent class for mutable container objects that are reused when the values are changed,
@@ -212,6 +211,8 @@ final class SpecificMutableRow(val values: Array[MutableValue])
 
   def this() = this(Seq.empty)
 
+  def this(schema: StructType) = this(schema.fields.map(_.dataType))
+
   override def numFields: Int = values.length
 
   override def setNullAt(i: Int): Unit = {

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
index 7614f05..55efea8 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGenerator.scala
@@ -21,6 +21,7 @@ import java.lang.Double.longBitsToDouble
 import java.lang.Float.intBitsToFloat
 import java.math.MathContext
 
+import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
@@ -74,14 +75,48 @@ object RandomDataGenerator {
    * @param numFields the number of fields in this schema
    * @param acceptedTypes types to draw from.
    */
-  def randomSchema(numFields: Int, acceptedTypes: Seq[DataType]): StructType = {
+  def randomSchema(rand: Random, numFields: Int, acceptedTypes: Seq[DataType]): StructType = {
     StructType(Seq.tabulate(numFields) { i =>
-      val dt = acceptedTypes(Random.nextInt(acceptedTypes.size))
-      StructField("col_" + i, dt, nullable = true)
+      val dt = acceptedTypes(rand.nextInt(acceptedTypes.size))
+      StructField("col_" + i, dt, nullable = rand.nextBoolean())
     })
   }
 
   /**
+   * Returns a random nested schema. This will randomly generate structs and arrays drawn from
+   * acceptedTypes.
+   */
+  def randomNestedSchema(rand: Random, totalFields: Int, acceptedTypes: Seq[DataType]):
+      StructType = {
+    val fields = mutable.ArrayBuffer.empty[StructField]
+    var i = 0
+    var numFields = totalFields
+    while (numFields > 0) {
+      val v = rand.nextInt(3)
+      if (v == 0) {
+        // Simple type:
+        val dt = acceptedTypes(rand.nextInt(acceptedTypes.size))
+        fields += new StructField("col_" + i, dt, rand.nextBoolean())
+        numFields -= 1
+      } else if (v == 1) {
+        // Array
+        val dt = acceptedTypes(rand.nextInt(acceptedTypes.size))
+        fields += new StructField("col_" + i, ArrayType(dt), rand.nextBoolean())
+        numFields -= 1
+      } else {
+        // Struct
+        // TODO: do empty structs make sense?
+        val n = Math.max(rand.nextInt(numFields), 1)
+        val nested = randomNestedSchema(rand, n, acceptedTypes)
+        fields += new StructField("col_" + i, nested, rand.nextBoolean())
+        numFields -= n
+      }
+      i += 1
+    }
+    StructType(fields)
+  }
+
+  /**
    * Returns a function which generates random values for the given [[DataType]], or `None` if no
    * random data generator is defined for that data type. The generated values will use an external
    * representation of the data type; for example, the random generator for [[DateType]] will return
@@ -90,16 +125,13 @@ object RandomDataGenerator {
    *
    * @param dataType the type to generate values for
    * @param nullable whether null values should be generated
-   * @param seed an optional seed for the random number generator
+   * @param rand an optional random number generator
    * @return a function which can be called to generate random values.
    */
   def forType(
       dataType: DataType,
       nullable: Boolean = true,
-      seed: Option[Long] = None): Option[() => Any] = {
-    val rand = new Random()
-    seed.foreach(rand.setSeed)
-
+      rand: Random = new Random): Option[() => Any] = {
     val valueGenerator: Option[() => Any] = dataType match {
       case StringType => Some(() => rand.nextString(rand.nextInt(MAX_STR_LEN)))
       case BinaryType => Some(() => {
@@ -165,15 +197,15 @@ object RandomDataGenerator {
         rand, _.nextInt().toShort, Seq(Short.MinValue, Short.MaxValue, 0.toShort))
       case NullType => Some(() => null)
       case ArrayType(elementType, containsNull) => {
-        forType(elementType, nullable = containsNull, seed = Some(rand.nextLong())).map {
+        forType(elementType, nullable = containsNull, rand).map {
           elementGenerator => () => Seq.fill(rand.nextInt(MAX_ARR_SIZE))(elementGenerator())
         }
       }
       case MapType(keyType, valueType, valueContainsNull) => {
         for (
-          keyGenerator <- forType(keyType, nullable = false, seed = Some(rand.nextLong()));
+          keyGenerator <- forType(keyType, nullable = false, rand);
           valueGenerator <-
-            forType(valueType, nullable = valueContainsNull, seed = Some(rand.nextLong()))
+            forType(valueType, nullable = valueContainsNull, rand)
         ) yield {
           () => {
             Seq.fill(rand.nextInt(MAX_MAP_SIZE))((keyGenerator(), valueGenerator())).toMap
@@ -182,7 +214,7 @@ object RandomDataGenerator {
       }
       case StructType(fields) => {
         val maybeFieldGenerators: Seq[Option[() => Any]] = fields.map { field =>
-          forType(field.dataType, nullable = field.nullable, seed = Some(rand.nextLong()))
+          forType(field.dataType, nullable = field.nullable, rand)
         }
         if (maybeFieldGenerators.forall(_.isDefined)) {
           val fieldGenerators: Seq[() => Any] = maybeFieldGenerators.map(_.get)
@@ -192,7 +224,7 @@ object RandomDataGenerator {
         }
       }
       case udt: UserDefinedType[_] => {
-        val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, seed)
+        val maybeSqlTypeGenerator = forType(udt.sqlType, nullable, rand)
         // Because random data generator at here returns scala value, we need to
         // convert it to catalyst value to call udt's deserialize.
         val toCatalystType = CatalystTypeConverters.createToCatalystConverter(udt.sqlType)
@@ -229,4 +261,40 @@ object RandomDataGenerator {
       }
     }
   }
+
+  // Generates a random row for `schema`.
+  def randomRow(rand: Random, schema: StructType): Row = {
+    val fields = mutable.ArrayBuffer.empty[Any]
+    schema.fields.foreach { f =>
+      f.dataType match {
+        case ArrayType(childType, nullable) => {
+          val data = if (f.nullable && rand.nextFloat() <= PROBABILITY_OF_NULL) {
+            null
+          } else {
+            val arr = mutable.ArrayBuffer.empty[Any]
+            val n = 1// rand.nextInt(10)
+            var i = 0
+            val generator = RandomDataGenerator.forType(childType, nullable, rand)
+            assert(generator.isDefined, "Unsupported type")
+            val gen = generator.get
+            while (i < n) {
+              arr += gen()
+              i += 1
+            }
+            arr
+          }
+          fields += data
+        }
+        case StructType(children) => {
+          fields += randomRow(rand, StructType(children))
+        }
+        case _ =>
+          val generator = RandomDataGenerator.forType(f.dataType, f.nullable, rand)
+          assert(generator.isDefined, "Unsupported type")
+          val gen = generator.get
+          fields += gen()
+      }
+    }
+    Row.fromSeq(fields)
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
index cccac7e..b8ccdf7 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/RandomDataGeneratorSuite.scala
@@ -17,6 +17,8 @@
 
 package org.apache.spark.sql
 
+import scala.util.Random
+
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.sql.catalyst.CatalystTypeConverters
 import org.apache.spark.sql.types._
@@ -32,7 +34,7 @@ class RandomDataGeneratorSuite extends SparkFunSuite {
    */
   def testRandomDataGeneration(dataType: DataType, nullable: Boolean = true): Unit = {
     val toCatalyst = CatalystTypeConverters.createToCatalystConverter(dataType)
-    val generator = RandomDataGenerator.forType(dataType, nullable, Some(33)).getOrElse {
+    val generator = RandomDataGenerator.forType(dataType, nullable, new Random(33)).getOrElse {
       fail(s"Random data generator was not defined for $dataType")
     }
     if (nullable) {

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
index 59729e7..9f19745 100644
--- a/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
+++ b/sql/catalyst/src/test/scala/org/apache/spark/sql/catalyst/expressions/codegen/GenerateUnsafeRowJoinerSuite.scala
@@ -74,8 +74,9 @@ class GenerateUnsafeRowJoinerSuite extends SparkFunSuite {
 
   private def testConcatOnce(numFields1: Int, numFields2: Int, candidateTypes: Seq[DataType]) {
     info(s"schema size $numFields1, $numFields2")
-    val schema1 = RandomDataGenerator.randomSchema(numFields1, candidateTypes)
-    val schema2 = RandomDataGenerator.randomSchema(numFields2, candidateTypes)
+    val random = new Random()
+    val schema1 = RandomDataGenerator.randomSchema(random, numFields1, candidateTypes)
+    val schema2 = RandomDataGenerator.randomSchema(random, numFields2, candidateTypes)
 
     // Create the converters needed to convert from external row to internal row and to UnsafeRows.
     val internalConverter1 = CatalystTypeConverters.createToCatalystConverter(schema1)

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 8550975..c119758 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -17,22 +17,45 @@
 package org.apache.spark.sql.execution.vectorized;
 
 import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.apache.commons.lang.NotImplementedException;
 
 /**
  * This class represents a column of values and provides the main APIs to access the data
  * values. It supports all the types and contains get/put APIs as well as their batched versions.
  * The batched versions are preferable whenever possible.
  *
- * Most of the APIs take the rowId as a parameter. This is the local 0-based row id for values
+ * To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
+ * columns have child columns. All of the data is stored in the child columns and the parent column
+ * contains nullability, and in the case of Arrays, the lengths and offsets into the child column.
+ * Lengths and offsets are encoded identically to INTs.
+ * Maps are just a special case of a two field struct.
+ * Strings are handled as an Array of ByteType.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is enough room before adding
+ * elements. This means that the put() APIs do not check for this, as in common cases
+ * (i.e. flat schemas) the lengths are known up front.
+ *
+ * Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
  * in the current RowBatch.
  *
  * A ColumnVector should be considered immutable once originally created. In other words, it is not
  * valid to call put APIs after reads until reset() is called.
+ *
+ * ColumnVectors are intended to be reused.
  */
 public abstract class ColumnVector {
   /**
-   * Allocates a column with each element of size `width` either on or off heap.
+   * Allocates a column to store elements of `type` on or off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
    */
   public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
     if (mode == MemoryMode.OFF_HEAP) {
@@ -42,13 +65,265 @@ public abstract class ColumnVector {
     }
   }
 
+  /**
+   * Holder object to return an array. This object is intended to be reused. Callers should
+   * copy the data out if it needs to be stored.
+   */
+  public static final class Array extends ArrayData {
+    // The data for this array. This array contains elements from
+    // data[offset] to data[offset + length).
+    public final ColumnVector data;
+    public int length;
+    public int offset;
+
+    // Populated if binary data is required for the Array. This is stored here as an optimization
+    // for string data.
+    public byte[] byteArray;
+    public int byteArrayOffset;
+
+    // Reused staging buffer, used for loading from offheap.
+    protected byte[] tmpByteArray = new byte[1];
+
+    protected Array(ColumnVector data) {
+      this.data = data;
+    }
+
+    @Override
+    public final int numElements() { return length; }
+
+    @Override
+    public ArrayData copy() {
+      throw new NotImplementedException();
+    }
+
+    // TODO: this is extremely expensive.
+    @Override
+    public Object[] array() {
+      DataType dt = data.dataType();
+      Object[] list = new Object[length];
+
+      if (dt instanceof ByteType) {
+        for (int i = 0; i < length; i++) {
+          if (!data.getIsNull(offset + i)) {
+            list[i] = data.getByte(offset + i);
+          }
+        }
+      } else if (dt instanceof IntegerType) {
+        for (int i = 0; i < length; i++) {
+          if (!data.getIsNull(offset + i)) {
+            list[i] = data.getInt(offset + i);
+          }
+        }
+      } else if (dt instanceof DoubleType) {
+        for (int i = 0; i < length; i++) {
+          if (!data.getIsNull(offset + i)) {
+            list[i] = data.getDouble(offset + i);
+          }
+        }
+      } else if (dt instanceof LongType) {
+        for (int i = 0; i < length; i++) {
+          if (!data.getIsNull(offset + i)) {
+            list[i] = data.getLong(offset + i);
+          }
+        }
+      } else if (dt instanceof StringType) {
+        for (int i = 0; i < length; i++) {
+          if (!data.getIsNull(offset + i)) {
+            list[i] = ColumnVectorUtils.toString(data.getByteArray(offset + i));
+          }
+        }
+      } else {
+        throw new NotImplementedException("Type " + dt);
+      }
+      return list;
+    }
+
+    @Override
+    public final boolean isNullAt(int ordinal) { return data.getIsNull(offset + ordinal); }
+
+    @Override
+    public final boolean getBoolean(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public byte getByte(int ordinal) { return data.getByte(offset + ordinal); }
+
+    @Override
+    public short getShort(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public int getInt(int ordinal) { return data.getInt(offset + ordinal); }
+
+    @Override
+    public long getLong(int ordinal) { return data.getLong(offset + ordinal); }
+
+    @Override
+    public float getFloat(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public double getDouble(int ordinal) { return data.getDouble(offset + ordinal); }
+
+    @Override
+    public Decimal getDecimal(int ordinal, int precision, int scale) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public UTF8String getUTF8String(int ordinal) {
+      Array child = data.getByteArray(offset + ordinal);
+      return UTF8String.fromBytes(child.byteArray, child.byteArrayOffset, child.length);
+    }
+
+    @Override
+    public byte[] getBinary(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public CalendarInterval getInterval(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public InternalRow getStruct(int ordinal, int numFields) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public ArrayData getArray(int ordinal) {
+      return data.getArray(offset + ordinal);
+    }
+
+    @Override
+    public MapData getMap(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public Object get(int ordinal, DataType dataType) {
+      throw new NotImplementedException();
+    }
+  }
+
+  /**
+   * Holder object to return a struct. This object is intended to be reused.
+   */
+  public static final class Struct extends InternalRow {
+    // The fields that make up this struct. For example, if the struct had 2 int fields, the access
+    // to it would be:
+    //   int f1 = fields[0].getInt(rowId)
+    //   int f2 = fields[1].getInt(rowId)
+    public final ColumnVector[] fields;
+
+    @Override
+    public boolean isNullAt(int fieldIdx) { return fields[fieldIdx].getIsNull(rowId); }
+
+    @Override
+    public boolean getBoolean(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    public byte getByte(int fieldIdx) { return fields[fieldIdx].getByte(rowId); }
+
+    @Override
+    public short getShort(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    public int getInt(int fieldIdx) { return fields[fieldIdx].getInt(rowId); }
+    public long getLong(int fieldIdx) { return fields[fieldIdx].getLong(rowId); }
+
+    @Override
+    public float getFloat(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    public double getDouble(int fieldIdx) { return fields[fieldIdx].getDouble(rowId); }
+
+    @Override
+    public Decimal getDecimal(int ordinal, int precision, int scale) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public UTF8String getUTF8String(int ordinal) {
+      Array a = getByteArray(ordinal);
+      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
+    }
+
+    @Override
+    public byte[] getBinary(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public CalendarInterval getInterval(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public InternalRow getStruct(int ordinal, int numFields) {
+      return fields[ordinal].getStruct(rowId);
+    }
+
+    public Array getArray(int fieldIdx) { return fields[fieldIdx].getArray(rowId); }
+
+    @Override
+    public MapData getMap(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public Object get(int ordinal, DataType dataType) {
+      throw new NotImplementedException();
+    }
+
+    public Array getByteArray(int fieldIdx) { return fields[fieldIdx].getByteArray(rowId); }
+    public Struct getStruct(int fieldIdx) { return fields[fieldIdx].getStruct(rowId); }
+
+    @Override
+    public final int numFields() {
+      return fields.length;
+    }
+
+    @Override
+    public InternalRow copy() {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public boolean anyNull() {
+      throw new NotImplementedException();
+    }
+
+    protected int rowId;
+
+    protected Struct(ColumnVector[] fields) {
+      this.fields = fields;
+    }
+  }
+
+  /**
+   * Returns the data type of this column.
+   */
   public final DataType dataType() { return type; }
 
   /**
    * Resets this column for writing. The currently stored values are no longer accessible.
    */
   public void reset() {
+    if (childColumns != null) {
+      for (ColumnVector c: childColumns) {
+        c.reset();
+      }
+    }
     numNulls = 0;
+    elementsAppended = 0;
     if (anyNullsSet) {
       putNotNulls(0, capacity);
       anyNullsSet = false;
@@ -61,6 +336,12 @@ public abstract class ColumnVector {
    */
   public abstract void close();
 
+  /*
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
+   */
+  public abstract void reserve(int capacity);
+
   /**
    * Returns the number of nulls in this column.
    */
@@ -99,6 +380,26 @@ public abstract class ColumnVector {
   /**
    * Sets the value at rowId to `value`.
    */
+  public abstract void putByte(int rowId, byte value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBytes(int rowId, int count, byte value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Returns the value for rowId.
+   */
+  public abstract byte getByte(int rowId);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
   public abstract void putInt(int rowId, int value);
 
   /**
@@ -118,13 +419,39 @@ public abstract class ColumnVector {
   public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
 
   /**
-   * Returns the integer for rowId.
+   * Returns the value for rowId.
    */
   public abstract int getInt(int rowId);
 
   /**
    * Sets the value at rowId to `value`.
    */
+  public abstract void putLong(int rowId, long value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putLongs(int rowId, int count, long value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be 8-byte little endian longs.
+   */
+  public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Returns the value for rowId.
+   */
+  public abstract long getLong(int rowId);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
   public abstract void putDouble(int rowId, double value);
 
   /**
@@ -145,14 +472,248 @@ public abstract class ColumnVector {
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
 
   /**
-   * Returns the double for rowId.
+   * Returns the value for rowId.
    */
   public abstract double getDouble(int rowId);
 
   /**
+   * Puts a byte array that already exists in this column.
+   */
+  public abstract void putArray(int rowId, int offset, int length);
+
+  /**
+   * Returns the length of the array at rowid.
+   */
+  public abstract int getArrayLength(int rowId);
+
+  /**
+   * Returns the offset of the array at rowid.
+   */
+  public abstract int getArrayOffset(int rowId);
+
+  /**
+   * Returns a utility object to get structs.
+   */
+  public Struct getStruct(int rowId) {
+    resultStruct.rowId = rowId;
+    return resultStruct;
+  }
+
+  /**
+   * Returns the array at rowid.
+   */
+  public final Array getArray(int rowId) {
+    resultArray.length = getArrayLength(rowId);
+    resultArray.offset = getArrayOffset(rowId);
+    return resultArray;
+  }
+
+  /**
+   * Loads the data into array.byteArray.
+   */
+  public abstract void loadBytes(Array array);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
+  public final int putByteArray(int rowId, byte[] value) {
+    return putByteArray(rowId, value, 0, value.length);
+  }
+
+  /**
+   * Returns the value for rowId.
+   */
+  public final Array getByteArray(int rowId) {
+    Array array = getArray(rowId);
+    array.data.loadBytes(array);
+    return array;
+  }
+
+  /**
+   * Append APIs. These APIs all behave similarly and will append data to the current vector.  It
+   * is not valid to mix the put and append APIs. The append APIs are slower and should only be
+   * used if the sizes are not known up front.
+   * In all these cases, the return value is the rowId for the first appended element.
+   */
+  public final int appendNull() {
+    assert (!(dataType() instanceof StructType)); // Use appendStruct()
+    reserve(elementsAppended + 1);
+    putNull(elementsAppended);
+    return elementsAppended++;
+  }
+
+  public final int appendNotNull() {
+    reserve(elementsAppended + 1);
+    putNotNull(elementsAppended);
+    return elementsAppended++;
+  }
+
+  public final int appendNulls(int count) {
+    assert (!(dataType() instanceof StructType));
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putNulls(elementsAppended, count);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendNotNulls(int count) {
+    assert (!(dataType() instanceof StructType));
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putNotNulls(elementsAppended, count);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendByte(byte v) {
+    reserve(elementsAppended + 1);
+    putByte(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendBytes(int count, byte v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putBytes(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendBytes(int length, byte[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putBytes(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendInt(int v) {
+    reserve(elementsAppended + 1);
+    putInt(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendInts(int count, int v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putInts(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendInts(int length, int[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putInts(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendLong(long v) {
+    reserve(elementsAppended + 1);
+    putLong(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendLongs(int count, long v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putLongs(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendLongs(int length, long[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putLongs(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendDouble(double v) {
+    reserve(elementsAppended + 1);
+    putDouble(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendDoubles(int count, double v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putDoubles(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendDoubles(int length, double[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putDoubles(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendByteArray(byte[] value, int offset, int length) {
+    int copiedOffset = arrayData().appendBytes(length, value, offset);
+    reserve(elementsAppended + 1);
+    putArray(elementsAppended, copiedOffset, length);
+    return elementsAppended++;
+  }
+
+  public final int appendArray(int length) {
+    reserve(elementsAppended + 1);
+    putArray(elementsAppended, arrayData().elementsAppended, length);
+    return elementsAppended++;
+  }
+
+  /**
+   * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this
+   * recursively appends a NULL to its children.
+   * This logic is not part of the general appendNull() implementation in order to keep the more
+   * common non-struct case fast.
+   */
+  public final int appendStruct(boolean isNull) {
+    if (isNull) {
+      appendNull();
+      for (ColumnVector c: childColumns) {
+        if (c.type instanceof StructType) {
+          c.appendStruct(true);
+        } else {
+          c.appendNull();
+        }
+      }
+    } else {
+      appendNotNull();
+    }
+    return elementsAppended;
+  }
+
+  /**
+   * Returns the data for the underlying array.
+   */
+  public final ColumnVector arrayData() { return childColumns[0]; }
+
+  /**
+   * Returns the ordinal's child data column.
+   */
+  public final ColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
+
+  /**
+   * Returns the elements appended.
+   */
+  public int getElementsAppended() { return elementsAppended; }
+
+  /**
    * Maximum number of rows that can be stored in this column.
    */
-  protected final int capacity;
+  protected int capacity;
+
+  /**
+   * Data type for this column.
+   */
+  protected final DataType type;
 
   /**
    * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
@@ -166,12 +727,63 @@ public abstract class ColumnVector {
   protected boolean anyNullsSet;
 
   /**
-   * Data type for this column.
+   * Default size of each array length value. This grows as necessary.
    */
-  protected final DataType type;
+  protected static final int DEFAULT_ARRAY_LENGTH = 4;
+
+  /**
+   * Current write cursor (row index) when appending data.
+   */
+  protected int elementsAppended;
 
-  protected ColumnVector(int capacity, DataType type) {
+  /**
+   * If this is a nested type (array or struct), the column for the child data.
+   */
+  protected final ColumnVector[] childColumns;
+
+  /**
+   * Reusable Array holder for getArray().
+   */
+  protected final Array resultArray;
+
+  /**
+   * Reusable Struct holder for getStruct().
+   */
+  protected final Struct resultStruct;
+
+  /**
+   * Sets up the common state and also handles creating the child columns if this is a nested
+   * type.
+   */
+  protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
     this.capacity = capacity;
     this.type = type;
+
+    if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType) {
+      DataType childType;
+      int childCapacity = capacity;
+      if (type instanceof ArrayType) {
+        childType = ((ArrayType)type).elementType();
+      } else {
+        childType = DataTypes.ByteType;
+        childCapacity *= DEFAULT_ARRAY_LENGTH;
+      }
+      this.childColumns = new ColumnVector[1];
+      this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode);
+      this.resultArray = new Array(this.childColumns[0]);
+      this.resultStruct = null;
+    } else if (type instanceof StructType) {
+      StructType st = (StructType)type;
+      this.childColumns = new ColumnVector[st.fields().length];
+      for (int i = 0; i < childColumns.length; ++i) {
+        this.childColumns[i] = ColumnVector.allocate(capacity, st.fields()[i].dataType(), memMode);
+      }
+      this.resultArray = null;
+      this.resultStruct = new Struct(this.childColumns);
+    } else {
+      this.childColumns = null;
+      this.resultArray = null;
+      this.resultStruct = null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
new file mode 100644
index 0000000..6c651a7
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Iterator;
+import java.util.List;
+
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.Row;
+import org.apache.spark.sql.types.*;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * Utilities to help manipulate data associated with ColumnVectors. These should be used mostly
+ * for debugging or other non-performance critical paths.
+ * These utilities are mostly used to convert ColumnVectors into other formats.
+ */
+public class ColumnVectorUtils {
+  public static String toString(ColumnVector.Array a) {
+    return new String(a.byteArray, a.byteArrayOffset, a.length);
+  }
+
+  /**
+   * Returns the array data as the java primitive array.
+   * For example, an array of IntegerType will return an int[].
+   * Throws exceptions for unhandled schemas.
+   */
+  public static Object toPrimitiveJavaArray(ColumnVector.Array array) {
+    DataType dt = array.data.dataType();
+    if (dt instanceof IntegerType) {
+      int[] result = new int[array.length];
+      ColumnVector data = array.data;
+      for (int i = 0; i < result.length; i++) {
+        if (data.getIsNull(array.offset + i)) {
+          throw new RuntimeException("Cannot handle NULL values.");
+        }
+        result[i] = data.getInt(array.offset + i);
+      }
+      return result;
+    } else {
+      throw new NotImplementedException();
+    }
+  }
+
+  private static void appendValue(ColumnVector dst, DataType t, Object o) {
+    if (o == null) {
+      dst.appendNull();
+    } else {
+      if (t == DataTypes.ByteType) {
+        dst.appendByte(((Byte)o).byteValue());
+      } else if (t == DataTypes.IntegerType) {
+        dst.appendInt(((Integer)o).intValue());
+      } else if (t == DataTypes.LongType) {
+        dst.appendLong(((Long)o).longValue());
+      } else if (t == DataTypes.DoubleType) {
+        dst.appendDouble(((Double)o).doubleValue());
+      } else if (t == DataTypes.StringType) {
+        byte[] b =((String)o).getBytes();
+        dst.appendByteArray(b, 0, b.length);
+      } else {
+        throw new NotImplementedException("Type " + t);
+      }
+    }
+  }
+
+  private static void appendValue(ColumnVector dst, DataType t, Row src, int fieldIdx) {
+    if (t instanceof ArrayType) {
+      ArrayType at = (ArrayType)t;
+      if (src.isNullAt(fieldIdx)) {
+        dst.appendNull();
+      } else {
+        List<Object> values = src.getList(fieldIdx);
+        dst.appendArray(values.size());
+        for (Object o : values) {
+          appendValue(dst.arrayData(), at.elementType(), o);
+        }
+      }
+    } else if (t instanceof StructType) {
+      StructType st = (StructType)t;
+      if (src.isNullAt(fieldIdx)) {
+        dst.appendStruct(true);
+      } else {
+        dst.appendStruct(false);
+        Row c = src.getStruct(fieldIdx);
+        for (int i = 0; i < st.fields().length; i++) {
+          appendValue(dst.getChildColumn(i), st.fields()[i].dataType(), c, i);
+        }
+      }
+    } else {
+      appendValue(dst, t, src.get(fieldIdx));
+    }
+  }
+
+  /**
+   * Converts an iterator of rows into a single ColumnarBatch.
+   */
+  public static ColumnarBatch toBatch(
+      StructType schema, MemoryMode memMode, Iterator<Row> row) {
+    ColumnarBatch batch = ColumnarBatch.allocate(schema, memMode);
+    int n = 0;
+    while (row.hasNext()) {
+      Row r = row.next();
+      for (int i = 0; i < schema.fields().length; i++) {
+        appendValue(batch.column(i), schema.fields()[i].dataType(), r, i);
+      }
+      n++;
+    }
+    batch.setNumRows(n);
+    return batch;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 2c55f85..d558dae 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -21,12 +21,10 @@ import java.util.Iterator;
 
 import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.Decimal;
-import org.apache.spark.sql.types.StructField;
-import org.apache.spark.sql.types.StructType;
+import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
@@ -48,6 +46,7 @@ import org.apache.commons.lang.NotImplementedException;
  */
 public final class ColumnarBatch {
   private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
+  private static MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
 
   private final StructType schema;
   private final int capacity;
@@ -64,6 +63,10 @@ public final class ColumnarBatch {
     return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
   }
 
+  public static ColumnarBatch allocate(StructType type) {
+    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
+  }
+
   public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
     return new ColumnarBatch(schema, maxRows, memMode);
   }
@@ -82,25 +85,53 @@ public final class ColumnarBatch {
    * Adapter class to interop with existing components that expect internal row. A lot of
    * performance is lost with this translation.
    */
-  public final class Row extends InternalRow {
+  public static final class Row extends InternalRow {
     private int rowId;
+    private final ColumnarBatch parent;
+    private final int fixedLenRowSize;
+
+    private Row(ColumnarBatch parent) {
+      this.parent = parent;
+      this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols());
+    }
 
     /**
      * Marks this row as being filtered out. This means a subsequent iteration over the rows
      * in this batch will not include this row.
      */
     public final void markFiltered() {
-      ColumnarBatch.this.markFiltered(rowId);
+      parent.markFiltered(rowId);
     }
 
     @Override
     public final int numFields() {
-      return ColumnarBatch.this.numCols();
+      return parent.numCols();
     }
 
     @Override
+    /**
+     * Revisit this. This is expensive.
+     */
     public final InternalRow copy() {
-      throw new NotImplementedException();
+      UnsafeRow row = new UnsafeRow(parent.numCols());
+      row.pointTo(new byte[fixedLenRowSize], fixedLenRowSize);
+      for (int i = 0; i < parent.numCols(); i++) {
+        if (isNullAt(i)) {
+          row.setNullAt(i);
+        } else {
+          DataType dt = parent.schema.fields()[i].dataType();
+          if (dt instanceof IntegerType) {
+            row.setInt(i, getInt(i));
+          } else if (dt instanceof LongType) {
+            row.setLong(i, getLong(i));
+          } else if (dt instanceof DoubleType) {
+            row.setDouble(i, getDouble(i));
+          } else {
+            throw new RuntimeException("Not implemented.");
+          }
+        }
+      }
+      return row;
     }
 
     @Override
@@ -110,7 +141,7 @@ public final class ColumnarBatch {
 
     @Override
     public final boolean isNullAt(int ordinal) {
-      return ColumnarBatch.this.column(ordinal).getIsNull(rowId);
+      return parent.column(ordinal).getIsNull(rowId);
     }
 
     @Override
@@ -119,9 +150,7 @@ public final class ColumnarBatch {
     }
 
     @Override
-    public final byte getByte(int ordinal) {
-      throw new NotImplementedException();
-    }
+    public final byte getByte(int ordinal) { return parent.column(ordinal).getByte(rowId); }
 
     @Override
     public final short getShort(int ordinal) {
@@ -130,13 +159,11 @@ public final class ColumnarBatch {
 
     @Override
     public final int getInt(int ordinal) {
-      return ColumnarBatch.this.column(ordinal).getInt(rowId);
+      return parent.column(ordinal).getInt(rowId);
     }
 
     @Override
-    public final long getLong(int ordinal) {
-      throw new NotImplementedException();
-    }
+    public final long getLong(int ordinal) { return parent.column(ordinal).getLong(rowId); }
 
     @Override
     public final float getFloat(int ordinal) {
@@ -145,7 +172,7 @@ public final class ColumnarBatch {
 
     @Override
     public final double getDouble(int ordinal) {
-      return ColumnarBatch.this.column(ordinal).getDouble(rowId);
+      return parent.column(ordinal).getDouble(rowId);
     }
 
     @Override
@@ -155,7 +182,8 @@ public final class ColumnarBatch {
 
     @Override
     public final UTF8String getUTF8String(int ordinal) {
-      throw new NotImplementedException();
+      ColumnVector.Array a = parent.column(ordinal).getByteArray(rowId);
+      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
     }
 
     @Override
@@ -170,12 +198,12 @@ public final class ColumnarBatch {
 
     @Override
     public final InternalRow getStruct(int ordinal, int numFields) {
-      throw new NotImplementedException();
+      return parent.column(ordinal).getStruct(rowId);
     }
 
     @Override
     public final ArrayData getArray(int ordinal) {
-      throw new NotImplementedException();
+      return parent.column(ordinal).getArray(rowId);
     }
 
     @Override
@@ -194,7 +222,7 @@ public final class ColumnarBatch {
    */
   public Iterator<Row> rowIterator() {
     final int maxRows = ColumnarBatch.this.numRows();
-    final Row row = new Row();
+    final Row row = new Row(this);
     return new Iterator<Row>() {
       int rowId = 0;
 

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 6180dd3..335124f 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -18,14 +18,20 @@ package org.apache.spark.sql.execution.vectorized;
 
 import java.nio.ByteOrder;
 
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.ByteType;
 import org.apache.spark.sql.types.DataType;
 import org.apache.spark.sql.types.DoubleType;
 import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.sql.types.LongType;
 import org.apache.spark.unsafe.Platform;
+import org.apache.spark.unsafe.types.UTF8String;
 
 
 import org.apache.commons.lang.NotImplementedException;
 
+import org.apache.commons.lang.NotImplementedException;
+
 /**
  * Column data backed using offheap memory.
  */
@@ -35,21 +41,21 @@ public final class OffHeapColumnVector extends ColumnVector {
   private long nulls;
   private long data;
 
+  // Set iff the type is array.
+  private long lengthData;
+  private long offsetData;
+
   protected OffHeapColumnVector(int capacity, DataType type) {
-    super(capacity, type);
+    super(capacity, type, MemoryMode.OFF_HEAP);
     if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
       throw new NotImplementedException("Only little endian is supported.");
     }
+    nulls = 0;
+    data = 0;
+    lengthData = 0;
+    offsetData = 0;
 
-    this.nulls = Platform.allocateMemory(capacity);
-    if (type instanceof IntegerType) {
-      this.data = Platform.allocateMemory(capacity * 4);
-    } else if (type instanceof DoubleType) {
-      this.data = Platform.allocateMemory(capacity * 8);
-    } else {
-      throw new RuntimeException("Unhandled " + type);
-    }
-    anyNullsSet = true;
+    reserveInternal(capacity);
     reset();
   }
 
@@ -67,8 +73,12 @@ public final class OffHeapColumnVector extends ColumnVector {
   public final void close() {
     Platform.freeMemory(nulls);
     Platform.freeMemory(data);
+    Platform.freeMemory(lengthData);
+    Platform.freeMemory(offsetData);
     nulls = 0;
     data = 0;
+    lengthData = 0;
+    offsetData = 0;
   }
 
   //
@@ -112,6 +122,33 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   //
+  // APIs dealing with Bytes
+  //
+
+  @Override
+  public final void putByte(int rowId, byte value) {
+    Platform.putByte(null, data + rowId, value);
+
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte value) {
+    for (int i = 0; i < count; ++i) {
+      Platform.putByte(null, data + rowId + i, value);
+    }
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, null, data + rowId, count);
+  }
+
+  @Override
+  public final byte getByte(int rowId) {
+    return Platform.getByte(null, data + rowId);
+  }
+
+  //
   // APIs dealing with ints
   //
 
@@ -146,6 +183,40 @@ public final class OffHeapColumnVector extends ColumnVector {
   }
 
   //
+  // APIs dealing with Longs
+  //
+
+  @Override
+  public final void putLong(int rowId, long value) {
+    Platform.putLong(null, data + 8 * rowId, value);
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long value) {
+    long offset = data + 8 * rowId;
+    for (int i = 0; i < count; ++i, offset += 8) {
+      Platform.putLong(null, offset, value);
+    }
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.LONG_ARRAY_OFFSET + srcIndex * 8,
+        null, data + 8 * rowId, count * 8);
+  }
+
+  @Override
+  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+        null, data + 8 * rowId, count * 8);
+  }
+
+  @Override
+  public final long getLong(int rowId) {
+    return Platform.getLong(null, data + 8 * rowId);
+  }
+
+  //
   // APIs dealing with doubles
   //
 
@@ -178,4 +249,70 @@ public final class OffHeapColumnVector extends ColumnVector {
   public final double getDouble(int rowId) {
     return Platform.getDouble(null, data + rowId * 8);
   }
+
+  //
+  // APIs dealing with Arrays.
+  //
+  @Override
+  public final void putArray(int rowId, int offset, int length) {
+    assert(offset >= 0 && offset + length <= childColumns[0].capacity);
+    Platform.putInt(null, lengthData + 4 * rowId, length);
+    Platform.putInt(null, offsetData + 4 * rowId, offset);
+  }
+
+  @Override
+  public final int getArrayLength(int rowId) {
+    return Platform.getInt(null, lengthData + 4 * rowId);
+  }
+
+  @Override
+  public final int getArrayOffset(int rowId) {
+    return Platform.getInt(null, offsetData + 4 * rowId);
+  }
+
+  // APIs dealing with ByteArrays
+  @Override
+  public final int putByteArray(int rowId, byte[] value, int offset, int length) {
+    int result = arrayData().appendBytes(length, value, offset);
+    Platform.putInt(null, lengthData + 4 * rowId, length);
+    Platform.putInt(null, offsetData + 4 * rowId, result);
+    return result;
+  }
+
+  @Override
+  public final void loadBytes(Array array) {
+    if (array.tmpByteArray.length < array.length) array.tmpByteArray = new byte[array.length];
+    Platform.copyMemory(
+        null, data + array.offset, array.tmpByteArray, Platform.BYTE_ARRAY_OFFSET, array.length);
+    array.byteArray = array.tmpByteArray;
+    array.byteArrayOffset = 0;
+  }
+
+  @Override
+  public final void reserve(int requiredCapacity) {
+    if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
+  }
+
+  // Split out the slow path.
+  private final void reserveInternal(int newCapacity) {
+    if (this.resultArray != null) {
+      this.lengthData =
+          Platform.reallocateMemory(lengthData, elementsAppended * 4, newCapacity * 4);
+      this.offsetData =
+          Platform.reallocateMemory(offsetData, elementsAppended * 4, newCapacity * 4);
+    } else if (type instanceof ByteType) {
+      this.data = Platform.reallocateMemory(data, elementsAppended, newCapacity);
+    } else if (type instanceof IntegerType) {
+      this.data = Platform.reallocateMemory(data, elementsAppended * 4, newCapacity * 4);
+    } else if (type instanceof LongType || type instanceof DoubleType) {
+      this.data = Platform.reallocateMemory(data, elementsAppended * 8, newCapacity * 8);
+    } else if (resultStruct != null) {
+      // Nothing to store.
+    } else {
+      throw new RuntimeException("Unhandled " + type);
+    }
+    this.nulls = Platform.reallocateMemory(nulls, elementsAppended, newCapacity);
+    Platform.setMemory(nulls + elementsAppended, (byte)0, newCapacity - elementsAppended);
+    capacity = newCapacity;
+  }
 }
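
For context, binary and array columns keep a (length, offset) pair per row in the parent vector and append the element bytes back to back into the child arrayData() vector, which is what the putArray/putByteArray implementations above encode. A minimal usage sketch, not part of this patch, assuming a Spark build with these changes on the classpath; the object name is made up for illustration:

  import org.apache.spark.memory.MemoryMode
  import org.apache.spark.sql.execution.vectorized.{ColumnVector, ColumnVectorUtils}
  import org.apache.spark.sql.types.BinaryType

  object ByteArrayColumnSketch {
    def main(args: Array[String]): Unit = {
      val col = ColumnVector.allocate(16, BinaryType, MemoryMode.OFF_HEAP)
      // Bytes go into col.arrayData(); each row only records (offset, length).
      col.putByteArray(0, "Hello".getBytes)
      col.putByteArray(1, "world".getBytes)
      assert(col.getArrayLength(0) == 5)
      assert(ColumnVectorUtils.toString(col.getByteArray(1)) == "world")
      col.close()
    }
  }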

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 76d9956..8197fa1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -16,13 +16,10 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
-import org.apache.spark.sql.types.DataType;
-import org.apache.spark.sql.types.DoubleType;
-import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.memory.MemoryMode;
+import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 
-import java.nio.ByteBuffer;
-import java.nio.DoubleBuffer;
 import java.util.Arrays;
 
 /**
@@ -37,19 +34,18 @@ public final class OnHeapColumnVector extends ColumnVector {
   private byte[] nulls;
 
   // Array for each type. Only 1 is populated for any type.
+  private byte[] byteData;
   private int[] intData;
+  private long[] longData;
   private double[] doubleData;
 
+  // Only set if type is Array.
+  private int[] arrayLengths;
+  private int[] arrayOffsets;
+
   protected OnHeapColumnVector(int capacity, DataType type) {
-    super(capacity, type);
-    if (type instanceof IntegerType) {
-      this.intData = new int[capacity];
-    } else if (type instanceof DoubleType) {
-      this.doubleData = new double[capacity];
-    } else {
-      throw new RuntimeException("Unhandled " + type);
-    }
-    this.nulls = new byte[capacity];
+    super(capacity, type, MemoryMode.ON_HEAP);
+    reserveInternal(capacity);
     reset();
   }
 
@@ -109,6 +105,32 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   //
+  // APIs dealing with Bytes
+  //
+
+  @Override
+  public final void putByte(int rowId, byte value) {
+    byteData[rowId] = value;
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte value) {
+    for (int i = 0; i < count; ++i) {
+      byteData[i + rowId] = value;
+    }
+  }
+
+  @Override
+  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
+    System.arraycopy(src, srcIndex, byteData, rowId, count);
+  }
+
+  @Override
+  public final byte getByte(int rowId) {
+    return byteData[rowId];
+  }
+
+  //
   // APIs dealing with Ints
   //
 
@@ -145,6 +167,43 @@ public final class OnHeapColumnVector extends ColumnVector {
   }
 
   //
+  // APIs dealing with Longs
+  //
+
+  @Override
+  public final void putLong(int rowId, long value) {
+    longData[rowId] = value;
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long value) {
+    for (int i = 0; i < count; ++i) {
+      longData[i + rowId] = value;
+    }
+  }
+
+  @Override
+  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
+    System.arraycopy(src, srcIndex, longData, rowId, count);
+  }
+
+  @Override
+  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+    for (int i = 0; i < count; ++i) {
+      longData[i + rowId] = Platform.getLong(src, srcOffset);
+      srcIndex += 8;
+      srcOffset += 8;
+    }
+  }
+
+  @Override
+  public final long getLong(int rowId) {
+    return longData[rowId];
+  }
+
+
+  //
   // APIs dealing with doubles
   //
 
@@ -173,4 +232,86 @@ public final class OnHeapColumnVector extends ColumnVector {
   public final double getDouble(int rowId) {
     return doubleData[rowId];
   }
+
+  //
+  // APIs dealing with Arrays
+  //
+
+  @Override
+  public final int getArrayLength(int rowId) {
+    return arrayLengths[rowId];
+  }
+  @Override
+  public final int getArrayOffset(int rowId) {
+    return arrayOffsets[rowId];
+  }
+
+  @Override
+  public final void putArray(int rowId, int offset, int length) {
+    arrayOffsets[rowId] = offset;
+    arrayLengths[rowId] = length;
+  }
+
+  @Override
+  public final void loadBytes(Array array) {
+    array.byteArray = byteData;
+    array.byteArrayOffset = array.offset;
+  }
+
+  //
+  // APIs dealing with Byte Arrays
+  //
+
+  @Override
+  public final int putByteArray(int rowId, byte[] value, int offset, int length) {
+    int result = arrayData().appendBytes(length, value, offset);
+    arrayOffsets[rowId] = result;
+    arrayLengths[rowId] = length;
+    return result;
+  }
+
+  @Override
+  public final void reserve(int requiredCapacity) {
+    if (requiredCapacity > capacity) reserveInternal(requiredCapacity * 2);
+  }
+
+  // Split this function out since it is the slow path.
+  private final void reserveInternal(int newCapacity) {
+    if (this.resultArray != null) {
+      int[] newLengths = new int[newCapacity];
+      int[] newOffsets = new int[newCapacity];
+      if (this.arrayLengths != null) {
+        System.arraycopy(this.arrayLengths, 0, newLengths, 0, elementsAppended);
+        System.arraycopy(this.arrayOffsets, 0, newOffsets, 0, elementsAppended);
+      }
+      arrayLengths = newLengths;
+      arrayOffsets = newOffsets;
+    } else if (type instanceof ByteType) {
+      byte[] newData = new byte[newCapacity];
+      if (byteData != null) System.arraycopy(byteData, 0, newData, 0, elementsAppended);
+      byteData = newData;
+    } else if (type instanceof IntegerType) {
+      int[] newData = new int[newCapacity];
+      if (intData != null) System.arraycopy(intData, 0, newData, 0, elementsAppended);
+      intData = newData;
+    } else if (type instanceof LongType) {
+      long[] newData = new long[newCapacity];
+      if (longData != null) System.arraycopy(longData, 0, newData, 0, elementsAppended);
+      longData = newData;
+    } else if (type instanceof DoubleType) {
+      double[] newData = new double[newCapacity];
+      if (doubleData != null) System.arraycopy(doubleData, 0, newData, 0, elementsAppended);
+      doubleData = newData;
+    } else if (resultStruct != null) {
+      // Nothing to store.
+    } else {
+      throw new RuntimeException("Unhandled " + type);
+    }
+
+    byte[] newNulls = new byte[newCapacity];
+    if (nulls != null) System.arraycopy(nulls, 0, newNulls, 0, elementsAppended);
+    nulls = newNulls;
+
+    capacity = newCapacity;
+  }
 }
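
Both implementations share the same growth policy: reserve(n) is a no-op while n fits in the current capacity and otherwise falls through to reserveInternal(n * 2), which copies the already-appended elements into larger storage. A small sketch of that behaviour (hypothetical object name; it mirrors the "Int Array" test further below and assumes this patch is on the classpath):

  import org.apache.spark.memory.MemoryMode
  import org.apache.spark.sql.execution.vectorized.ColumnVector
  import org.apache.spark.sql.types.IntegerType

  object ReserveSketch {
    def main(args: Array[String]): Unit = {
      val col = ColumnVector.allocate(10, IntegerType, MemoryMode.ON_HEAP)
      assert(col.capacity == 10)
      col.reserve(12)              // 12 > 10, so the column grows to 12 * 2
      assert(col.capacity == 24)
      col.putInts(0, 12, Array.tabulate(12)(identity), 0)
      assert(col.getInt(11) == 11)
      col.close()
    }
  }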

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
index 95c9550..8a95359 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/UnsafeKVExternalSorterSuite.scala
@@ -40,8 +40,8 @@ class UnsafeKVExternalSorterSuite extends SparkFunSuite with SharedSQLContext {
 
   private val rand = new Random(42)
   for (i <- 0 until 6) {
-    val keySchema = RandomDataGenerator.randomSchema(rand.nextInt(10) + 1, keyTypes)
-    val valueSchema = RandomDataGenerator.randomSchema(rand.nextInt(10) + 1, valueTypes)
+    val keySchema = RandomDataGenerator.randomSchema(rand, rand.nextInt(10) + 1, keyTypes)
+    val valueSchema = RandomDataGenerator.randomSchema(rand, rand.nextInt(10) + 1, valueTypes)
     testKVSorter(keySchema, valueSchema, spill = i > 3)
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index bfe944d..8efdf8a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -18,10 +18,12 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.ByteBuffer
 
+import scala.util.Random
+
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.vectorized.ColumnVector
-import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.sql.types.{BinaryType, IntegerType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.collection.BitSet
@@ -239,6 +241,26 @@ object ColumnarBatchBenchmark {
       Platform.freeMemory(buffer)
     }
 
+    // Adding values by appending, instead of putting.
+    val onHeapAppend = { i: Int =>
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          col.appendInt(i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += col.getInt(i)
+          i += 1
+        }
+        col.reset()
+      }
+      col.close
+    }
+
     /*
     Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
     Int Read/Write:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
@@ -253,6 +275,7 @@ object ColumnarBatchBenchmark {
     Column(off heap direct)             237.6          1379.12         1.05 X
     UnsafeRow (on heap)                 414.6           790.35         0.60 X
     UnsafeRow (off heap)                487.2           672.58         0.51 X
+    Column On Heap Append               530.1           618.14         0.59 X
     */
     val benchmark = new Benchmark("Int Read/Write", count * iters)
     benchmark.addCase("Java Array")(javaArray)
@@ -265,6 +288,7 @@ object ColumnarBatchBenchmark {
     benchmark.addCase("Column(off heap direct)")(columnOffheapDirect)
     benchmark.addCase("UnsafeRow (on heap)")(unsafeRowOnheap)
     benchmark.addCase("UnsafeRow (off heap)")(unsafeRowOffheap)
+    benchmark.addCase("Column On Heap Append")(onHeapAppend)
     benchmark.run()
   }
 
@@ -314,8 +338,60 @@ object ColumnarBatchBenchmark {
     benchmark.run()
   }
 
+  def stringAccess(iters: Long): Unit = {
+    val chars = "0123456789ABCDEFGHIJKLMNOPQRSTUVWXYZ"
+    val random = new Random(0)
+
+    def randomString(min: Int, max: Int): String = {
+      val len = random.nextInt(max - min) + min
+      val sb = new StringBuilder(len)
+      var i = 0
+      while (i < len) {
+        sb.append(chars.charAt(random.nextInt(chars.length())))
+        i += 1
+      }
+      sb.toString
+    }
+
+    val minString = 3
+    val maxString = 32
+    val count = 4 * 1000
+
+    val data = Seq.fill(count)(randomString(minString, maxString)).map(_.getBytes).toArray
+
+    def column(memoryMode: MemoryMode) = { i: Int =>
+      val column = ColumnVector.allocate(count, BinaryType, memoryMode)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          column.putByteArray(i, data(i))
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += column.getByteArray(i).length
+          i += 1
+        }
+        column.reset()
+      }
+    }
+
+    /*
+    String Read/Write:                       Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    -------------------------------------------------------------------------------------
+    On Heap                                         457.0            35.85         1.00 X
+    Off Heap                                       1206.0            13.59         0.38 X
+    */
+    val benchmark = new Benchmark("String Read/Write", count * iters)
+    benchmark.addCase("On Heap")(column(MemoryMode.ON_HEAP))
+    benchmark.addCase("Off Heap")(column(MemoryMode.OFF_HEAP))
+    benchmark.run
+  }
+
   def main(args: Array[String]): Unit = {
     intAccess(1024 * 40)
     booleanAccess(1024 * 40)
+    stringAccess(1024 * 4)
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index d5e517c..215ca9a 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -17,14 +17,15 @@
 
 package org.apache.spark.sql.execution.vectorized
 
+import scala.collection.JavaConverters._
 import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
 import org.apache.spark.memory.MemoryMode
-import org.apache.spark.sql.Row
+import org.apache.spark.sql.{RandomDataGenerator, Row}
 import org.apache.spark.sql.catalyst.InternalRow
-import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
+import org.apache.spark.sql.types._
 import org.apache.spark.unsafe.Platform
 
 class ColumnarBatchSuite extends SparkFunSuite {
@@ -74,6 +75,45 @@ class ColumnarBatchSuite extends SparkFunSuite {
     }}
   }
 
+  test("Byte Apis") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val reference = mutable.ArrayBuffer.empty[Byte]
+
+      val column = ColumnVector.allocate(1024, ByteType, memMode)
+      var idx = 0
+
+      val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).map(_.toByte).toArray
+      column.putBytes(idx, 2, values, 0)
+      reference += 1
+      reference += 2
+      idx += 2
+
+      column.putBytes(idx, 3, values, 2)
+      reference += 3
+      reference += 4
+      reference += 5
+      idx += 3
+
+      column.putByte(idx, 9)
+      reference += 9
+      idx += 1
+
+      column.putBytes(idx, 3, 4)
+      reference += 4
+      reference += 4
+      reference += 4
+      idx += 3
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getByte(v._2), "MemoryMode" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
+          val addr = column.valuesNativeAddress()
+          assert(v._1 == Platform.getByte(null, addr + v._2))
+        }
+      }
+    }}
+  }
+
   test("Int Apis") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
@@ -142,6 +182,76 @@ class ColumnarBatchSuite extends SparkFunSuite {
     }}
   }
 
+  test("Long Apis") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Long]
+
+      val column = ColumnVector.allocate(1024, LongType, memMode)
+      var idx = 0
+
+      val values = (1L :: 2L :: 3L :: 4L :: 5L :: Nil).toArray
+      column.putLongs(idx, 2, values, 0)
+      reference += 1
+      reference += 2
+      idx += 2
+
+      column.putLongs(idx, 3, values, 2)
+      reference += 3
+      reference += 4
+      reference += 5
+      idx += 3
+
+      val littleEndian = new Array[Byte](16)
+      littleEndian(0) = 7
+      littleEndian(1) = 1
+      littleEndian(8) = 6
+      littleEndian(10) = 1
+
+      column.putLongsLittleEndian(idx, 1, littleEndian, 8)
+      column.putLongsLittleEndian(idx + 1, 1, littleEndian, 0)
+      reference += 6 + (1 << 16)
+      reference += 7 + (1 << 8)
+      idx += 2
+
+      column.putLongsLittleEndian(idx, 2, littleEndian, 0)
+      reference += 7 + (1 << 8)
+      reference += 6 + (1 << 16)
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextLong()
+          column.putLong(idx, v)
+          reference += v
+          idx += 1
+        } else {
+
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          column.putLongs(idx, n, n + 1)
+          var i = 0
+          while (i < n) {
+            reference += (n + 1)
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getLong(v._2), "idx=" + v._2 +
+            " Seed = " + seed + " MemMode=" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
+          val addr = column.valuesNativeAddress()
+          assert(v._1 == Platform.getLong(null, addr + 8 * v._2))
+        }
+      }
+    }}
+  }
+
   test("Double APIs") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
@@ -209,15 +319,150 @@ class ColumnarBatchSuite extends SparkFunSuite {
     }}
   }
 
+  test("String APIs") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val reference = mutable.ArrayBuffer.empty[String]
+
+      val column = ColumnVector.allocate(6, BinaryType, memMode)
+      assert(column.arrayData().elementsAppended == 0)
+      var idx = 0
+
+      val values = ("Hello" :: "abc" :: Nil).toArray
+      column.putByteArray(idx, values(0).getBytes, 0, values(0).getBytes().length)
+      reference += values(0)
+      idx += 1
+      assert(column.arrayData().elementsAppended == 5)
+
+      column.putByteArray(idx, values(1).getBytes, 0, values(1).getBytes().length)
+      reference += values(1)
+      idx += 1
+      assert(column.arrayData().elementsAppended == 8)
+
+      // Just put llo
+      val offset = column.putByteArray(idx, values(0).getBytes, 2, values(0).getBytes().length - 2)
+      reference += "llo"
+      idx += 1
+      assert(column.arrayData().elementsAppended == 11)
+
+      // Put the same "ll" at offset. This should not allocate more memory in the column.
+      column.putArray(idx, offset, 2)
+      reference += "ll"
+      idx += 1
+      assert(column.arrayData().elementsAppended == 11)
+
+      // Put a long string
+      val s = "abcdefghijklmnopqrstuvwxyz"
+      column.putByteArray(idx, (s + s).getBytes)
+      reference += (s + s)
+      idx += 1
+      assert(column.arrayData().elementsAppended == 11 + (s + s).length)
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1.length == column.getArrayLength(v._2), "MemoryMode=" + memMode)
+        assert(v._1 == ColumnVectorUtils.toString(column.getByteArray(v._2)),
+          "MemoryMode" + memMode)
+      }
+
+      column.reset()
+      assert(column.arrayData().elementsAppended == 0)
+    }}
+  }
+
+  test("Int Array") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
+
+      // Fill the underlying data with all the arrays back to back.
+      val data = column.arrayData();
+      var i = 0
+      while (i < 6) {
+        data.putInt(i, i)
+        i += 1
+      }
+
+      // Populate it with arrays [0], [1, 2], [], [3, 4, 5]
+      column.putArray(0, 0, 1)
+      column.putArray(1, 1, 2)
+      column.putArray(2, 2, 0)
+      column.putArray(3, 3, 3)
+
+      val a1 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
+      val a2 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(1)).asInstanceOf[Array[Int]]
+      val a3 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(2)).asInstanceOf[Array[Int]]
+      val a4 = ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(3)).asInstanceOf[Array[Int]]
+      assert(a1 === Array(0))
+      assert(a2 === Array(1, 2))
+      assert(a3 === Array.empty[Int])
+      assert(a4 === Array(3, 4, 5))
+
+      // Verify the ArrayData APIs
+      assert(column.getArray(0).length == 1)
+      assert(column.getArray(0).getInt(0) == 0)
+
+      assert(column.getArray(1).length == 2)
+      assert(column.getArray(1).getInt(0) == 1)
+      assert(column.getArray(1).getInt(1) == 2)
+
+      assert(column.getArray(2).length == 0)
+
+      assert(column.getArray(3).length == 3)
+      assert(column.getArray(3).getInt(0) == 3)
+      assert(column.getArray(3).getInt(1) == 4)
+      assert(column.getArray(3).getInt(2) == 5)
+
+      // Add a longer array which requires resizing
+      column.reset
+      val array = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12)
+      assert(data.capacity == 10)
+      data.reserve(array.length)
+      assert(data.capacity == array.length * 2)
+      data.putInts(0, array.length, array, 0)
+      column.putArray(0, 0, array.length)
+      assert(ColumnVectorUtils.toPrimitiveJavaArray(column.getArray(0)).asInstanceOf[Array[Int]]
+        === array)
+    }}
+  }
+
+  test("Struct Column") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
+      val column = ColumnVector.allocate(1024, schema, memMode)
+
+      val c1 = column.getChildColumn(0)
+      val c2 = column.getChildColumn(1)
+      assert(c1.dataType() == IntegerType)
+      assert(c2.dataType() == DoubleType)
+
+      c1.putInt(0, 123)
+      c2.putDouble(0, 3.45)
+      c1.putInt(1, 456)
+      c2.putDouble(1, 5.67)
+
+      val s = column.getStruct(0)
+      assert(s.fields(0).getInt(0) == 123)
+      assert(s.fields(0).getInt(1) == 456)
+      assert(s.fields(1).getDouble(0) == 3.45)
+      assert(s.fields(1).getDouble(1) == 5.67)
+
+      assert(s.getInt(0) == 123)
+      assert(s.getDouble(1) == 3.45)
+
+      val s2 = column.getStruct(1)
+      assert(s2.getInt(0) == 456)
+      assert(s2.getDouble(1) == 5.67)
+    }}
+  }
+
   test("ColumnarBatch basic") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType()
         .add("intCol", IntegerType)
         .add("doubleCol", DoubleType)
         .add("intCol2", IntegerType)
+        .add("string", BinaryType)
 
       val batch = ColumnarBatch.allocate(schema, memMode)
-      assert(batch.numCols() == 3)
+      assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)
       assert(batch.capacity() > 0)
@@ -227,10 +472,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
       batch.column(0).putInt(0, 1)
       batch.column(1).putDouble(0, 1.1)
       batch.column(2).putNull(0)
+      batch.column(3).putByteArray(0, "Hello".getBytes)
       batch.setNumRows(1)
 
       // Verify the results of the row.
-      assert(batch.numCols() == 3)
+      assert(batch.numCols() == 4)
       assert(batch.numRows() == 1)
       assert(batch.numValidRows() == 1)
       assert(batch.rowIterator().hasNext == true)
@@ -241,6 +487,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.column(1).getDouble(0) == 1.1)
       assert(batch.column(1).getIsNull(0) == false)
       assert(batch.column(2).getIsNull(0) == true)
+      assert(ColumnVectorUtils.toString(batch.column(3).getByteArray(0)) == "Hello")
 
       // Verify the iterator works correctly.
       val it = batch.rowIterator()
@@ -251,6 +498,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(row.getDouble(1) == 1.1)
       assert(row.isNullAt(1) == false)
       assert(row.isNullAt(2) == true)
+      assert(ColumnVectorUtils.toString(batch.column(3).getByteArray(0)) == "Hello")
       assert(it.hasNext == false)
       assert(it.hasNext == false)
 
@@ -260,24 +508,27 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.numValidRows() == 0)
       assert(batch.rowIterator().hasNext == false)
 
-      // Reset and add 3 throws
+      // Reset and add 3 rows
       batch.reset()
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)
       assert(batch.rowIterator().hasNext == false)
 
-      // Add rows [NULL, 2.2, 2], [3, NULL, 3], [4, 4.4, 4]
+      // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world"]
       batch.column(0).putNull(0)
       batch.column(1).putDouble(0, 2.2)
       batch.column(2).putInt(0, 2)
+      batch.column(3).putByteArray(0, "abc".getBytes)
 
       batch.column(0).putInt(1, 3)
       batch.column(1).putNull(1)
       batch.column(2).putInt(1, 3)
+      batch.column(3).putByteArray(1, "".getBytes)
 
       batch.column(0).putInt(2, 4)
       batch.column(1).putDouble(2, 4.4)
       batch.column(2).putInt(2, 4)
+      batch.column(3).putByteArray(2, "world".getBytes)
       batch.setNumRows(3)
 
       def rowEquals(x: InternalRow, y: Row): Unit = {
@@ -289,30 +540,152 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
         assert(x.isNullAt(2) == y.isNullAt(2))
         if (!x.isNullAt(2)) assert(x.getInt(2) == y.getInt(2))
+
+        assert(x.isNullAt(3) == y.isNullAt(3))
+        if (!x.isNullAt(3)) assert(x.getString(3) == y.getString(3))
       }
+
       // Verify
       assert(batch.numRows() == 3)
       assert(batch.numValidRows() == 3)
       val it2 = batch.rowIterator()
-      rowEquals(it2.next(), Row(null, 2.2, 2))
-      rowEquals(it2.next(), Row(3, null, 3))
-      rowEquals(it2.next(), Row(4, 4.4, 4))
+      rowEquals(it2.next(), Row(null, 2.2, 2, "abc"))
+      rowEquals(it2.next(), Row(3, null, 3, ""))
+      rowEquals(it2.next(), Row(4, 4.4, 4, "world"))
       assert(!it.hasNext)
 
       // Filter out some rows and verify
       batch.markFiltered(1)
       assert(batch.numValidRows() == 2)
       val it3 = batch.rowIterator()
-      rowEquals(it3.next(), Row(null, 2.2, 2))
-      rowEquals(it3.next(), Row(4, 4.4, 4))
+      rowEquals(it3.next(), Row(null, 2.2, 2, "abc"))
+      rowEquals(it3.next(), Row(4, 4.4, 4, "world"))
       assert(!it.hasNext)
 
       batch.markFiltered(2)
       assert(batch.numValidRows() == 1)
       val it4 = batch.rowIterator()
-      rowEquals(it4.next(), Row(null, 2.2, 2))
+      rowEquals(it4.next(), Row(null, 2.2, 2, "abc"))
 
       batch.close
     }}
   }
+
+
+  private def doubleEquals(d1: Double, d2: Double): Boolean = {
+    if (d1.isNaN && d2.isNaN) {
+      true
+    } else {
+      d1 == d2
+    }
+  }
+
+  private def compareStruct(fields: Seq[StructField], r1: InternalRow, r2: Row, seed: Long) {
+    fields.zipWithIndex.foreach { v => {
+      assert(r1.isNullAt(v._2) == r2.isNullAt(v._2), "Seed = " + seed)
+      if (!r1.isNullAt(v._2)) {
+        v._1.dataType match {
+          case ByteType => assert(r1.getByte(v._2) == r2.getByte(v._2), "Seed = " + seed)
+          case IntegerType => assert(r1.getInt(v._2) == r2.getInt(v._2), "Seed = " + seed)
+          case LongType => assert(r1.getLong(v._2) == r2.getLong(v._2), "Seed = " + seed)
+          case DoubleType => assert(doubleEquals(r1.getDouble(v._2), r2.getDouble(v._2)),
+            "Seed = " + seed)
+          case StringType =>
+            assert(r1.getString(v._2) == r2.getString(v._2), "Seed = " + seed)
+          case ArrayType(childType, n) =>
+            val a1 = r1.getArray(v._2).array
+            val a2 = r2.getList(v._2).toArray
+            assert(a1.length == a2.length, "Seed = " + seed)
+            childType match {
+              case DoubleType => {
+                var i = 0
+                while (i < a1.length) {
+                  assert(doubleEquals(a1(i).asInstanceOf[Double], a2(i).asInstanceOf[Double]),
+                    "Seed = " + seed)
+                  i += 1
+                }
+              }
+              case _ => assert(a1 === a2, "Seed = " + seed)
+            }
+          case StructType(childFields) =>
+            compareStruct(childFields, r1.getStruct(v._2, childFields.length),
+              r2.getStruct(v._2), seed)
+          case _ =>
+            throw new NotImplementedError("Not implemented " + v._1.dataType)
+        }
+      }
+    }}
+  }
+
+  test("Convert rows") {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+      val rows = Row(1, 2L, "a", 1.2, 'b'.toByte) :: Row(4, 5L, "cd", 2.3, 'a'.toByte) :: Nil
+      val schema = new StructType()
+        .add("i1", IntegerType)
+        .add("l2", LongType)
+        .add("string", StringType)
+        .add("d", DoubleType)
+        .add("b", ByteType)
+
+      val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+      assert(batch.numRows() == 2)
+      assert(batch.numCols() == 5)
+
+      val it = batch.rowIterator()
+      val referenceIt = rows.iterator
+      while (it.hasNext) {
+        compareStruct(schema, it.next(), referenceIt.next(), 0)
+      }
+      batch.close()
+    }}
+  }
+
+  /**
+   * This test generates a random schema data, serializes it to column batches and verifies the
+   * results.
+   */
+  def testRandomRows(flatSchema: Boolean, numFields: Int) {
+    // TODO: add remaining types. Figure out why StringType doesn't work on jenkins.
+    val types = Array(ByteType, IntegerType, LongType, DoubleType)
+    val seed = System.nanoTime()
+    val NUM_ROWS = 500
+    val NUM_ITERS = 1000
+    val random = new Random(seed)
+    var i = 0
+    while (i < NUM_ITERS) {
+      val schema = if (flatSchema) {
+        RandomDataGenerator.randomSchema(random, numFields, types)
+      } else {
+        RandomDataGenerator.randomNestedSchema(random, numFields, types)
+      }
+      val rows = mutable.ArrayBuffer.empty[Row]
+      var j = 0
+      while (j < NUM_ROWS) {
+        val row = RandomDataGenerator.randomRow(random, schema)
+        rows += row
+        j += 1
+      }
+      (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
+        val batch = ColumnVectorUtils.toBatch(schema, memMode, rows.iterator.asJava)
+        assert(batch.numRows() == NUM_ROWS)
+
+        val it = batch.rowIterator()
+        val referenceIt = rows.iterator
+        var k = 0
+        while (it.hasNext) {
+          compareStruct(schema, it.next(), referenceIt.next(), seed)
+          k += 1
+        }
+        batch.close()
+      }}
+      i += 1
+    }
+  }
+
+  test("Random flat schema") {
+    testRandomRows(true, 10)
+  }
+
+  test("Random nested schema") {
+    testRandomRows(false, 30)
+  }
 }
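
The "Struct Column" test above shows how a struct column exposes one child column per field and stores no values of its own. A hedged sketch of that access pattern (hypothetical object name, same classpath assumption as the earlier sketches):

  import org.apache.spark.memory.MemoryMode
  import org.apache.spark.sql.execution.vectorized.ColumnVector
  import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

  object StructColumnSketch {
    def main(args: Array[String]): Unit = {
      val schema = new StructType().add("i", IntegerType).add("d", DoubleType)
      val col = ColumnVector.allocate(16, schema, MemoryMode.ON_HEAP)
      col.getChildColumn(0).putInt(0, 1)        // field "i" of row 0
      col.getChildColumn(1).putDouble(0, 1.5)   // field "d" of row 0
      val row = col.getStruct(0)
      assert(row.getInt(0) == 1 && row.getDouble(1) == 1.5)
      col.close()
    }
  }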

http://git-wip-us.apache.org/repos/asf/spark/blob/55512738/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
----------------------------------------------------------------------
diff --git a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
index 76b36aa..3e4cf3f 100644
--- a/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
+++ b/sql/hive/src/test/scala/org/apache/spark/sql/hive/execution/AggregationQuerySuite.scala
@@ -18,6 +18,7 @@
 package org.apache.spark.sql.hive.execution
 
 import scala.collection.JavaConverters._
+import scala.util.Random
 
 import org.apache.spark.sql._
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
@@ -879,7 +880,7 @@ abstract class AggregationQuerySuite extends QueryTest with SQLTestUtils with Te
         RandomDataGenerator.forType(
           dataType = schemaForGenerator,
           nullable = true,
-          seed = Some(System.nanoTime()))
+          new Random(System.nanoTime()))
       val dataGenerator =
         maybeDataGenerator
           .getOrElse(fail(s"Failed to create data generator for schema $schemaForGenerator"))
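
As in the hunk above, RandomDataGenerator.forType now takes the Random instance directly in place of the former seed = Some(...) argument. A minimal call-shape sketch (assumes the Spark test classpath; the surrounding object is made up for illustration):

  import scala.util.Random

  import org.apache.spark.sql.RandomDataGenerator
  import org.apache.spark.sql.types.StringType

  object ForTypeCallShape {
    def main(args: Array[String]): Unit = {
      val maybeGen = RandomDataGenerator.forType(
        dataType = StringType,
        nullable = true,
        new Random(System.nanoTime()))
      assert(maybeGen.isDefined, "no generator for StringType")
    }
  }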


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org