You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by we...@apache.org on 2017/08/24 13:13:56 UTC

[1/2] spark git commit: [SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector.

Repository: spark
Updated Branches:
  refs/heads/master dc5d34d8d -> 9e33954dd


http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
new file mode 100644
index 0000000..b4f753c
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/WritableColumnVector.java
@@ -0,0 +1,674 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import com.google.common.annotations.VisibleForTesting;
+
+import org.apache.spark.sql.internal.SQLConf;
+import org.apache.spark.sql.types.*;
+import org.apache.spark.unsafe.types.UTF8String;
+
+/**
+ * This class adds write APIs to ColumnVector.
+ * It supports all the types and contains put APIs as well as their batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the
+ * responsibility of the caller to call reserve() to ensure there is enough room before adding
+ * elements. This means that the put() APIs do not check as in common cases (i.e. flat schemas),
+ * the lengths are known up front.
+ *
+ * A ColumnVector should be considered immutable once originally created. In other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class WritableColumnVector extends ColumnVector {
+
+  /**
+   * Resets this column for writing. The currently stored values are no longer accessible.
+   */
+  public void reset() {
+    if (isConstant) return;
+
+    if (childColumns != null) {
+      for (ColumnVector c: childColumns) {
+        ((WritableColumnVector) c).reset();
+      }
+    }
+    numNulls = 0;
+    elementsAppended = 0;
+    if (anyNullsSet) {
+      putNotNulls(0, capacity);
+      anyNullsSet = false;
+    }
+  }
+
+  public void reserve(int requiredCapacity) {
+    if (requiredCapacity > capacity) {
+      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
+      if (requiredCapacity <= newCapacity) {
+        try {
+          reserveInternal(newCapacity);
+        } catch (OutOfMemoryError outOfMemoryError) {
+          throwUnsupportedException(requiredCapacity, outOfMemoryError);
+        }
+      } else {
+        throwUnsupportedException(requiredCapacity, null);
+      }
+    }
+  }
+
+  private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
+    String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
+        "(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
+        "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
+        " to false.";
+    throw new RuntimeException(message, cause);
+  }
+
+  @Override
+  public int numNulls() { return numNulls; }
+
+  @Override
+  public boolean anyNullsSet() { return anyNullsSet; }
+
+  /**
+   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
+   * must work for all rowIds < capacity.
+   */
+  protected abstract void reserveInternal(int capacity);
+
+  /**
+   * Sets the value at rowId to null/not null.
+   */
+  public abstract void putNotNull(int rowId);
+  public abstract void putNull(int rowId);
+
+  /**
+   * Sets the values from [rowId, rowId + count) to null/not null.
+   */
+  public abstract void putNulls(int rowId, int count);
+  public abstract void putNotNulls(int rowId, int count);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putBoolean(int rowId, boolean value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBooleans(int rowId, int count, boolean value);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putByte(int rowId, byte value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putBytes(int rowId, int count, byte value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putShort(int rowId, short value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putShorts(int rowId, int count, short value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putInt(int rowId, int value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putInts(int rowId, int count, int value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be 4-byte little endian ints.
+   */
+  public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putLong(int rowId, long value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putLongs(int rowId, int count, long value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be 8-byte little endian longs.
+   */
+  public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putFloat(int rowId, float value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putFloats(int rowId, int count, float value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be ieee formatted floats.
+   */
+  public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putDouble(int rowId, double value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putDoubles(int rowId, int count, double value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
+   * The data in src must be ieee formatted doubles.
+   */
+  public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Puts a byte array that already exists in this column.
+   */
+  public abstract void putArray(int rowId, int offset, int length);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
+  public final int putByteArray(int rowId, byte[] value) {
+    return putByteArray(rowId, value, 0, value.length);
+  }
+
+  /**
+   * Returns the value for rowId.
+   */
+  private ColumnVector.Array getByteArray(int rowId) {
+    ColumnVector.Array array = getArray(rowId);
+    array.data.loadBytes(array);
+    return array;
+  }
+
+  /**
+   * Returns the decimal for rowId.
+   */
+  @Override
+  public Decimal getDecimal(int rowId, int precision, int scale) {
+    if (precision <= Decimal.MAX_INT_DIGITS()) {
+      return Decimal.createUnsafe(getInt(rowId), precision, scale);
+    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      return Decimal.createUnsafe(getLong(rowId), precision, scale);
+    } else {
+      // TODO: best perf?
+      byte[] bytes = getBinary(rowId);
+      BigInteger bigInteger = new BigInteger(bytes);
+      BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
+      return Decimal.apply(javaDecimal, precision, scale);
+    }
+  }
+
+  public void putDecimal(int rowId, Decimal value, int precision) {
+    if (precision <= Decimal.MAX_INT_DIGITS()) {
+      putInt(rowId, (int) value.toUnscaledLong());
+    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
+      putLong(rowId, value.toUnscaledLong());
+    } else {
+      BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
+      putByteArray(rowId, bigInteger.toByteArray());
+    }
+  }
+
+  /**
+   * Returns the UTF8String for rowId.
+   */
+  @Override
+  public UTF8String getUTF8String(int rowId) {
+    if (dictionary == null) {
+      ColumnVector.Array a = getByteArray(rowId);
+      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
+    } else {
+      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+      return UTF8String.fromBytes(bytes);
+    }
+  }
+
+  /**
+   * Returns the byte array for rowId.
+   */
+  @Override
+  public byte[] getBinary(int rowId) {
+    if (dictionary == null) {
+      ColumnVector.Array array = getByteArray(rowId);
+      byte[] bytes = new byte[array.length];
+      System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
+      return bytes;
+    } else {
+      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
+    }
+  }
+
+  /**
+   * Append APIs. These APIs all behave similarly and will append data to the current vector.  It
+   * is not valid to mix the put and append APIs. The append APIs are slower and should only be
+   * used if the sizes are not known up front.
+   * In all these cases, the return value is the rowId for the first appended element.
+   */
+  public final int appendNull() {
+    assert (!(dataType() instanceof StructType)); // Use appendStruct()
+    reserve(elementsAppended + 1);
+    putNull(elementsAppended);
+    return elementsAppended++;
+  }
+
+  public final int appendNotNull() {
+    reserve(elementsAppended + 1);
+    putNotNull(elementsAppended);
+    return elementsAppended++;
+  }
+
+  public final int appendNulls(int count) {
+    assert (!(dataType() instanceof StructType));
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putNulls(elementsAppended, count);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendNotNulls(int count) {
+    assert (!(dataType() instanceof StructType));
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putNotNulls(elementsAppended, count);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendBoolean(boolean v) {
+    reserve(elementsAppended + 1);
+    putBoolean(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendBooleans(int count, boolean v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putBooleans(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendByte(byte v) {
+    reserve(elementsAppended + 1);
+    putByte(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendBytes(int count, byte v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putBytes(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendBytes(int length, byte[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putBytes(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendShort(short v) {
+    reserve(elementsAppended + 1);
+    putShort(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendShorts(int count, short v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putShorts(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendShorts(int length, short[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putShorts(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendInt(int v) {
+    reserve(elementsAppended + 1);
+    putInt(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendInts(int count, int v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putInts(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendInts(int length, int[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putInts(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendLong(long v) {
+    reserve(elementsAppended + 1);
+    putLong(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendLongs(int count, long v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putLongs(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendLongs(int length, long[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putLongs(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendFloat(float v) {
+    reserve(elementsAppended + 1);
+    putFloat(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendFloats(int count, float v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putFloats(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendFloats(int length, float[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putFloats(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendDouble(double v) {
+    reserve(elementsAppended + 1);
+    putDouble(elementsAppended, v);
+    return elementsAppended++;
+  }
+
+  public final int appendDoubles(int count, double v) {
+    reserve(elementsAppended + count);
+    int result = elementsAppended;
+    putDoubles(elementsAppended, count, v);
+    elementsAppended += count;
+    return result;
+  }
+
+  public final int appendDoubles(int length, double[] src, int offset) {
+    reserve(elementsAppended + length);
+    int result = elementsAppended;
+    putDoubles(elementsAppended, length, src, offset);
+    elementsAppended += length;
+    return result;
+  }
+
+  public final int appendByteArray(byte[] value, int offset, int length) {
+    int copiedOffset = arrayData().appendBytes(length, value, offset);
+    reserve(elementsAppended + 1);
+    putArray(elementsAppended, copiedOffset, length);
+    return elementsAppended++;
+  }
+
+  public final int appendArray(int length) {
+    reserve(elementsAppended + 1);
+    putArray(elementsAppended, arrayData().elementsAppended, length);
+    return elementsAppended++;
+  }
+
+  /**
+   * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this
+   * recursively appends a NULL to its children.
+   * We don't have this logic as the general appendNull implementation to optimize the more
+   * common non-struct case.
+   */
+  public final int appendStruct(boolean isNull) {
+    if (isNull) {
+      appendNull();
+      for (ColumnVector c: childColumns) {
+        if (c.type instanceof StructType) {
+          ((WritableColumnVector) c).appendStruct(true);
+        } else {
+          ((WritableColumnVector) c).appendNull();
+        }
+      }
+    } else {
+      appendNotNull();
+    }
+    return elementsAppended;
+  }
+
+  /**
+   * Returns the data for the underlying array.
+   */
+  @Override
+  public WritableColumnVector arrayData() { return childColumns[0]; }
+
+  /**
+   * Returns the ordinal's child data column.
+   */
+  @Override
+  public WritableColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
+
+  /**
+   * Returns the elements appended.
+   */
+  public final int getElementsAppended() { return elementsAppended; }
+
+  /**
+   * Marks this column as being constant.
+   */
+  public final void setIsConstant() { isConstant = true; }
+
+  /**
+   * Maximum number of rows that can be stored in this column.
+   */
+  protected int capacity;
+
+  /**
+   * Upper limit for the maximum capacity for this column.
+   */
+  @VisibleForTesting
+  protected int MAX_CAPACITY = Integer.MAX_VALUE;
+
+  /**
+   * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
+   */
+  protected int numNulls;
+
+  /**
+   * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
+   * having to clear NULL bits.
+   */
+  protected boolean anyNullsSet;
+
+  /**
+   * True if this column's values are fixed. This means the column values never change, even
+   * across resets.
+   */
+  protected boolean isConstant;
+
+  /**
+   * Default size of each array length value. This grows as necessary.
+   */
+  protected static final int DEFAULT_ARRAY_LENGTH = 4;
+
+  /**
+   * Current write cursor (row index) when appending data.
+   */
+  protected int elementsAppended;
+
+  /**
+   * If this is a nested type (array or struct), the column for the child data.
+   */
+  protected WritableColumnVector[] childColumns;
+
+  /**
+   * Update the dictionary.
+   */
+  public void setDictionary(Dictionary dictionary) {
+    this.dictionary = dictionary;
+  }
+
+  /**
+   * Reserve a integer column for ids of dictionary.
+   */
+  public WritableColumnVector reserveDictionaryIds(int capacity) {
+    WritableColumnVector dictionaryIds = (WritableColumnVector) this.dictionaryIds;
+    if (dictionaryIds == null) {
+      dictionaryIds = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.dictionaryIds = dictionaryIds;
+    } else {
+      dictionaryIds.reset();
+      dictionaryIds.reserve(capacity);
+    }
+    return dictionaryIds;
+  }
+
+  /**
+   * Returns the underlying integer column for ids of dictionary.
+   */
+  @Override
+  public WritableColumnVector getDictionaryIds() {
+    return (WritableColumnVector) dictionaryIds;
+  }
+
+  /**
+   * Reserve a new column.
+   */
+  protected abstract WritableColumnVector reserveNewColumn(int capacity, DataType type);
+
+  /**
+   * Sets up the common state and also handles creating the child columns if this is a nested
+   * type.
+   */
+  protected WritableColumnVector(int capacity, DataType type) {
+    super(type);
+    this.capacity = capacity;
+
+    if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
+        || DecimalType.isByteArrayDecimalType(type)) {
+      DataType childType;
+      int childCapacity = capacity;
+      if (type instanceof ArrayType) {
+        childType = ((ArrayType)type).elementType();
+      } else {
+        childType = DataTypes.ByteType;
+        childCapacity *= DEFAULT_ARRAY_LENGTH;
+      }
+      this.childColumns = new WritableColumnVector[1];
+      this.childColumns[0] = reserveNewColumn(childCapacity, childType);
+      this.resultArray = new ColumnVector.Array(this.childColumns[0]);
+      this.resultStruct = null;
+    } else if (type instanceof StructType) {
+      StructType st = (StructType)type;
+      this.childColumns = new WritableColumnVector[st.fields().length];
+      for (int i = 0; i < childColumns.length; ++i) {
+        this.childColumns[i] = reserveNewColumn(capacity, st.fields()[i].dataType());
+      }
+      this.resultArray = null;
+      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
+    } else if (type instanceof CalendarIntervalType) {
+      // Two columns. Months as int. Microseconds as Long.
+      this.childColumns = new WritableColumnVector[2];
+      this.childColumns[0] = reserveNewColumn(capacity, DataTypes.IntegerType);
+      this.childColumns[1] = reserveNewColumn(capacity, DataTypes.LongType);
+      this.resultArray = null;
+      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
+    } else {
+      this.childColumns = null;
+      this.resultArray = null;
+      this.resultStruct = null;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
index 0c40417..13f7927 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/aggregate/VectorizedHashMapGenerator.scala
@@ -76,6 +76,8 @@ class VectorizedHashMapGenerator(
         }.mkString("\n").concat(";")
 
     s"""
+       |  private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] batchVectors;
+       |  private org.apache.spark.sql.execution.vectorized.OnHeapColumnVector[] bufferVectors;
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch batch;
        |  private org.apache.spark.sql.execution.vectorized.ColumnarBatch aggregateBufferBatch;
        |  private int[] buckets;
@@ -89,14 +91,19 @@ class VectorizedHashMapGenerator(
        |    $generatedAggBufferSchema
        |
        |  public $generatedClassName() {
-       |    batch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(schema,
-       |      org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-       |    // TODO: Possibly generate this projection in HashAggregate directly
-       |    aggregateBufferBatch = org.apache.spark.sql.execution.vectorized.ColumnarBatch.allocate(
-       |      aggregateBufferSchema, org.apache.spark.memory.MemoryMode.ON_HEAP, capacity);
-       |    for (int i = 0 ; i < aggregateBufferBatch.numCols(); i++) {
-       |       aggregateBufferBatch.setColumn(i, batch.column(i+${groupingKeys.length}));
+       |    batchVectors = org.apache.spark.sql.execution.vectorized
+       |      .OnHeapColumnVector.allocateColumns(capacity, schema);
+       |    batch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
+       |      schema, batchVectors, capacity);
+       |
+       |    bufferVectors = new org.apache.spark.sql.execution.vectorized
+       |      .OnHeapColumnVector[aggregateBufferSchema.fields().length];
+       |    for (int i = 0; i < aggregateBufferSchema.fields().length; i++) {
+       |      bufferVectors[i] = batchVectors[i + ${groupingKeys.length}];
        |    }
+       |    // TODO: Possibly generate this projection in HashAggregate directly
+       |    aggregateBufferBatch = new org.apache.spark.sql.execution.vectorized.ColumnarBatch(
+       |      aggregateBufferSchema, bufferVectors, capacity);
        |
        |    buckets = new int[numBuckets];
        |    java.util.Arrays.fill(buckets, -1);
@@ -112,8 +119,8 @@ class VectorizedHashMapGenerator(
    *
    * {{{
    * private boolean equals(int idx, long agg_key, long agg_key1) {
-   *   return batch.column(0).getLong(buckets[idx]) == agg_key &&
-   *     batch.column(1).getLong(buckets[idx]) == agg_key1;
+   *   return batchVectors[0].getLong(buckets[idx]) == agg_key &&
+   *     batchVectors[1].getLong(buckets[idx]) == agg_key1;
    * }
    * }}}
    */
@@ -121,8 +128,8 @@ class VectorizedHashMapGenerator(
 
     def genEqualsForKeys(groupingKeys: Seq[Buffer]): String = {
       groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-        s"""(${ctx.genEqual(key.dataType, ctx.getValue("batch", "buckets[idx]",
-          key.dataType, ordinal), key.name)})"""
+        s"""(${ctx.genEqual(key.dataType, ctx.getValue(s"batchVectors[$ordinal]", "buckets[idx]",
+          key.dataType), key.name)})"""
       }.mkString(" && ")
     }
 
@@ -150,9 +157,9 @@ class VectorizedHashMapGenerator(
    *   while (step < maxSteps) {
    *     // Return bucket index if it's either an empty slot or already contains the key
    *     if (buckets[idx] == -1) {
-   *       batch.column(0).putLong(numRows, agg_key);
-   *       batch.column(1).putLong(numRows, agg_key1);
-   *       batch.column(2).putLong(numRows, 0);
+   *       batchVectors[0].putLong(numRows, agg_key);
+   *       batchVectors[1].putLong(numRows, agg_key1);
+   *       batchVectors[2].putLong(numRows, 0);
    *       buckets[idx] = numRows++;
    *       return batch.getRow(buckets[idx]);
    *     } else if (equals(idx, agg_key, agg_key1)) {
@@ -170,13 +177,13 @@ class VectorizedHashMapGenerator(
 
     def genCodeToSetKeys(groupingKeys: Seq[Buffer]): Seq[String] = {
       groupingKeys.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-        ctx.setValue("batch", "numRows", key.dataType, ordinal, key.name)
+        ctx.setValue(s"batchVectors[$ordinal]", "numRows", key.dataType, key.name)
       }
     }
 
     def genCodeToSetAggBuffers(bufferValues: Seq[Buffer]): Seq[String] = {
       bufferValues.zipWithIndex.map { case (key: Buffer, ordinal: Int) =>
-        ctx.updateColumn("batch", "numRows", key.dataType, groupingKeys.length + ordinal,
+        ctx.updateColumn(s"batchVectors[${groupingKeys.length + ordinal}]", "numRows", key.dataType,
           buffVars(ordinal), nullable = true)
       }
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index 67b3d98..1331f15 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -24,7 +24,10 @@ import scala.util.Random
 import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.vectorized.ColumnVector
-import org.apache.spark.sql.types.{BinaryType, IntegerType}
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector
+import org.apache.spark.sql.types.{BinaryType, DataType, IntegerType}
 import org.apache.spark.unsafe.Platform
 import org.apache.spark.util.Benchmark
 import org.apache.spark.util.collection.BitSet
@@ -34,6 +37,14 @@ import org.apache.spark.util.collection.BitSet
  */
 object ColumnarBatchBenchmark {
 
+  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
+    if (memMode == MemoryMode.OFF_HEAP) {
+      new OffHeapColumnVector(capacity, dt)
+    } else {
+      new OnHeapColumnVector(capacity, dt)
+    }
+  }
+
   // This benchmark reads and writes an array of ints.
   // TODO: there is a big (2x) penalty for a random access API for off heap.
   // Note: carefully if modifying this code. It's hard to reason about the JIT.
@@ -140,7 +151,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with on heap memory
     val columnOnHeap = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -159,7 +170,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -178,7 +189,7 @@ object ColumnarBatchBenchmark {
 
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()
@@ -244,7 +255,7 @@ object ColumnarBatchBenchmark {
 
     // Adding values by appending, instead of putting.
     val onHeapAppend = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
+      val col = allocate(count, IntegerType, MemoryMode.ON_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -362,7 +373,7 @@ object ColumnarBatchBenchmark {
       .map(_.getBytes(StandardCharsets.UTF_8)).toArray
 
     def column(memoryMode: MemoryMode) = { i: Int =>
-      val column = ColumnVector.allocate(count, BinaryType, memoryMode)
+      val column = allocate(count, BinaryType, memoryMode)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index c8461dc..08ccbd6 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -34,11 +34,20 @@ import org.apache.spark.unsafe.Platform
 import org.apache.spark.unsafe.types.CalendarInterval
 
 class ColumnarBatchSuite extends SparkFunSuite {
+
+  def allocate(capacity: Int, dt: DataType, memMode: MemoryMode): WritableColumnVector = {
+    if (memMode == MemoryMode.OFF_HEAP) {
+      new OffHeapColumnVector(capacity, dt)
+    } else {
+      new OnHeapColumnVector(capacity, dt)
+    }
+  }
+
   test("Null Apis") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[Boolean]
 
-      val column = ColumnVector.allocate(1024, IntegerType, memMode)
+      val column = allocate(1024, IntegerType, memMode)
       var idx = 0
       assert(column.anyNullsSet() == false)
       assert(column.numNulls() == 0)
@@ -109,7 +118,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[Byte]
 
-      val column = ColumnVector.allocate(1024, ByteType, memMode)
+      val column = allocate(1024, ByteType, memMode)
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toByte).toArray
       column.appendBytes(2, values, 0)
@@ -167,7 +176,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Short]
 
-      val column = ColumnVector.allocate(1024, ShortType, memMode)
+      val column = allocate(1024, ShortType, memMode)
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).map(_.toShort).toArray
       column.appendShorts(2, values, 0)
@@ -247,7 +256,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
 
-      val column = ColumnVector.allocate(1024, IntegerType, memMode)
+      val column = allocate(1024, IntegerType, memMode)
 
       var values = (10 :: 20 :: 30 :: 40 :: 50 :: Nil).toArray
       column.appendInts(2, values, 0)
@@ -332,7 +341,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Long]
 
-      val column = ColumnVector.allocate(1024, LongType, memMode)
+      val column = allocate(1024, LongType, memMode)
 
       var values = (10L :: 20L :: 30L :: 40L :: 50L :: Nil).toArray
       column.appendLongs(2, values, 0)
@@ -419,7 +428,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Float]
 
-      val column = ColumnVector.allocate(1024, FloatType, memMode)
+      val column = allocate(1024, FloatType, memMode)
 
       var values = (.1f :: .2f :: .3f :: .4f :: .5f :: Nil).toArray
       column.appendFloats(2, values, 0)
@@ -510,7 +519,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
 
-      val column = ColumnVector.allocate(1024, DoubleType, memMode)
+      val column = allocate(1024, DoubleType, memMode)
 
       var values = (.1 :: .2 :: .3 :: .4 :: .5 :: Nil).toArray
       column.appendDoubles(2, values, 0)
@@ -599,7 +608,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[String]
 
-      val column = ColumnVector.allocate(6, BinaryType, memMode)
+      val column = allocate(6, BinaryType, memMode)
       assert(column.arrayData().elementsAppended == 0)
 
       val str = "string"
@@ -656,7 +665,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   test("Int Array") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
-      val column = ColumnVector.allocate(10, new ArrayType(IntegerType, true), memMode)
+      val column = allocate(10, new ArrayType(IntegerType, true), memMode)
 
       // Fill the underlying data with all the arrays back to back.
       val data = column.arrayData();
@@ -714,43 +723,43 @@ class ColumnarBatchSuite extends SparkFunSuite {
     (MemoryMode.ON_HEAP :: Nil).foreach { memMode => {
       val len = 4
 
-      val columnBool = ColumnVector.allocate(len, new ArrayType(BooleanType, false), memMode)
+      val columnBool = allocate(len, new ArrayType(BooleanType, false), memMode)
       val boolArray = Array(false, true, false, true)
       boolArray.zipWithIndex.map { case (v, i) => columnBool.arrayData.putBoolean(i, v) }
       columnBool.putArray(0, 0, len)
       assert(columnBool.getArray(0).toBooleanArray === boolArray)
 
-      val columnByte = ColumnVector.allocate(len, new ArrayType(ByteType, false), memMode)
+      val columnByte = allocate(len, new ArrayType(ByteType, false), memMode)
       val byteArray = Array[Byte](0, 1, 2, 3)
       byteArray.zipWithIndex.map { case (v, i) => columnByte.arrayData.putByte(i, v) }
       columnByte.putArray(0, 0, len)
       assert(columnByte.getArray(0).toByteArray === byteArray)
 
-      val columnShort = ColumnVector.allocate(len, new ArrayType(ShortType, false), memMode)
+      val columnShort = allocate(len, new ArrayType(ShortType, false), memMode)
       val shortArray = Array[Short](0, 1, 2, 3)
       shortArray.zipWithIndex.map { case (v, i) => columnShort.arrayData.putShort(i, v) }
       columnShort.putArray(0, 0, len)
       assert(columnShort.getArray(0).toShortArray === shortArray)
 
-      val columnInt = ColumnVector.allocate(len, new ArrayType(IntegerType, false), memMode)
+      val columnInt = allocate(len, new ArrayType(IntegerType, false), memMode)
       val intArray = Array(0, 1, 2, 3)
       intArray.zipWithIndex.map { case (v, i) => columnInt.arrayData.putInt(i, v) }
       columnInt.putArray(0, 0, len)
       assert(columnInt.getArray(0).toIntArray === intArray)
 
-      val columnLong = ColumnVector.allocate(len, new ArrayType(LongType, false), memMode)
+      val columnLong = allocate(len, new ArrayType(LongType, false), memMode)
       val longArray = Array[Long](0, 1, 2, 3)
       longArray.zipWithIndex.map { case (v, i) => columnLong.arrayData.putLong(i, v) }
       columnLong.putArray(0, 0, len)
       assert(columnLong.getArray(0).toLongArray === longArray)
 
-      val columnFloat = ColumnVector.allocate(len, new ArrayType(FloatType, false), memMode)
+      val columnFloat = allocate(len, new ArrayType(FloatType, false), memMode)
       val floatArray = Array(0.0F, 1.1F, 2.2F, 3.3F)
       floatArray.zipWithIndex.map { case (v, i) => columnFloat.arrayData.putFloat(i, v) }
       columnFloat.putArray(0, 0, len)
       assert(columnFloat.getArray(0).toFloatArray === floatArray)
 
-      val columnDouble = ColumnVector.allocate(len, new ArrayType(DoubleType, false), memMode)
+      val columnDouble = allocate(len, new ArrayType(DoubleType, false), memMode)
       val doubleArray = Array(0.0, 1.1, 2.2, 3.3)
       doubleArray.zipWithIndex.map { case (v, i) => columnDouble.arrayData.putDouble(i, v) }
       columnDouble.putArray(0, 0, len)
@@ -761,7 +770,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   test("Struct Column") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType().add("int", IntegerType).add("double", DoubleType)
-      val column = ColumnVector.allocate(1024, schema, memMode)
+      val column = allocate(1024, schema, memMode)
 
       val c1 = column.getChildColumn(0)
       val c2 = column.getChildColumn(1)
@@ -790,7 +799,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   test("Nest Array in Array.") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val column = ColumnVector.allocate(10, new ArrayType(new ArrayType(IntegerType, true), true),
+      val column = allocate(10, new ArrayType(new ArrayType(IntegerType, true), true),
         memMode)
       val childColumn = column.arrayData()
       val data = column.arrayData().arrayData()
@@ -823,7 +832,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
   test("Nest Struct in Array.") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
       val schema = new StructType().add("int", IntegerType).add("long", LongType)
-      val column = ColumnVector.allocate(10, new ArrayType(schema, true), memMode)
+      val column = allocate(10, new ArrayType(schema, true), memMode)
       val data = column.arrayData()
       val c0 = data.getChildColumn(0)
       val c1 = data.getChildColumn(1)
@@ -853,7 +862,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val schema = new StructType()
         .add("int", IntegerType)
         .add("array", new ArrayType(IntegerType, true))
-      val column = ColumnVector.allocate(10, schema, memMode)
+      val column = allocate(10, schema, memMode)
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -885,7 +894,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       val schema = new StructType()
         .add("int", IntegerType)
         .add("struct", subSchema)
-      val column = ColumnVector.allocate(10, schema, memMode)
+      val column = allocate(10, schema, memMode)
       val c0 = column.getChildColumn(0)
       val c1 = column.getChildColumn(1)
       c0.putInt(0, 0)
@@ -918,7 +927,11 @@ class ColumnarBatchSuite extends SparkFunSuite {
         .add("intCol2", IntegerType)
         .add("string", BinaryType)
 
-      val batch = ColumnarBatch.allocate(schema, memMode)
+      val capacity = ColumnarBatch.DEFAULT_BATCH_SIZE
+      val columns = schema.fields.map { field =>
+        allocate(capacity, field.dataType, memMode)
+      }
+      val batch = new ColumnarBatch(schema, columns.toArray, ColumnarBatch.DEFAULT_BATCH_SIZE)
       assert(batch.numCols() == 4)
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)
@@ -926,10 +939,10 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == false)
 
       // Add a row [1, 1.1, NULL]
-      batch.column(0).putInt(0, 1)
-      batch.column(1).putDouble(0, 1.1)
-      batch.column(2).putNull(0)
-      batch.column(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8))
+      columns(0).putInt(0, 1)
+      columns(1).putDouble(0, 1.1)
+      columns(2).putNull(0)
+      columns(3).putByteArray(0, "Hello".getBytes(StandardCharsets.UTF_8))
       batch.setNumRows(1)
 
       // Verify the results of the row.
@@ -939,12 +952,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == true)
       assert(batch.rowIterator().hasNext == true)
 
-      assert(batch.column(0).getInt(0) == 1)
-      assert(batch.column(0).isNullAt(0) == false)
-      assert(batch.column(1).getDouble(0) == 1.1)
-      assert(batch.column(1).isNullAt(0) == false)
-      assert(batch.column(2).isNullAt(0) == true)
-      assert(batch.column(3).getUTF8String(0).toString == "Hello")
+      assert(columns(0).getInt(0) == 1)
+      assert(columns(0).isNullAt(0) == false)
+      assert(columns(1).getDouble(0) == 1.1)
+      assert(columns(1).isNullAt(0) == false)
+      assert(columns(2).isNullAt(0) == true)
+      assert(columns(3).getUTF8String(0).toString == "Hello")
 
       // Verify the iterator works correctly.
       val it = batch.rowIterator()
@@ -955,7 +968,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(row.getDouble(1) == 1.1)
       assert(row.isNullAt(1) == false)
       assert(row.isNullAt(2) == true)
-      assert(batch.column(3).getUTF8String(0).toString == "Hello")
+      assert(columns(3).getUTF8String(0).toString == "Hello")
       assert(it.hasNext == false)
       assert(it.hasNext == false)
 
@@ -972,20 +985,20 @@ class ColumnarBatchSuite extends SparkFunSuite {
       assert(batch.rowIterator().hasNext == false)
 
       // Add rows [NULL, 2.2, 2, "abc"], [3, NULL, 3, ""], [4, 4.4, 4, "world]
-      batch.column(0).putNull(0)
-      batch.column(1).putDouble(0, 2.2)
-      batch.column(2).putInt(0, 2)
-      batch.column(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8))
-
-      batch.column(0).putInt(1, 3)
-      batch.column(1).putNull(1)
-      batch.column(2).putInt(1, 3)
-      batch.column(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8))
-
-      batch.column(0).putInt(2, 4)
-      batch.column(1).putDouble(2, 4.4)
-      batch.column(2).putInt(2, 4)
-      batch.column(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8))
+      columns(0).putNull(0)
+      columns(1).putDouble(0, 2.2)
+      columns(2).putInt(0, 2)
+      columns(3).putByteArray(0, "abc".getBytes(StandardCharsets.UTF_8))
+
+      columns(0).putInt(1, 3)
+      columns(1).putNull(1)
+      columns(2).putInt(1, 3)
+      columns(3).putByteArray(1, "".getBytes(StandardCharsets.UTF_8))
+
+      columns(0).putInt(2, 4)
+      columns(1).putDouble(2, 4.4)
+      columns(2).putInt(2, 4)
+      columns(3).putByteArray(2, "world".getBytes(StandardCharsets.UTF_8))
       batch.setNumRows(3)
 
       def rowEquals(x: InternalRow, y: Row): Unit = {
@@ -1232,7 +1245,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
   test("exceeding maximum capacity should throw an error") {
     (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode =>
-      val column = ColumnVector.allocate(1, ByteType, memMode)
+      val column = allocate(1, ByteType, memMode)
       column.MAX_CAPACITY = 15
       column.appendBytes(5, 0.toByte)
       // Successfully allocate twice the requested capacity


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org


[2/2] spark git commit: [SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector.

Posted by we...@apache.org.
[SPARK-21745][SQL] Refactor ColumnVector hierarchy to make ColumnVector read-only and to introduce WritableColumnVector.

## What changes were proposed in this pull request?

This is a refactoring of `ColumnVector` hierarchy and related classes.

1. make `ColumnVector` read-only
2. introduce `WritableColumnVector` with write interface
3. remove `ReadOnlyColumnVector`

## How was this patch tested?

Existing tests.

Author: Takuya UESHIN <ue...@databricks.com>

Closes #18958 from ueshin/issues/SPARK-21745.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9e33954d
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9e33954d
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9e33954d

Branch: refs/heads/master
Commit: 9e33954ddfe1148f69e523c89827feb76ba892c9
Parents: dc5d34d
Author: Takuya UESHIN <ue...@databricks.com>
Authored: Thu Aug 24 21:13:44 2017 +0800
Committer: Wenchen Fan <we...@databricks.com>
Committed: Thu Aug 24 21:13:44 2017 +0800

----------------------------------------------------------------------
 .../expressions/codegen/CodeGenerator.scala     |  28 +-
 .../parquet/VectorizedColumnReader.java         |  31 +-
 .../parquet/VectorizedParquetRecordReader.java  |  23 +-
 .../parquet/VectorizedPlainValuesReader.java    |  16 +-
 .../parquet/VectorizedRleValuesReader.java      |  87 ++-
 .../parquet/VectorizedValuesReader.java         |  16 +-
 .../execution/vectorized/AggregateHashMap.java  |  10 +-
 .../execution/vectorized/ArrowColumnVector.java |  45 +-
 .../sql/execution/vectorized/ColumnVector.java  | 632 +----------------
 .../execution/vectorized/ColumnVectorUtils.java |  18 +-
 .../sql/execution/vectorized/ColumnarBatch.java | 106 +--
 .../vectorized/OffHeapColumnVector.java         |  34 +-
 .../vectorized/OnHeapColumnVector.java          |  35 +-
 .../vectorized/ReadOnlyColumnVector.java        | 251 -------
 .../vectorized/WritableColumnVector.java        | 674 +++++++++++++++++++
 .../aggregate/VectorizedHashMapGenerator.scala  |  39 +-
 .../vectorized/ColumnarBatchBenchmark.scala     |  23 +-
 .../vectorized/ColumnarBatchSuite.scala         | 109 +--
 18 files changed, 1078 insertions(+), 1099 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
----------------------------------------------------------------------
diff --git a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
index 807765c..3853863 100644
--- a/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/expressions/codegen/CodeGenerator.scala
@@ -464,14 +464,13 @@ class CodegenContext {
   /**
    * Returns the specialized code to set a given value in a column vector for a given `DataType`.
    */
-  def setValue(batch: String, row: String, dataType: DataType, ordinal: Int,
-      value: String): String = {
+  def setValue(vector: String, rowId: String, dataType: DataType, value: String): String = {
     val jt = javaType(dataType)
     dataType match {
       case _ if isPrimitiveType(jt) =>
-        s"$batch.column($ordinal).put${primitiveTypeName(jt)}($row, $value);"
-      case t: DecimalType => s"$batch.column($ordinal).putDecimal($row, $value, ${t.precision});"
-      case t: StringType => s"$batch.column($ordinal).putByteArray($row, $value.getBytes());"
+        s"$vector.put${primitiveTypeName(jt)}($rowId, $value);"
+      case t: DecimalType => s"$vector.putDecimal($rowId, $value, ${t.precision});"
+      case t: StringType => s"$vector.putByteArray($rowId, $value.getBytes());"
       case _ =>
         throw new IllegalArgumentException(s"cannot generate code for unsupported type: $dataType")
     }
@@ -482,37 +481,36 @@ class CodegenContext {
    * that could potentially be nullable.
    */
   def updateColumn(
-      batch: String,
-      row: String,
+      vector: String,
+      rowId: String,
       dataType: DataType,
-      ordinal: Int,
       ev: ExprCode,
       nullable: Boolean): String = {
     if (nullable) {
       s"""
          if (!${ev.isNull}) {
-           ${setValue(batch, row, dataType, ordinal, ev.value)}
+           ${setValue(vector, rowId, dataType, ev.value)}
          } else {
-           $batch.column($ordinal).putNull($row);
+           $vector.putNull($rowId);
          }
        """
     } else {
-      s"""${setValue(batch, row, dataType, ordinal, ev.value)};"""
+      s"""${setValue(vector, rowId, dataType, ev.value)};"""
     }
   }
 
   /**
    * Returns the specialized code to access a value from a column vector for a given `DataType`.
    */
-  def getValue(batch: String, row: String, dataType: DataType, ordinal: Int): String = {
+  def getValue(vector: String, rowId: String, dataType: DataType): String = {
     val jt = javaType(dataType)
     dataType match {
       case _ if isPrimitiveType(jt) =>
-        s"$batch.column($ordinal).get${primitiveTypeName(jt)}($row)"
+        s"$vector.get${primitiveTypeName(jt)}($rowId)"
       case t: DecimalType =>
-        s"$batch.column($ordinal).getDecimal($row, ${t.precision}, ${t.scale})"
+        s"$vector.getDecimal($rowId, ${t.precision}, ${t.scale})"
       case StringType =>
-        s"$batch.column($ordinal).getUTF8String($row)"
+        s"$vector.getUTF8String($rowId)"
       case _ =>
         throw new IllegalArgumentException(s"cannot generate code for unsupported type: $dataType")
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
index fd8db17..f37864a 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedColumnReader.java
@@ -30,6 +30,7 @@ import org.apache.parquet.schema.PrimitiveType;
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.DecimalType;
 
@@ -135,9 +136,9 @@ public class VectorizedColumnReader {
   /**
    * Reads `total` values from this columnReader into column.
    */
-  void readBatch(int total, ColumnVector column) throws IOException {
+  void readBatch(int total, WritableColumnVector column) throws IOException {
     int rowId = 0;
-    ColumnVector dictionaryIds = null;
+    WritableColumnVector dictionaryIds = null;
     if (dictionary != null) {
       // SPARK-16334: We only maintain a single dictionary per row batch, so that it can be used to
       // decode all previous dictionary encoded pages if we ever encounter a non-dictionary encoded
@@ -219,8 +220,11 @@ public class VectorizedColumnReader {
   /**
    * Reads `num` values into column, decoding the values from `dictionaryIds` and `dictionary`.
    */
-  private void decodeDictionaryIds(int rowId, int num, ColumnVector column,
-                                   ColumnVector dictionaryIds) {
+  private void decodeDictionaryIds(
+      int rowId,
+      int num,
+      WritableColumnVector column,
+      ColumnVector dictionaryIds) {
     switch (descriptor.getType()) {
       case INT32:
         if (column.dataType() == DataTypes.IntegerType ||
@@ -346,13 +350,13 @@ public class VectorizedColumnReader {
    * is guaranteed that num is smaller than the number of values left in the current page.
    */
 
-  private void readBooleanBatch(int rowId, int num, ColumnVector column) throws IOException {
+  private void readBooleanBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     assert(column.dataType() == DataTypes.BooleanType);
     defColumn.readBooleans(
         num, column, rowId, maxDefLevel, (VectorizedValuesReader) dataColumn);
   }
 
-  private void readIntBatch(int rowId, int num, ColumnVector column) throws IOException {
+  private void readIntBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.IntegerType || column.dataType() == DataTypes.DateType ||
@@ -370,7 +374,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readLongBatch(int rowId, int num, ColumnVector column) throws IOException {
+  private void readLongBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     if (column.dataType() == DataTypes.LongType ||
         DecimalType.is64BitDecimalType(column.dataType())) {
@@ -389,7 +393,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readFloatBatch(int rowId, int num, ColumnVector column) throws IOException {
+  private void readFloatBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: support implicit cast to double?
     if (column.dataType() == DataTypes.FloatType) {
@@ -400,7 +404,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readDoubleBatch(int rowId, int num, ColumnVector column) throws IOException {
+  private void readDoubleBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     if (column.dataType() == DataTypes.DoubleType) {
@@ -411,7 +415,7 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readBinaryBatch(int rowId, int num, ColumnVector column) throws IOException {
+  private void readBinaryBatch(int rowId, int num, WritableColumnVector column) throws IOException {
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions
     VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
@@ -432,8 +436,11 @@ public class VectorizedColumnReader {
     }
   }
 
-  private void readFixedLenByteArrayBatch(int rowId, int num,
-                                          ColumnVector column, int arrayLen) throws IOException {
+  private void readFixedLenByteArrayBatch(
+      int rowId,
+      int num,
+      WritableColumnVector column,
+      int arrayLen) throws IOException {
     VectorizedValuesReader data = (VectorizedValuesReader) dataColumn;
     // This is where we implement support for the valid type conversions.
     // TODO: implement remaining type conversions

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
index 04f8141..0cacf0c 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedParquetRecordReader.java
@@ -31,6 +31,9 @@ import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.execution.vectorized.ColumnVectorUtils;
 import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
+import org.apache.spark.sql.execution.vectorized.OffHeapColumnVector;
+import org.apache.spark.sql.execution.vectorized.OnHeapColumnVector;
 import org.apache.spark.sql.types.StructField;
 import org.apache.spark.sql.types.StructType;
 
@@ -90,6 +93,8 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
    */
   private ColumnarBatch columnarBatch;
 
+  private WritableColumnVector[] columnVectors;
+
   /**
    * If true, this class returns batches instead of rows.
    */
@@ -172,20 +177,26 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
       }
     }
 
-    columnarBatch = ColumnarBatch.allocate(batchSchema, memMode);
+    int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, batchSchema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, batchSchema);
+    }
+    columnarBatch = new ColumnarBatch(batchSchema, columnVectors, capacity);
     if (partitionColumns != null) {
       int partitionIdx = sparkSchema.fields().length;
       for (int i = 0; i < partitionColumns.fields().length; i++) {
-        ColumnVectorUtils.populate(columnarBatch.column(i + partitionIdx), partitionValues, i);
-        columnarBatch.column(i + partitionIdx).setIsConstant();
+        ColumnVectorUtils.populate(columnVectors[i + partitionIdx], partitionValues, i);
+        columnVectors[i + partitionIdx].setIsConstant();
       }
     }
 
     // Initialize missing columns with nulls.
     for (int i = 0; i < missingColumns.length; i++) {
       if (missingColumns[i]) {
-        columnarBatch.column(i).putNulls(0, columnarBatch.capacity());
-        columnarBatch.column(i).setIsConstant();
+        columnVectors[i].putNulls(0, columnarBatch.capacity());
+        columnVectors[i].setIsConstant();
       }
     }
   }
@@ -226,7 +237,7 @@ public class VectorizedParquetRecordReader extends SpecificParquetRecordReaderBa
     int num = (int) Math.min((long) columnarBatch.capacity(), totalCountLoadedSoFar - rowsReturned);
     for (int i = 0; i < columnReaders.length; ++i) {
       if (columnReaders[i] == null) continue;
-      columnReaders[i].readBatch(num, columnarBatch.column(i));
+      columnReaders[i].readBatch(num, columnVectors[i]);
     }
     rowsReturned += num;
     columnarBatch.setNumRows(num);

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
index 98018b7..5b75f71 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -20,7 +20,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 import org.apache.spark.unsafe.Platform;
 
 import org.apache.parquet.column.values.ValuesReader;
@@ -56,7 +56,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   }
 
   @Override
-  public final void readBooleans(int total, ColumnVector c, int rowId) {
+  public final void readBooleans(int total, WritableColumnVector c, int rowId) {
     // TODO: properly vectorize this
     for (int i = 0; i < total; i++) {
       c.putBoolean(rowId + i, readBoolean());
@@ -64,31 +64,31 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   }
 
   @Override
-  public final void readIntegers(int total, ColumnVector c, int rowId) {
+  public final void readIntegers(int total, WritableColumnVector c, int rowId) {
     c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
     offset += 4 * total;
   }
 
   @Override
-  public final void readLongs(int total, ColumnVector c, int rowId) {
+  public final void readLongs(int total, WritableColumnVector c, int rowId) {
     c.putLongsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
     offset += 8 * total;
   }
 
   @Override
-  public final void readFloats(int total, ColumnVector c, int rowId) {
+  public final void readFloats(int total, WritableColumnVector c, int rowId) {
     c.putFloats(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
     offset += 4 * total;
   }
 
   @Override
-  public final void readDoubles(int total, ColumnVector c, int rowId) {
+  public final void readDoubles(int total, WritableColumnVector c, int rowId) {
     c.putDoubles(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
     offset += 8 * total;
   }
 
   @Override
-  public final void readBytes(int total, ColumnVector c, int rowId) {
+  public final void readBytes(int total, WritableColumnVector c, int rowId) {
     for (int i = 0; i < total; i++) {
       // Bytes are stored as a 4-byte little endian int. Just read the first byte.
       // TODO: consider pushing this in ColumnVector by adding a readBytes with a stride.
@@ -159,7 +159,7 @@ public class VectorizedPlainValuesReader extends ValuesReader implements Vectori
   }
 
   @Override
-  public final void readBinary(int total, ColumnVector v, int rowId) {
+  public final void readBinary(int total, WritableColumnVector v, int rowId) {
     for (int i = 0; i < total; i++) {
       int len = readInteger();
       int start = offset;

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
index 6215738..fc7fa70 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -25,7 +25,7 @@ import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 /**
  * A values reader for Parquet's run-length encoded data. This is based off of the version in
@@ -177,7 +177,11 @@ public final class VectorizedRleValuesReader extends ValuesReader
    *    c[rowId] = null;
    *  }
    */
-  public void readIntegers(int total, ColumnVector c, int rowId, int level,
+  public void readIntegers(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
       VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
@@ -208,8 +212,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
   }
 
   // TODO: can this code duplication be removed without a perf penalty?
-  public void readBooleans(int total, ColumnVector c,
-                        int rowId, int level, VectorizedValuesReader data) {
+  public void readBooleans(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -238,8 +246,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
-  public void readBytes(int total, ColumnVector c,
-                        int rowId, int level, VectorizedValuesReader data) {
+  public void readBytes(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -268,8 +280,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
-  public void readShorts(int total, ColumnVector c,
-                        int rowId, int level, VectorizedValuesReader data) {
+  public void readShorts(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -300,8 +316,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
-  public void readLongs(int total, ColumnVector c, int rowId, int level,
-                        VectorizedValuesReader data) {
+  public void readLongs(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -330,8 +350,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
-  public void readFloats(int total, ColumnVector c, int rowId, int level,
-                        VectorizedValuesReader data) {
+  public void readFloats(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -360,8 +384,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
-  public void readDoubles(int total, ColumnVector c, int rowId, int level,
-                         VectorizedValuesReader data) {
+  public void readDoubles(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -390,8 +418,12 @@ public final class VectorizedRleValuesReader extends ValuesReader
     }
   }
 
-  public void readBinarys(int total, ColumnVector c, int rowId, int level,
-                        VectorizedValuesReader data) {
+  public void readBinarys(
+      int total,
+      WritableColumnVector c,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -424,8 +456,13 @@ public final class VectorizedRleValuesReader extends ValuesReader
    * Decoding for dictionary ids. The IDs are populated into `values` and the nullability is
    * populated into `nulls`.
    */
-  public void readIntegers(int total, ColumnVector values, ColumnVector nulls, int rowId, int level,
-                           VectorizedValuesReader data) {
+  public void readIntegers(
+      int total,
+      WritableColumnVector values,
+      WritableColumnVector nulls,
+      int rowId,
+      int level,
+      VectorizedValuesReader data) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -459,7 +496,7 @@ public final class VectorizedRleValuesReader extends ValuesReader
   // IDs. This is different than the above APIs that decodes definitions levels along with values.
   // Since this is only used to decode dictionary IDs, only decoding integers is supported.
   @Override
-  public void readIntegers(int total, ColumnVector c, int rowId) {
+  public void readIntegers(int total, WritableColumnVector c, int rowId) {
     int left = total;
     while (left > 0) {
       if (this.currentCount == 0) this.readNextGroup();
@@ -485,32 +522,32 @@ public final class VectorizedRleValuesReader extends ValuesReader
   }
 
   @Override
-  public void readBytes(int total, ColumnVector c, int rowId) {
+  public void readBytes(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readLongs(int total, ColumnVector c, int rowId) {
+  public void readLongs(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readBinary(int total, ColumnVector c, int rowId) {
+  public void readBinary(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readBooleans(int total, ColumnVector c, int rowId) {
+  public void readBooleans(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readFloats(int total, ColumnVector c, int rowId) {
+  public void readFloats(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 
   @Override
-  public void readDoubles(int total, ColumnVector c, int rowId) {
+  public void readDoubles(int total, WritableColumnVector c, int rowId) {
     throw new UnsupportedOperationException("only readInts is valid.");
   }
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
index 88418ca..57d92ae 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -17,7 +17,7 @@
 
 package org.apache.spark.sql.execution.datasources.parquet;
 
-import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.WritableColumnVector;
 
 import org.apache.parquet.io.api.Binary;
 
@@ -37,11 +37,11 @@ public interface VectorizedValuesReader {
   /*
    * Reads `total` values into `c` start at `c[rowId]`
    */
-  void readBooleans(int total, ColumnVector c, int rowId);
-  void readBytes(int total, ColumnVector c, int rowId);
-  void readIntegers(int total, ColumnVector c, int rowId);
-  void readLongs(int total, ColumnVector c, int rowId);
-  void readFloats(int total, ColumnVector c, int rowId);
-  void readDoubles(int total, ColumnVector c, int rowId);
-  void readBinary(int total, ColumnVector c, int rowId);
+  void readBooleans(int total, WritableColumnVector c, int rowId);
+  void readBytes(int total, WritableColumnVector c, int rowId);
+  void readIntegers(int total, WritableColumnVector c, int rowId);
+  void readLongs(int total, WritableColumnVector c, int rowId);
+  void readFloats(int total, WritableColumnVector c, int rowId);
+  void readDoubles(int total, WritableColumnVector c, int rowId);
+  void readBinary(int total, WritableColumnVector c, int rowId);
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
index 25a565d..1c94f70 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/AggregateHashMap.java
@@ -41,6 +41,7 @@ import static org.apache.spark.sql.types.DataTypes.LongType;
  */
 public class AggregateHashMap {
 
+  private OnHeapColumnVector[] columnVectors;
   private ColumnarBatch batch;
   private int[] buckets;
   private int numBuckets;
@@ -62,7 +63,8 @@ public class AggregateHashMap {
 
     this.maxSteps = maxSteps;
     numBuckets = (int) (capacity / loadFactor);
-    batch = ColumnarBatch.allocate(schema, MemoryMode.ON_HEAP, capacity);
+    columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
+    batch = new ColumnarBatch(schema, columnVectors, capacity);
     buckets = new int[numBuckets];
     Arrays.fill(buckets, -1);
   }
@@ -74,8 +76,8 @@ public class AggregateHashMap {
   public ColumnarBatch.Row findOrInsert(long key) {
     int idx = find(key);
     if (idx != -1 && buckets[idx] == -1) {
-      batch.column(0).putLong(numRows, key);
-      batch.column(1).putLong(numRows, 0);
+      columnVectors[0].putLong(numRows, key);
+      columnVectors[1].putLong(numRows, 0);
       buckets[idx] = numRows++;
     }
     return batch.getRow(buckets[idx]);
@@ -105,6 +107,6 @@ public class AggregateHashMap {
   }
 
   private boolean equals(int idx, long key1) {
-    return batch.column(0).getLong(buckets[idx]) == key1;
+    return columnVectors[0].getLong(buckets[idx]) == key1;
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
index 59d66c5..be2a9c2 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ArrowColumnVector.java
@@ -29,12 +29,13 @@ import org.apache.spark.unsafe.types.UTF8String;
 /**
  * A column vector backed by Apache Arrow.
  */
-public final class ArrowColumnVector extends ReadOnlyColumnVector {
+public final class ArrowColumnVector extends ColumnVector {
 
   private final ArrowVectorAccessor accessor;
-  private final int valueCount;
+  private ArrowColumnVector[] childColumns;
 
   private void ensureAccessible(int index) {
+    int valueCount = accessor.getValueCount();
     if (index < 0 || index >= valueCount) {
       throw new IndexOutOfBoundsException(
         String.format("index: %d, valueCount: %d", index, valueCount));
@@ -42,6 +43,7 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
   }
 
   private void ensureAccessible(int index, int count) {
+    int valueCount = accessor.getValueCount();
     if (index < 0 || index + count > valueCount) {
       throw new IndexOutOfBoundsException(
         String.format("index range: [%d, %d), valueCount: %d", index, index + count, valueCount));
@@ -49,6 +51,16 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
   }
 
   @Override
+  public int numNulls() {
+    return accessor.getNullCount();
+  }
+
+  @Override
+  public boolean anyNullsSet() {
+    return numNulls() > 0;
+  }
+
+  @Override
   public long nullsNativeAddress() {
     throw new RuntimeException("Cannot get native address for arrow column");
   }
@@ -274,9 +286,20 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
     return accessor.getBinary(rowId);
   }
 
+  /**
+   * Returns the data for the underlying array.
+   */
+  @Override
+  public ArrowColumnVector arrayData() { return childColumns[0]; }
+
+  /**
+   * Returns the ordinal's child data column.
+   */
+  @Override
+  public ArrowColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
+
   public ArrowColumnVector(ValueVector vector) {
-    super(vector.getValueCapacity(), ArrowUtils.fromArrowField(vector.getField()),
-      MemoryMode.OFF_HEAP);
+    super(ArrowUtils.fromArrowField(vector.getField()));
 
     if (vector instanceof NullableBitVector) {
       accessor = new BooleanAccessor((NullableBitVector) vector);
@@ -302,7 +325,7 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
       ListVector listVector = (ListVector) vector;
       accessor = new ArrayAccessor(listVector);
 
-      childColumns = new ColumnVector[1];
+      childColumns = new ArrowColumnVector[1];
       childColumns[0] = new ArrowColumnVector(listVector.getDataVector());
       resultArray = new ColumnVector.Array(childColumns[0]);
     } else if (vector instanceof MapVector) {
@@ -317,9 +340,6 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
     } else {
       throw new UnsupportedOperationException();
     }
-    valueCount = accessor.getValueCount();
-    numNulls = accessor.getNullCount();
-    anyNullsSet = numNulls > 0;
   }
 
   private abstract static class ArrowVectorAccessor {
@@ -327,14 +347,9 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
     private final ValueVector vector;
     private final ValueVector.Accessor nulls;
 
-    private final int valueCount;
-    private final int nullCount;
-
     ArrowVectorAccessor(ValueVector vector) {
       this.vector = vector;
       this.nulls = vector.getAccessor();
-      this.valueCount = nulls.getValueCount();
-      this.nullCount = nulls.getNullCount();
     }
 
     final boolean isNullAt(int rowId) {
@@ -342,11 +357,11 @@ public final class ArrowColumnVector extends ReadOnlyColumnVector {
     }
 
     final int getValueCount() {
-      return valueCount;
+      return nulls.getValueCount();
     }
 
     final int getNullCount() {
-      return nullCount;
+      return nulls.getNullCount();
     }
 
     final void close() {

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index 7796638..a69dd97 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -16,23 +16,16 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
-import java.math.BigDecimal;
-import java.math.BigInteger;
-
-import com.google.common.annotations.VisibleForTesting;
-
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
-import org.apache.spark.sql.internal.SQLConf;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.types.CalendarInterval;
 import org.apache.spark.unsafe.types.UTF8String;
 
 /**
  * This class represents a column of values and provides the main APIs to access the data
- * values. It supports all the types and contains get/put APIs as well as their batched versions.
+ * values. It supports all the types and contains get APIs as well as their batched versions.
  * The batched versions are preferable whenever possible.
  *
  * To handle nested schemas, ColumnVector has two types: Arrays and Structs. In both cases these
@@ -40,34 +33,15 @@ import org.apache.spark.unsafe.types.UTF8String;
  * contains nullability, and in the case of Arrays, the lengths and offsets into the child column.
  * Lengths and offsets are encoded identically to INTs.
  * Maps are just a special case of a two field struct.
- * Strings are handled as an Array of ByteType.
- *
- * Capacity: The data stored is dense but the arrays are not fixed capacity. It is the
- * responsibility of the caller to call reserve() to ensure there is enough room before adding
- * elements. This means that the put() APIs do not check as in common cases (i.e. flat schemas),
- * the lengths are known up front.
  *
  * Most of the APIs take the rowId as a parameter. This is the batch local 0-based row id for values
  * in the current RowBatch.
  *
- * A ColumnVector should be considered immutable once originally created. In other words, it is not
- * valid to call put APIs after reads until reset() is called.
+ * A ColumnVector should be considered immutable once originally created.
  *
  * ColumnVectors are intended to be reused.
  */
 public abstract class ColumnVector implements AutoCloseable {
-  /**
-   * Allocates a column to store elements of `type` on or off heap.
-   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
-   * in number of elements, not number of bytes.
-   */
-  public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
-    if (mode == MemoryMode.OFF_HEAP) {
-      return new OffHeapColumnVector(capacity, type);
-    } else {
-      return new OnHeapColumnVector(capacity, type);
-    }
-  }
 
   /**
    * Holder object to return an array. This object is intended to be reused. Callers should
@@ -279,74 +253,21 @@ public abstract class ColumnVector implements AutoCloseable {
   public final DataType dataType() { return type; }
 
   /**
-   * Resets this column for writing. The currently stored values are no longer accessible.
-   */
-  public void reset() {
-    if (isConstant) return;
-
-    if (childColumns != null) {
-      for (ColumnVector c: childColumns) {
-        c.reset();
-      }
-    }
-    numNulls = 0;
-    elementsAppended = 0;
-    if (anyNullsSet) {
-      putNotNulls(0, capacity);
-      anyNullsSet = false;
-    }
-  }
-
-  /**
    * Cleans up memory for this column. The column is not usable after this.
    * TODO: this should probably have ref-counted semantics.
    */
   public abstract void close();
 
-  public void reserve(int requiredCapacity) {
-    if (requiredCapacity > capacity) {
-      int newCapacity = (int) Math.min(MAX_CAPACITY, requiredCapacity * 2L);
-      if (requiredCapacity <= newCapacity) {
-        try {
-          reserveInternal(newCapacity);
-        } catch (OutOfMemoryError outOfMemoryError) {
-          throwUnsupportedException(requiredCapacity, outOfMemoryError);
-        }
-      } else {
-        throwUnsupportedException(requiredCapacity, null);
-      }
-    }
-  }
-
-  private void throwUnsupportedException(int requiredCapacity, Throwable cause) {
-    String message = "Cannot reserve additional contiguous bytes in the vectorized reader " +
-        "(requested = " + requiredCapacity + " bytes). As a workaround, you can disable the " +
-        "vectorized reader by setting " + SQLConf.PARQUET_VECTORIZED_READER_ENABLED().key() +
-        " to false.";
-
-    if (cause != null) {
-      throw new RuntimeException(message, cause);
-    } else {
-      throw new RuntimeException(message);
-    }
-  }
-
-  /**
-   * Ensures that there is enough storage to store capacity elements. That is, the put() APIs
-   * must work for all rowIds < capacity.
-   */
-  protected abstract void reserveInternal(int capacity);
-
   /**
    * Returns the number of nulls in this column.
    */
-  public final int numNulls() { return numNulls; }
+  public abstract int numNulls();
 
   /**
    * Returns true if any of the nulls indicator are set for this column. This can be used
    * as an optimization to prevent setting nulls.
    */
-  public final boolean anyNullsSet() { return anyNullsSet; }
+  public abstract boolean anyNullsSet();
 
   /**
    * Returns the off heap ptr for the arrays backing the NULLs and values buffer. Only valid
@@ -356,33 +277,11 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract long valuesNativeAddress();
 
   /**
-   * Sets the value at rowId to null/not null.
-   */
-  public abstract void putNotNull(int rowId);
-  public abstract void putNull(int rowId);
-
-  /**
-   * Sets the values from [rowId, rowId + count) to null/not null.
-   */
-  public abstract void putNulls(int rowId, int count);
-  public abstract void putNotNulls(int rowId, int count);
-
-  /**
    * Returns whether the value at rowId is NULL.
    */
   public abstract boolean isNullAt(int rowId);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putBoolean(int rowId, boolean value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putBooleans(int rowId, int count, boolean value);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract boolean getBoolean(int rowId);
@@ -393,21 +292,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract boolean[] getBooleans(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putByte(int rowId, byte value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putBytes(int rowId, int count, byte value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   */
-  public abstract void putBytes(int rowId, int count, byte[] src, int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract byte getByte(int rowId);
@@ -418,21 +302,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract byte[] getBytes(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putShort(int rowId, short value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putShorts(int rowId, int count, short value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   */
-  public abstract void putShorts(int rowId, int count, short[] src, int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract short getShort(int rowId);
@@ -443,27 +312,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract short[] getShorts(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putInt(int rowId, int value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putInts(int rowId, int count, int value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   */
-  public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
-   * The data in src must be 4-byte little endian ints.
-   */
-  public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract int getInt(int rowId);
@@ -481,27 +329,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract int getDictId(int rowId);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putLong(int rowId, long value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putLongs(int rowId, int count, long value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   */
-  public abstract void putLongs(int rowId, int count, long[] src, int srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
-   * The data in src must be 8-byte little endian longs.
-   */
-  public abstract void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract long getLong(int rowId);
@@ -512,27 +339,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract long[] getLongs(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putFloat(int rowId, float value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putFloats(int rowId, int count, float value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   */
-  public abstract void putFloats(int rowId, int count, float[] src, int srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
-   * The data in src must be ieee formatted floats.
-   */
-  public abstract void putFloats(int rowId, int count, byte[] src, int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract float getFloat(int rowId);
@@ -543,27 +349,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract float[] getFloats(int rowId, int count);
 
   /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract void putDouble(int rowId, double value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to value.
-   */
-  public abstract void putDoubles(int rowId, int count, double value);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
-   */
-  public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
-
-  /**
-   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
-   * The data in src must be ieee formatted doubles.
-   */
-  public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
-
-  /**
    * Returns the value for rowId.
    */
   public abstract double getDouble(int rowId);
@@ -574,11 +359,6 @@ public abstract class ColumnVector implements AutoCloseable {
   public abstract double[] getDoubles(int rowId, int count);
 
   /**
-   * Puts a byte array that already exists in this column.
-   */
-  public abstract void putArray(int rowId, int offset, int length);
-
-  /**
    * Returns the length of the array at rowid.
    */
   public abstract int getArrayLength(int rowId);
@@ -608,7 +388,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the array at rowid.
    */
-  public final Array getArray(int rowId) {
+  public final ColumnVector.Array getArray(int rowId) {
     resultArray.length = getArrayLength(rowId);
     resultArray.offset = getArrayOffset(rowId);
     return resultArray;
@@ -617,24 +397,7 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Loads the data into array.byteArray.
    */
-  public abstract void loadBytes(Array array);
-
-  /**
-   * Sets the value at rowId to `value`.
-   */
-  public abstract int putByteArray(int rowId, byte[] value, int offset, int count);
-  public final int putByteArray(int rowId, byte[] value) {
-    return putByteArray(rowId, value, 0, value.length);
-  }
-
-  /**
-   * Returns the value for rowId.
-   */
-  private Array getByteArray(int rowId) {
-    Array array = getArray(rowId);
-    array.data.loadBytes(array);
-    return array;
-  }
+  public abstract void loadBytes(ColumnVector.Array array);
 
   /**
    * Returns the value for rowId.
@@ -646,291 +409,27 @@ public abstract class ColumnVector implements AutoCloseable {
   /**
    * Returns the decimal for rowId.
    */
-  public Decimal getDecimal(int rowId, int precision, int scale) {
-    if (precision <= Decimal.MAX_INT_DIGITS()) {
-      return Decimal.createUnsafe(getInt(rowId), precision, scale);
-    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
-      return Decimal.createUnsafe(getLong(rowId), precision, scale);
-    } else {
-      // TODO: best perf?
-      byte[] bytes = getBinary(rowId);
-      BigInteger bigInteger = new BigInteger(bytes);
-      BigDecimal javaDecimal = new BigDecimal(bigInteger, scale);
-      return Decimal.apply(javaDecimal, precision, scale);
-    }
-  }
-
-
-  public void putDecimal(int rowId, Decimal value, int precision) {
-    if (precision <= Decimal.MAX_INT_DIGITS()) {
-      putInt(rowId, (int) value.toUnscaledLong());
-    } else if (precision <= Decimal.MAX_LONG_DIGITS()) {
-      putLong(rowId, value.toUnscaledLong());
-    } else {
-      BigInteger bigInteger = value.toJavaBigDecimal().unscaledValue();
-      putByteArray(rowId, bigInteger.toByteArray());
-    }
-  }
+  public abstract Decimal getDecimal(int rowId, int precision, int scale);
 
   /**
    * Returns the UTF8String for rowId.
    */
-  public UTF8String getUTF8String(int rowId) {
-    if (dictionary == null) {
-      ColumnVector.Array a = getByteArray(rowId);
-      return UTF8String.fromBytes(a.byteArray, a.byteArrayOffset, a.length);
-    } else {
-      byte[] bytes = dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-      return UTF8String.fromBytes(bytes);
-    }
-  }
+  public abstract UTF8String getUTF8String(int rowId);
 
   /**
    * Returns the byte array for rowId.
    */
-  public byte[] getBinary(int rowId) {
-    if (dictionary == null) {
-      ColumnVector.Array array = getByteArray(rowId);
-      byte[] bytes = new byte[array.length];
-      System.arraycopy(array.byteArray, array.byteArrayOffset, bytes, 0, bytes.length);
-      return bytes;
-    } else {
-      return dictionary.decodeToBinary(dictionaryIds.getDictId(rowId));
-    }
-  }
-
-  /**
-   * Append APIs. These APIs all behave similarly and will append data to the current vector.  It
-   * is not valid to mix the put and append APIs. The append APIs are slower and should only be
-   * used if the sizes are not known up front.
-   * In all these cases, the return value is the rowId for the first appended element.
-   */
-  public final int appendNull() {
-    assert (!(dataType() instanceof StructType)); // Use appendStruct()
-    reserve(elementsAppended + 1);
-    putNull(elementsAppended);
-    return elementsAppended++;
-  }
-
-  public final int appendNotNull() {
-    reserve(elementsAppended + 1);
-    putNotNull(elementsAppended);
-    return elementsAppended++;
-  }
-
-  public final int appendNulls(int count) {
-    assert (!(dataType() instanceof StructType));
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putNulls(elementsAppended, count);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendNotNulls(int count) {
-    assert (!(dataType() instanceof StructType));
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putNotNulls(elementsAppended, count);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendBoolean(boolean v) {
-    reserve(elementsAppended + 1);
-    putBoolean(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendBooleans(int count, boolean v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putBooleans(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendByte(byte v) {
-    reserve(elementsAppended + 1);
-    putByte(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendBytes(int count, byte v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putBytes(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendBytes(int length, byte[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putBytes(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendShort(short v) {
-    reserve(elementsAppended + 1);
-    putShort(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendShorts(int count, short v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putShorts(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendShorts(int length, short[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putShorts(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendInt(int v) {
-    reserve(elementsAppended + 1);
-    putInt(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendInts(int count, int v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putInts(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendInts(int length, int[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putInts(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendLong(long v) {
-    reserve(elementsAppended + 1);
-    putLong(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendLongs(int count, long v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putLongs(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendLongs(int length, long[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putLongs(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendFloat(float v) {
-    reserve(elementsAppended + 1);
-    putFloat(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendFloats(int count, float v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putFloats(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendFloats(int length, float[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putFloats(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendDouble(double v) {
-    reserve(elementsAppended + 1);
-    putDouble(elementsAppended, v);
-    return elementsAppended++;
-  }
-
-  public final int appendDoubles(int count, double v) {
-    reserve(elementsAppended + count);
-    int result = elementsAppended;
-    putDoubles(elementsAppended, count, v);
-    elementsAppended += count;
-    return result;
-  }
-
-  public final int appendDoubles(int length, double[] src, int offset) {
-    reserve(elementsAppended + length);
-    int result = elementsAppended;
-    putDoubles(elementsAppended, length, src, offset);
-    elementsAppended += length;
-    return result;
-  }
-
-  public final int appendByteArray(byte[] value, int offset, int length) {
-    int copiedOffset = arrayData().appendBytes(length, value, offset);
-    reserve(elementsAppended + 1);
-    putArray(elementsAppended, copiedOffset, length);
-    return elementsAppended++;
-  }
-
-  public final int appendArray(int length) {
-    reserve(elementsAppended + 1);
-    putArray(elementsAppended, arrayData().elementsAppended, length);
-    return elementsAppended++;
-  }
-
-  /**
-   * Appends a NULL struct. This *has* to be used for structs instead of appendNull() as this
-   * recursively appends a NULL to its children.
-   * We don't have this logic as the general appendNull implementation to optimize the more
-   * common non-struct case.
-   */
-  public final int appendStruct(boolean isNull) {
-    if (isNull) {
-      appendNull();
-      for (ColumnVector c: childColumns) {
-        if (c.type instanceof StructType) {
-          c.appendStruct(true);
-        } else {
-          c.appendNull();
-        }
-      }
-    } else {
-      appendNotNull();
-    }
-    return elementsAppended;
-  }
+  public abstract byte[] getBinary(int rowId);
 
   /**
    * Returns the data for the underlying array.
    */
-  public final ColumnVector arrayData() { return childColumns[0]; }
+  public abstract ColumnVector arrayData();
 
   /**
    * Returns the ordinal's child data column.
    */
-  public final ColumnVector getChildColumn(int ordinal) { return childColumns[ordinal]; }
-
-  /**
-   * Returns the elements appended.
-   */
-  public final int getElementsAppended() { return elementsAppended; }
+  public abstract ColumnVector getChildColumn(int ordinal);
 
   /**
    * Returns true if this column is an array.
@@ -938,62 +437,14 @@ public abstract class ColumnVector implements AutoCloseable {
   public final boolean isArray() { return resultArray != null; }
 
   /**
-   * Marks this column as being constant.
-   */
-  public final void setIsConstant() { isConstant = true; }
-
-  /**
-   * Maximum number of rows that can be stored in this column.
-   */
-  protected int capacity;
-
-  /**
-   * Upper limit for the maximum capacity for this column.
-   */
-  @VisibleForTesting
-  protected int MAX_CAPACITY = Integer.MAX_VALUE;
-
-  /**
    * Data type for this column.
    */
   protected DataType type;
 
   /**
-   * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
-   */
-  protected int numNulls;
-
-  /**
-   * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
-   * having to clear NULL bits.
-   */
-  protected boolean anyNullsSet;
-
-  /**
-   * True if this column's values are fixed. This means the column values never change, even
-   * across resets.
-   */
-  protected boolean isConstant;
-
-  /**
-   * Default size of each array length value. This grows as necessary.
-   */
-  protected static final int DEFAULT_ARRAY_LENGTH = 4;
-
-  /**
-   * Current write cursor (row index) when appending data.
-   */
-  protected int elementsAppended;
-
-  /**
-   * If this is a nested type (array or struct), the column for the child data.
-   */
-  protected ColumnVector[] childColumns;
-
-  /**
    * Reusable Array holder for getArray().
    */
-  protected Array resultArray;
+  protected ColumnVector.Array resultArray;
 
   /**
    * Reusable Struct holder for getStruct().
@@ -1013,32 +464,11 @@ public abstract class ColumnVector implements AutoCloseable {
   protected ColumnVector dictionaryIds;
 
   /**
-   * Update the dictionary.
-   */
-  public void setDictionary(Dictionary dictionary) {
-    this.dictionary = dictionary;
-  }
-
-  /**
    * Returns true if this column has a dictionary.
    */
   public boolean hasDictionary() { return this.dictionary != null; }
 
   /**
-   * Reserve a integer column for ids of dictionary.
-   */
-  public ColumnVector reserveDictionaryIds(int capacity) {
-    if (dictionaryIds == null) {
-      dictionaryIds = allocate(capacity, DataTypes.IntegerType,
-        this instanceof OnHeapColumnVector ? MemoryMode.ON_HEAP : MemoryMode.OFF_HEAP);
-    } else {
-      dictionaryIds.reset();
-      dictionaryIds.reserve(capacity);
-    }
-    return dictionaryIds;
-  }
-
-  /**
    * Returns the underlying integer column for ids of dictionary.
    */
   public ColumnVector getDictionaryIds() {
@@ -1049,43 +479,7 @@ public abstract class ColumnVector implements AutoCloseable {
    * Sets up the common state and also handles creating the child columns if this is a nested
    * type.
    */
-  protected ColumnVector(int capacity, DataType type, MemoryMode memMode) {
-    this.capacity = capacity;
+  protected ColumnVector(DataType type) {
     this.type = type;
-
-    if (type instanceof ArrayType || type instanceof BinaryType || type instanceof StringType
-        || DecimalType.isByteArrayDecimalType(type)) {
-      DataType childType;
-      int childCapacity = capacity;
-      if (type instanceof ArrayType) {
-        childType = ((ArrayType)type).elementType();
-      } else {
-        childType = DataTypes.ByteType;
-        childCapacity *= DEFAULT_ARRAY_LENGTH;
-      }
-      this.childColumns = new ColumnVector[1];
-      this.childColumns[0] = ColumnVector.allocate(childCapacity, childType, memMode);
-      this.resultArray = new Array(this.childColumns[0]);
-      this.resultStruct = null;
-    } else if (type instanceof StructType) {
-      StructType st = (StructType)type;
-      this.childColumns = new ColumnVector[st.fields().length];
-      for (int i = 0; i < childColumns.length; ++i) {
-        this.childColumns[i] = ColumnVector.allocate(capacity, st.fields()[i].dataType(), memMode);
-      }
-      this.resultArray = null;
-      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
-    } else if (type instanceof CalendarIntervalType) {
-      // Two columns. Months as int. Microseconds as Long.
-      this.childColumns = new ColumnVector[2];
-      this.childColumns[0] = ColumnVector.allocate(capacity, DataTypes.IntegerType, memMode);
-      this.childColumns[1] = ColumnVector.allocate(capacity, DataTypes.LongType, memMode);
-      this.resultArray = null;
-      this.resultStruct = new ColumnarBatch.Row(this.childColumns);
-    } else {
-      this.childColumns = null;
-      this.resultArray = null;
-      this.resultStruct = null;
-    }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
index 900d7c4..adb859e 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVectorUtils.java
@@ -40,7 +40,7 @@ public class ColumnVectorUtils {
   /**
    * Populates the entire `col` with `row[fieldIdx]`
    */
-  public static void populate(ColumnVector col, InternalRow row, int fieldIdx) {
+  public static void populate(WritableColumnVector col, InternalRow row, int fieldIdx) {
     int capacity = col.capacity;
     DataType t = col.dataType();
 
@@ -115,7 +115,7 @@ public class ColumnVectorUtils {
     }
   }
 
-  private static void appendValue(ColumnVector dst, DataType t, Object o) {
+  private static void appendValue(WritableColumnVector dst, DataType t, Object o) {
     if (o == null) {
       if (t instanceof CalendarIntervalType) {
         dst.appendStruct(true);
@@ -165,7 +165,7 @@ public class ColumnVectorUtils {
     }
   }
 
-  private static void appendValue(ColumnVector dst, DataType t, Row src, int fieldIdx) {
+  private static void appendValue(WritableColumnVector dst, DataType t, Row src, int fieldIdx) {
     if (t instanceof ArrayType) {
       ArrayType at = (ArrayType)t;
       if (src.isNullAt(fieldIdx)) {
@@ -198,15 +198,23 @@ public class ColumnVectorUtils {
    */
   public static ColumnarBatch toBatch(
       StructType schema, MemoryMode memMode, Iterator<Row> row) {
-    ColumnarBatch batch = ColumnarBatch.allocate(schema, memMode);
+    int capacity = ColumnarBatch.DEFAULT_BATCH_SIZE;
+    WritableColumnVector[] columnVectors;
+    if (memMode == MemoryMode.OFF_HEAP) {
+      columnVectors = OffHeapColumnVector.allocateColumns(capacity, schema);
+    } else {
+      columnVectors = OnHeapColumnVector.allocateColumns(capacity, schema);
+    }
+
     int n = 0;
     while (row.hasNext()) {
       Row r = row.next();
       for (int i = 0; i < schema.fields().length; i++) {
-        appendValue(batch.column(i), schema.fields()[i].dataType(), r, i);
+        appendValue(columnVectors[i], schema.fields()[i].dataType(), r, i);
       }
       n++;
     }
+    ColumnarBatch batch = new ColumnarBatch(schema, columnVectors, capacity);
     batch.setNumRows(n);
     return batch;
   }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 34dc3af..e782756 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,7 +19,6 @@ package org.apache.spark.sql.execution.vectorized;
 import java.math.BigDecimal;
 import java.util.*;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.expressions.GenericInternalRow;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
@@ -44,8 +43,7 @@ import org.apache.spark.unsafe.types.UTF8String;
  *  - Compaction: The batch and columns should be able to compact based on a selection vector.
  */
 public final class ColumnarBatch {
-  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
-  private static MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+  public static final int DEFAULT_BATCH_SIZE = 4 * 1024;
 
   private final StructType schema;
   private final int capacity;
@@ -64,18 +62,6 @@ public final class ColumnarBatch {
   // Staging row returned from getRow.
   final Row row;
 
-  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
-  }
-
-  public static ColumnarBatch allocate(StructType type) {
-    return new ColumnarBatch(type, DEFAULT_BATCH_SIZE, DEFAULT_MEMORY_MODE);
-  }
-
-  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, memMode);
-  }
-
   /**
    * Called to close all the columns in this batch. It is not valid to access the data after
    * calling this. This must be called at the end to clean up memory allocations.
@@ -95,12 +81,19 @@ public final class ColumnarBatch {
     private final ColumnarBatch parent;
     private final int fixedLenRowSize;
     private final ColumnVector[] columns;
+    private final WritableColumnVector[] writableColumns;
 
     // Ctor used if this is a top level row.
     private Row(ColumnarBatch parent) {
       this.parent = parent;
       this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(parent.numCols());
       this.columns = parent.columns;
+      this.writableColumns = new WritableColumnVector[this.columns.length];
+      for (int i = 0; i < this.columns.length; i++) {
+        if (this.columns[i] instanceof WritableColumnVector) {
+          this.writableColumns[i] = (WritableColumnVector) this.columns[i];
+        }
+      }
     }
 
     // Ctor used if this is a struct.
@@ -108,6 +101,12 @@ public final class ColumnarBatch {
       this.parent = null;
       this.fixedLenRowSize = UnsafeRow.calculateFixedPortionByteSize(columns.length);
       this.columns = columns;
+      this.writableColumns = new WritableColumnVector[this.columns.length];
+      for (int i = 0; i < this.columns.length; i++) {
+        if (this.columns[i] instanceof WritableColumnVector) {
+          this.writableColumns[i] = (WritableColumnVector) this.columns[i];
+        }
+      }
     }
 
     /**
@@ -307,64 +306,69 @@ public final class ColumnarBatch {
 
     @Override
     public void setNullAt(int ordinal) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNull(rowId);
+      getWritableColumn(ordinal).putNull(rowId);
     }
 
     @Override
     public void setBoolean(int ordinal, boolean value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putBoolean(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putBoolean(rowId, value);
     }
 
     @Override
     public void setByte(int ordinal, byte value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putByte(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putByte(rowId, value);
     }
 
     @Override
     public void setShort(int ordinal, short value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putShort(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putShort(rowId, value);
     }
 
     @Override
     public void setInt(int ordinal, int value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putInt(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putInt(rowId, value);
     }
 
     @Override
     public void setLong(int ordinal, long value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putLong(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putLong(rowId, value);
     }
 
     @Override
     public void setFloat(int ordinal, float value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putFloat(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putFloat(rowId, value);
     }
 
     @Override
     public void setDouble(int ordinal, double value) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putDouble(rowId, value);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putDouble(rowId, value);
     }
 
     @Override
     public void setDecimal(int ordinal, Decimal value, int precision) {
-      assert (!columns[ordinal].isConstant);
-      columns[ordinal].putNotNull(rowId);
-      columns[ordinal].putDecimal(rowId, value, precision);
+      WritableColumnVector column = getWritableColumn(ordinal);
+      column.putNotNull(rowId);
+      column.putDecimal(rowId, value, precision);
+    }
+
+    private WritableColumnVector getWritableColumn(int ordinal) {
+      WritableColumnVector column = writableColumns[ordinal];
+      assert (!column.isConstant);
+      return column;
     }
   }
 
@@ -409,7 +413,9 @@ public final class ColumnarBatch {
    */
   public void reset() {
     for (int i = 0; i < numCols(); ++i) {
-      columns[i].reset();
+      if (columns[i] instanceof WritableColumnVector) {
+        ((WritableColumnVector) columns[i]).reset();
+      }
     }
     if (this.numRowsFiltered > 0) {
       Arrays.fill(filteredRows, false);
@@ -427,7 +433,7 @@ public final class ColumnarBatch {
     this.numRows = numRows;
 
     for (int ordinal : nullFilteredColumns) {
-      if (columns[ordinal].numNulls != 0) {
+      if (columns[ordinal].numNulls() != 0) {
         for (int rowId = 0; rowId < numRows; rowId++) {
           if (!filteredRows[rowId] && columns[ordinal].isNullAt(rowId)) {
             filteredRows[rowId] = true;
@@ -505,18 +511,12 @@ public final class ColumnarBatch {
     nullFilteredColumns.add(ordinal);
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
+  public ColumnarBatch(StructType schema, ColumnVector[] columns, int capacity) {
     this.schema = schema;
-    this.capacity = maxRows;
-    this.columns = new ColumnVector[schema.size()];
+    this.columns = columns;
+    this.capacity = capacity;
     this.nullFilteredColumns = new HashSet<>();
-    this.filteredRows = new boolean[maxRows];
-
-    for (int i = 0; i < schema.fields().length; ++i) {
-      StructField field = schema.fields()[i];
-      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
-    }
-
+    this.filteredRows = new boolean[capacity];
     this.row = new Row(this);
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2d1f3da..3568275 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -19,18 +19,39 @@ package org.apache.spark.sql.execution.vectorized;
 import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 
 /**
  * Column data backed using offheap memory.
  */
-public final class OffHeapColumnVector extends ColumnVector {
+public final class OffHeapColumnVector extends WritableColumnVector {
 
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field off heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OffHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OffHeapColumnVector[] vectors = new OffHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OffHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these two allocations need to maintain binary compatible. We can
   // directly pass this buffer to external components.
   private long nulls;
@@ -40,8 +61,8 @@ public final class OffHeapColumnVector extends ColumnVector {
   private long lengthData;
   private long offsetData;
 
-  protected OffHeapColumnVector(int capacity, DataType type) {
-    super(capacity, type, MemoryMode.OFF_HEAP);
+  public OffHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
 
     nulls = 0;
     data = 0;
@@ -519,4 +540,9 @@ public final class OffHeapColumnVector extends ColumnVector {
     Platform.setMemory(nulls + oldCapacity, (byte)0, newCapacity - oldCapacity);
     capacity = newCapacity;
   }
+
+  @Override
+  protected OffHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OffHeapColumnVector(capacity, type);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index 5064343..96a4529 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -20,7 +20,6 @@ import java.nio.ByteBuffer;
 import java.nio.ByteOrder;
 import java.util.Arrays;
 
-import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.types.*;
 import org.apache.spark.unsafe.Platform;
 
@@ -28,11 +27,33 @@ import org.apache.spark.unsafe.Platform;
  * A column backed by an in memory JVM array. This stores the NULLs as a byte per value
  * and a java array for the values.
  */
-public final class OnHeapColumnVector extends ColumnVector {
+public final class OnHeapColumnVector extends WritableColumnVector {
 
   private static final boolean bigEndianPlatform =
     ByteOrder.nativeOrder().equals(ByteOrder.BIG_ENDIAN);
 
+  /**
+   * Allocates columns to store elements of each field of the schema on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructType schema) {
+    return allocateColumns(capacity, schema.fields());
+  }
+
+  /**
+   * Allocates columns to store elements of each field on heap.
+   * Capacity is the initial capacity of the vector and it will grow as necessary. Capacity is
+   * in number of elements, not number of bytes.
+   */
+  public static OnHeapColumnVector[] allocateColumns(int capacity, StructField[] fields) {
+    OnHeapColumnVector[] vectors = new OnHeapColumnVector[fields.length];
+    for (int i = 0; i < fields.length; i++) {
+      vectors[i] = new OnHeapColumnVector(capacity, fields[i].dataType());
+    }
+    return vectors;
+  }
+
   // The data stored in these arrays need to maintain binary compatible. We can
   // directly pass this buffer to external components.
 
@@ -51,8 +72,9 @@ public final class OnHeapColumnVector extends ColumnVector {
   private int[] arrayLengths;
   private int[] arrayOffsets;
 
-  protected OnHeapColumnVector(int capacity, DataType type) {
-    super(capacity, type, MemoryMode.ON_HEAP);
+  public OnHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
+
     reserveInternal(capacity);
     reset();
   }
@@ -529,4 +551,9 @@ public final class OnHeapColumnVector extends ColumnVector {
 
     capacity = newCapacity;
   }
+
+  @Override
+  protected OnHeapColumnVector reserveNewColumn(int capacity, DataType type) {
+    return new OnHeapColumnVector(capacity, type);
+  }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9e33954d/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
deleted file mode 100644
index e9f6e7c..0000000
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ReadOnlyColumnVector.java
+++ /dev/null
@@ -1,251 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.spark.sql.execution.vectorized;
-
-import org.apache.spark.memory.MemoryMode;
-import org.apache.spark.sql.types.*;
-
-/**
- * An abstract class for read-only column vector.
- */
-public abstract class ReadOnlyColumnVector extends ColumnVector {
-
-  protected ReadOnlyColumnVector(int capacity, DataType type, MemoryMode memMode) {
-    super(capacity, DataTypes.NullType, memMode);
-    this.type = type;
-    isConstant = true;
-  }
-
-  //
-  // APIs dealing with nulls
-  //
-
-  @Override
-  public final void putNotNull(int rowId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putNull(int rowId) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putNulls(int rowId, int count) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putNotNulls(int rowId, int count) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Booleans
-  //
-
-  @Override
-  public final void putBoolean(int rowId, boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putBooleans(int rowId, int count, boolean value) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Bytes
-  //
-
-  @Override
-  public final void putByte(int rowId, byte value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putBytes(int rowId, int count, byte value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putBytes(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Shorts
-  //
-
-  @Override
-  public final void putShort(int rowId, short value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putShorts(int rowId, int count, short value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putShorts(int rowId, int count, short[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Ints
-  //
-
-  @Override
-  public final void putInt(int rowId, int value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putInts(int rowId, int count, int value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Longs
-  //
-
-  @Override
-  public final void putLong(int rowId, long value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putLongs(int rowId, int count, long value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putLongs(int rowId, int count, long[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putLongsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with floats
-  //
-
-  @Override
-  public final void putFloat(int rowId, float value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putFloats(int rowId, int count, float value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putFloats(int rowId, int count, float[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putFloats(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with doubles
-  //
-
-  @Override
-  public final void putDouble(int rowId, double value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putDoubles(int rowId, int count, double value) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Arrays
-  //
-
-  @Override
-  public final void putArray(int rowId, int offset, int length) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Byte Arrays
-  //
-
-  @Override
-  public final int putByteArray(int rowId, byte[] value, int offset, int count) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // APIs dealing with Decimals
-  //
-
-  @Override
-  public final void putDecimal(int rowId, Decimal value, int precision) {
-    throw new UnsupportedOperationException();
-  }
-
-  //
-  // Other APIs
-  //
-
-  @Override
-  public final void setDictionary(Dictionary dictionary) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  public final ColumnVector reserveDictionaryIds(int capacity) {
-    throw new UnsupportedOperationException();
-  }
-
-  @Override
-  protected final void reserveInternal(int newCapacity) {
-    throw new UnsupportedOperationException();
-  }
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org