Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/13 03:21:14 UTC

spark git commit: [SPARK-12785][SQL] Add ColumnarBatch, an in memory columnar format for execution.

Repository: spark
Updated Branches:
  refs/heads/master 4f60651cb -> 924708496


[SPARK-12785][SQL] Add ColumnarBatch, an in memory columnar format for execution.

There are many potential benefits to having an efficient in memory columnar format as an alternative
to UnsafeRow. This patch introduces ColumnarBatch/ColumnVector, which starts this effort. The
remaining implementation can be done in follow-up patches.

As stated in the JIRA, there are useful external components that operate on memory in a
simple columnar format. ColumnarBatch would serve that purpose and could act as a
zero-serialization/zero-copy exchange for this use case.

This patch supports storing the underlying data either on heap or off heap. On heap runs a bit
faster, but off heap is needed for zero-copy exchanges. Currently, this mode is hidden behind one
interface (ColumnVector).
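
For illustration, a minimal usage sketch of the new ColumnVector API (hypothetical caller code,
not part of this patch; it only uses methods that appear in the diff below):

  import org.apache.spark.sql.execution.vectorized.ColumnVector
  import org.apache.spark.sql.types.IntegerType
  import org.apache.spark.unsafe.Platform

  // The same put/get API works whether the column is allocated on or off heap.
  val col = ColumnVector.allocate(1024, IntegerType, /* offHeap = */ true)
  var i = 0
  while (i < 1024) { col.putInt(i, i); i += 1 }
  assert(col.getInt(7) == 7)

  // Off heap columns also expose raw addresses, which is what a zero-copy
  // exchange with external components would be built on.
  val addr = col.valuesNativeAddress()
  assert(Platform.getInt(null, addr + 4 * 7) == 7)
  col.close()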

This differs from Parquet or the existing columnar cache because it is *not* intended to be used
as a storage format. The focus is entirely on CPU efficiency, as we expect to have only one of these
batches in memory per task. The layout of the values is just dense arrays of the value type.
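
A similar sketch at the batch level (again hypothetical usage, mirroring the ColumnarBatchSuite
added below):

  import org.apache.spark.sql.execution.vectorized.ColumnarBatch
  import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}

  val schema = new StructType().add("intCol", IntegerType).add("doubleCol", DoubleType)
  val batch = ColumnarBatch.allocate(schema, /* offHeap = */ false)

  // Writers fill each column densely, then publish the number of valid rows.
  batch.column(0).putInt(0, 1)
  batch.column(1).putDouble(0, 1.1)
  batch.setNumRows(1)

  // Readers can use the (slower) row adapter, or read the columns directly.
  val row = batch.rowIterator().next()
  assert(row.getInt(0) == 1 && row.getDouble(1) == 1.1)

  batch.reset()   // reuse the batch for the next set of rows
  batch.close()   // free the underlying memory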

Author: Nong Li <no...@databricks.com>
Author: Nong <no...@gmail.com>

Closes #10628 from nongli/spark-12635.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/92470849
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/92470849
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/92470849

Branch: refs/heads/master
Commit: 9247084962259ebbbac4c5a80a6ccb271776f019
Parents: 4f60651
Author: Nong Li <no...@databricks.com>
Authored: Tue Jan 12 18:21:04 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Tue Jan 12 18:21:04 2016 -0800

----------------------------------------------------------------------
 .../sql/execution/vectorized/ColumnVector.java  | 176 ++++++++++
 .../sql/execution/vectorized/ColumnarBatch.java | 296 +++++++++++++++++
 .../vectorized/OffHeapColumnVector.java         | 179 +++++++++++
 .../vectorized/OnHeapColumnVector.java          | 175 ++++++++++
 .../vectorized/ColumnarBatchBenchmark.scala     | 320 +++++++++++++++++++
 .../vectorized/ColumnarBatchSuite.scala         | 317 ++++++++++++++++++
 6 files changed, 1463 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/92470849/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
new file mode 100644
index 0000000..d9dde92
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+
+/**
+ * This class represents a column of values and provides the main APIs to access the data
+ * values. It supports all the types and contains get/put APIs as well as their batched versions.
+ * The batched versions are preferable whenever possible.
+ *
+ * Most of the APIs take the rowId as a parameter. This is the local 0-based row id for values
+ * in the current ColumnarBatch.
+ *
+ * A ColumnVector should be considered immutable once originally created. In other words, it is not
+ * valid to call put APIs after reads until reset() is called.
+ */
+public abstract class ColumnVector {
+  /**
+   * Allocates a column that can hold `capacity` elements of the given `type`, either on or off heap.
+   */
+  public static ColumnVector allocate(int capacity, DataType type, boolean offHeap) {
+    if (offHeap) {
+      return new OffHeapColumnVector(capacity, type);
+    } else {
+      return new OnHeapColumnVector(capacity, type);
+    }
+  }
+
+  public final DataType dataType() { return type; }
+
+  /**
+   * Resets this column for writing. The currently stored values are no longer accessible.
+   */
+  public void reset() {
+    numNulls = 0;
+    if (anyNullsSet) {
+      putNotNulls(0, capacity);
+      anyNullsSet = false;
+    }
+  }
+
+  /**
+   * Cleans up memory for this column. The column is not usable after this.
+   * TODO: this should probably have ref-counted semantics.
+   */
+  public abstract void close();
+
+  /**
+   * Returns the number of nulls in this column.
+   */
+  public final int numNulls() { return numNulls; }
+
+  /**
+   * Returns true if any of the null indicators are set for this column. This can be used
+   * as an optimization to prevent setting nulls.
+   */
+  public final boolean anyNullsSet() { return anyNullsSet; }
+
+  /**
+   * Returns the off heap pointers to the arrays backing the NULLs and values buffers. Only
+   * valid to call for off heap columns.
+   */
+  public abstract long nullsNativeAddress();
+  public abstract long valuesNativeAddress();
+
+  /**
+   * Sets the value at rowId to null/not null.
+   */
+  public abstract void putNotNull(int rowId);
+  public abstract void putNull(int rowId);
+
+  /**
+   * Sets the values from [rowId, rowId + count) to null/not null.
+   */
+  public abstract void putNulls(int rowId, int count);
+  public abstract void putNotNulls(int rowId, int count);
+
+  /**
+   * Returns whether the value at rowId is NULL.
+   */
+  public abstract boolean getIsNull(int rowId);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putInt(int rowId, int value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putInts(int rowId, int count, int value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   */
+  public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * The data in src must be 4-byte little endian ints.
+   */
+  public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Returns the integer for rowId.
+   */
+  public abstract int getInt(int rowId);
+
+  /**
+   * Sets the value at rowId to `value`.
+   */
+  public abstract void putDouble(int rowId, double value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to value.
+   */
+  public abstract void putDoubles(int rowId, int count, double value);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * src should contain `count` doubles written in IEEE format.
+   */
+  public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
+
+  /**
+   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * The data in src must be IEEE-formatted doubles.
+   */
+  public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);
+
+  /**
+   * Returns the double for rowId.
+   */
+  public abstract double getDouble(int rowId);
+
+  /**
+   * Maximum number of rows that can be stored in this column.
+   */
+  protected final int capacity;
+
+  /**
+   * Number of nulls in this column. This is an optimization for the reader, to skip NULL checks.
+   */
+  protected int numNulls;
+
+  /**
+   * True if there is at least one NULL byte set. This is an optimization for the writer, to skip
+   * having to clear NULL bits.
+   */
+  protected boolean anyNullsSet;
+
+  /**
+   * Data type for this column.
+   */
+  protected final DataType type;
+
+  protected ColumnVector(int capacity, DataType type) {
+    this.capacity = capacity;
+    this.type = type;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/92470849/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
new file mode 100644
index 0000000..47defac
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.util.Arrays;
+import java.util.Iterator;
+
+import org.apache.spark.sql.catalyst.InternalRow;
+import org.apache.spark.sql.catalyst.util.ArrayData;
+import org.apache.spark.sql.catalyst.util.MapData;
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.Decimal;
+import org.apache.spark.sql.types.StructField;
+import org.apache.spark.sql.types.StructType;
+import org.apache.spark.unsafe.types.CalendarInterval;
+import org.apache.spark.unsafe.types.UTF8String;
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * This class is the in memory representation of rows as they are streamed through operators. It
+ * is designed to maximize CPU efficiency and not storage footprint. Since it is expected that
+ * each operator allocates one of these objects, the storage footprint on the task is negligible.
+ *
+ * The layout is columnar with values encoded in their native format. Each ColumnarBatch contains
+ * a horizontal partitioning of the data, split into columns.
+ *
+ * The ColumnarBatch supports either on heap or off heap modes with a (mostly) identical API.
+ *
+ * TODO:
+ *  - There are many TODOs for the existing APIs. They should throw a not implemented exception.
+ *  - Compaction: The batch and columns should be able to compact based on a selection vector.
+ */
+public final class ColumnarBatch {
+  private static final int DEFAULT_BATCH_SIZE = 4 * 1024;
+
+  private final StructType schema;
+  private final int capacity;
+  private int numRows;
+  private final ColumnVector[] columns;
+
+  // True if the row is filtered.
+  private final boolean[] filteredRows;
+
+  // Total number of rows that have been filtered.
+  private int numRowsFiltered = 0;
+
+  public static ColumnarBatch allocate(StructType schema, boolean offHeap) {
+    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, offHeap);
+  }
+
+  public static ColumnarBatch allocate(StructType schema, boolean offHeap, int maxRows) {
+    return new ColumnarBatch(schema, maxRows, offHeap);
+  }
+
+  /**
+   * Called to close all the columns in this batch. It is not valid to access the data after
+   * calling this. This must be called at the end to clean up memory allocations.
+   */
+  public void close() {
+    for (ColumnVector c: columns) {
+      c.close();
+    }
+  }
+
+  /**
+   * Adapter class to interop with existing components that expect InternalRow. A lot of
+   * performance is lost with this translation.
+   */
+  public final class Row extends InternalRow {
+    private int rowId;
+
+    /**
+     * Marks this row as being filtered out. This means a subsequent iteration over the rows
+     * in this batch will not include this row.
+     */
+    public final void markFiltered() {
+      ColumnarBatch.this.markFiltered(rowId);
+    }
+
+    @Override
+    public final int numFields() {
+      return ColumnarBatch.this.numCols();
+    }
+
+    @Override
+    public final InternalRow copy() {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final boolean anyNull() {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final boolean isNullAt(int ordinal) {
+      return ColumnarBatch.this.column(ordinal).getIsNull(rowId);
+    }
+
+    @Override
+    public final boolean getBoolean(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final byte getByte(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final short getShort(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final int getInt(int ordinal) {
+      return ColumnarBatch.this.column(ordinal).getInt(rowId);
+    }
+
+    @Override
+    public final long getLong(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final float getFloat(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final double getDouble(int ordinal) {
+      return ColumnarBatch.this.column(ordinal).getDouble(rowId);
+    }
+
+    @Override
+    public final Decimal getDecimal(int ordinal, int precision, int scale) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final UTF8String getUTF8String(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final byte[] getBinary(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final CalendarInterval getInterval(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final InternalRow getStruct(int ordinal, int numFields) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final ArrayData getArray(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final MapData getMap(int ordinal) {
+      throw new NotImplementedException();
+    }
+
+    @Override
+    public final Object get(int ordinal, DataType dataType) {
+      throw new NotImplementedException();
+    }
+  }
+
+  /**
+   * Returns an iterator over the rows in this batch. This skips rows that are filtered out.
+   */
+  public Iterator<Row> rowIterator() {
+    final int maxRows = ColumnarBatch.this.numRows();
+    final Row row = new Row();
+    return new Iterator<Row>() {
+      int rowId = 0;
+
+      @Override
+      public boolean hasNext() {
+        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
+          ++rowId;
+        }
+        return rowId < maxRows;
+      }
+
+      @Override
+      public Row next() {
+        assert(hasNext());
+        while (rowId < maxRows && ColumnarBatch.this.filteredRows[rowId]) {
+          ++rowId;
+        }
+        row.rowId = rowId++;
+        return row;
+      }
+
+      @Override
+      public void remove() {
+        throw new UnsupportedOperationException();
+      }
+    };
+  }
+
+  /**
+   * Resets the batch for writing.
+   */
+  public void reset() {
+    for (int i = 0; i < numCols(); ++i) {
+      columns[i].reset();
+    }
+    if (this.numRowsFiltered > 0) {
+      Arrays.fill(filteredRows, false);
+    }
+    this.numRows = 0;
+    this.numRowsFiltered = 0;
+  }
+
+  /**
+   * Sets the number of rows that are valid.
+   */
+  public void setNumRows(int numRows) {
+    assert(numRows <= this.capacity);
+    this.numRows = numRows;
+  }
+
+  /**
+   * Returns the number of columns that make up this batch.
+   */
+  public int numCols() { return columns.length; }
+
+  /**
+   * Returns the number of rows available to read, including filtered rows.
+   */
+  public int numRows() { return numRows; }
+
+  /**
+   * Returns the number of valid rows.
+   */
+  public int numValidRows() {
+    assert(numRowsFiltered <= numRows);
+    return numRows - numRowsFiltered;
+  }
+
+  /**
+   * Returns the max capacity (in number of rows) for this batch.
+   */
+  public int capacity() { return capacity; }
+
+  /**
+   * Returns the column at `ordinal`.
+   */
+  public ColumnVector column(int ordinal) { return columns[ordinal]; }
+
+  /**
+   * Marks this row as being filtered out. This means a subsequent iteration over the rows
+   * in this batch will not include this row.
+   */
+  public final void markFiltered(int rowId) {
+    assert(filteredRows[rowId] == false);
+    filteredRows[rowId] = true;
+    ++numRowsFiltered;
+  }
+
+  private ColumnarBatch(StructType schema, int maxRows, boolean offHeap) {
+    this.schema = schema;
+    this.capacity = maxRows;
+    this.columns = new ColumnVector[schema.size()];
+    this.filteredRows = new boolean[maxRows];
+
+    for (int i = 0; i < schema.fields().length; ++i) {
+      StructField field = schema.fields()[i];
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), offHeap);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/92470849/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
new file mode 100644
index 0000000..2a9a2d1
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import java.nio.ByteOrder;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.unsafe.Platform;
+
+
+import org.apache.commons.lang.NotImplementedException;
+
+/**
+ * Column data backed by off heap memory.
+ */
+public final class OffHeapColumnVector extends ColumnVector {
+  // The data stored in these two allocations needs to maintain binary compatibility. We can
+  // directly pass these buffers to external components.
+  private long nulls;
+  private long data;
+
+  protected OffHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
+    if (!ByteOrder.nativeOrder().equals(ByteOrder.LITTLE_ENDIAN)) {
+      throw new NotImplementedException("Only little endian is supported.");
+    }
+
+    this.nulls = Platform.allocateMemory(capacity);
+    if (type instanceof IntegerType) {
+      this.data = Platform.allocateMemory(capacity * 4);
+    } else if (type instanceof DoubleType) {
+      this.data = Platform.allocateMemory(capacity * 8);
+    } else {
+      throw new RuntimeException("Unhandled " + type);
+    }
+    reset();
+  }
+
+  @Override
+  public final long valuesNativeAddress() {
+    return data;
+  }
+
+  @Override
+  public long nullsNativeAddress() {
+    return nulls;
+  }
+
+  @Override
+  public final void close() {
+    Platform.freeMemory(nulls);
+    Platform.freeMemory(data);
+    nulls = 0;
+    data = 0;
+  }
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public final void putNotNull(int rowId) {
+    Platform.putByte(null, nulls + rowId, (byte) 0);
+  }
+
+  @Override
+  public final void putNull(int rowId) {
+    Platform.putByte(null, nulls + rowId, (byte) 1);
+    ++numNulls;
+    anyNullsSet = true;
+  }
+
+  @Override
+  public final void putNulls(int rowId, int count) {
+    long offset = nulls + rowId;
+    for (int i = 0; i < count; ++i, ++offset) {
+      Platform.putByte(null, offset, (byte) 1);
+    }
+    anyNullsSet = true;
+    numNulls += count;
+  }
+
+  @Override
+  public final void putNotNulls(int rowId, int count) {
+    long offset = nulls + rowId;
+    for (int i = 0; i < count; ++i, ++offset) {
+      Platform.putByte(null, offset, (byte) 0);
+    }
+  }
+
+  @Override
+  public final boolean getIsNull(int rowId) {
+    return Platform.getByte(null, nulls + rowId) == 1;
+  }
+
+  //
+  // APIs dealing with ints
+  //
+
+  @Override
+  public final void putInt(int rowId, int value) {
+    Platform.putInt(null, data + 4 * rowId, value);
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int value) {
+    long offset = data + 4 * rowId;
+    for (int i = 0; i < count; ++i, offset += 4) {
+      Platform.putInt(null, offset, value);
+    }
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.INT_ARRAY_OFFSET + srcIndex * 4,
+        null, data + 4 * rowId, count * 4);
+  }
+
+  @Override
+  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, srcIndex + Platform.BYTE_ARRAY_OFFSET,
+        null, data + 4 * rowId, count * 4);
+  }
+
+  @Override
+  public final int getInt(int rowId) {
+    return Platform.getInt(null, data + 4 * rowId);
+  }
+
+  //
+  // APIs dealing with doubles
+  //
+
+  @Override
+  public final void putDouble(int rowId, double value) {
+    Platform.putDouble(null, data + rowId * 8, value);
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double value) {
+    long offset = data + 8 * rowId;
+    for (int i = 0; i < count; ++i, offset += 8) {
+      Platform.putDouble(null, offset, value);
+    }
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.DOUBLE_ARRAY_OFFSET + srcIndex * 8,
+      null, data + 8 * rowId, count * 8);
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex,
+        null, data + rowId * 8, count * 8);
+  }
+
+  @Override
+  public final double getDouble(int rowId) {
+    return Platform.getDouble(null, data + rowId * 8);
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/92470849/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
new file mode 100644
index 0000000..a7b3add
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -0,0 +1,175 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.vectorized;
+
+import org.apache.spark.sql.types.DataType;
+import org.apache.spark.sql.types.DoubleType;
+import org.apache.spark.sql.types.IntegerType;
+import org.apache.spark.unsafe.Platform;
+
+import java.nio.ByteBuffer;
+import java.nio.DoubleBuffer;
+import java.util.Arrays;
+
+/**
+ * A column backed by an in memory JVM array. This stores the NULLs as a byte per value
+ * and a java array for the values.
+ */
+public final class OnHeapColumnVector extends ColumnVector {
+  // The data stored in these arrays needs to maintain binary compatibility. We can
+  // directly pass this buffer to external components.
+
+  // This is faster than a boolean array and we optimize this over memory footprint.
+  private byte[] nulls;
+
+  // Array for each type. Only one of these is populated for any given column.
+  private int[] intData;
+  private double[] doubleData;
+
+  protected OnHeapColumnVector(int capacity, DataType type) {
+    super(capacity, type);
+    if (type instanceof IntegerType) {
+      this.intData = new int[capacity];
+    } else if (type instanceof DoubleType) {
+      this.doubleData = new double[capacity];
+    } else {
+      throw new RuntimeException("Unhandled " + type);
+    }
+    this.nulls = new byte[capacity];
+    reset();
+  }
+
+  @Override
+  public final long valuesNativeAddress() {
+    throw new RuntimeException("Cannot get native address for on heap column");
+  }
+  @Override
+  public final long nullsNativeAddress() {
+    throw new RuntimeException("Cannot get native address for on heap column");
+  }
+
+  @Override
+  public final void close() {
+    nulls = null;
+    intData = null;
+    doubleData = null;
+  }
+
+
+  //
+  // APIs dealing with nulls
+  //
+
+  @Override
+  public final void putNotNull(int rowId) {
+    nulls[rowId] = (byte)0;
+  }
+
+  @Override
+  public final void putNull(int rowId) {
+    nulls[rowId] = (byte)1;
+    ++numNulls;
+    anyNullsSet = true;
+  }
+
+  @Override
+  public final void putNulls(int rowId, int count) {
+    for (int i = 0; i < count; ++i) {
+      nulls[rowId + i] = (byte)1;
+    }
+    anyNullsSet = true;
+    numNulls += count;
+  }
+
+  @Override
+  public final void putNotNulls(int rowId, int count) {
+    for (int i = 0; i < count; ++i) {
+      nulls[rowId + i] = (byte)0;
+    }
+  }
+
+  @Override
+  public final boolean getIsNull(int rowId) {
+    return nulls[rowId] == 1;
+  }
+
+  //
+  // APIs dealing with Ints
+  //
+
+  @Override
+  public final void putInt(int rowId, int value) {
+    intData[rowId] = value;
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int value) {
+    for (int i = 0; i < count; ++i) {
+      intData[i + rowId] = value;
+    }
+  }
+
+  @Override
+  public final void putInts(int rowId, int count, int[] src, int srcIndex) {
+    System.arraycopy(src, srcIndex, intData, rowId, count);
+  }
+
+  @Override
+  public final void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex) {
+    int srcOffset = srcIndex + Platform.BYTE_ARRAY_OFFSET;
+    for (int i = 0; i < count; ++i) {
+      intData[i + rowId] = Platform.getInt(src, srcOffset);
+      srcIndex += 4;
+      srcOffset += 4;
+    }
+  }
+
+  @Override
+  public final int getInt(int rowId) {
+    return intData[rowId];
+  }
+
+  //
+  // APIs dealing with doubles
+  //
+
+  @Override
+  public final void putDouble(int rowId, double value) {
+    doubleData[rowId] = value;
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double value) {
+    Arrays.fill(doubleData, rowId, rowId + count, value);
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, double[] src, int srcIndex) {
+    System.arraycopy(src, srcIndex, doubleData, rowId, count);
+  }
+
+  @Override
+  public final void putDoubles(int rowId, int count, byte[] src, int srcIndex) {
+    Platform.copyMemory(src, Platform.BYTE_ARRAY_OFFSET + srcIndex, doubleData,
+        Platform.DOUBLE_ARRAY_OFFSET + rowId * 8, count * 8);
+  }
+
+  @Override
+  public final double getDouble(int rowId) {
+    return doubleData[rowId];
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/92470849/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
new file mode 100644
index 0000000..e28153d
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -0,0 +1,320 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet
+
+import java.nio.ByteBuffer
+
+import org.apache.spark.sql.catalyst.expressions.UnsafeRow
+import org.apache.spark.sql.execution.vectorized.ColumnVector
+import org.apache.spark.sql.types.IntegerType
+import org.apache.spark.unsafe.Platform
+import org.apache.spark.util.Benchmark
+import org.apache.spark.util.collection.BitSet
+
+/**
+ * Benchmark of low level memory access using different ways to manage buffers.
+ */
+object ColumnarBatchBenchmark {
+
+  // This benchmark reads and writes an array of ints.
+  // TODO: there is a big (2x) penalty for a random access API for off heap.
+  // Note: be careful if modifying this code. It's hard to reason about the JIT.
+  def intAccess(iters: Long): Unit = {
+    val count = 8 * 1000
+
+    // Accessing a java array.
+    val javaArray = { i: Int =>
+      val data = new Array[Int](count)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          data(i) = i
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += data(i)
+          i += 1
+        }
+      }
+    }
+
+    // Accessing ByteBuffers
+    val byteBufferUnsafe = { i: Int =>
+      val data = ByteBuffer.allocate(count * 4)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          Platform.putInt(data.array(), Platform.BYTE_ARRAY_OFFSET + i * 4, i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += Platform.getInt(data.array(), Platform.BYTE_ARRAY_OFFSET + i * 4)
+          i += 1
+        }
+      }
+    }
+
+    // Accessing offheap byte buffers
+    val directByteBuffer = { i: Int =>
+      val data = ByteBuffer.allocateDirect(count * 4).asIntBuffer()
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          data.put(i)
+          i += 1
+        }
+        data.rewind()
+        i = 0
+        while (i < count) {
+          sum += data.get()
+          i += 1
+        }
+        data.rewind()
+      }
+    }
+
+    // Accessing ByteBuffer using the typed APIs
+    val byteBufferApi = { i: Int =>
+      val data = ByteBuffer.allocate(count * 4)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          data.putInt(i)
+          i += 1
+        }
+        data.rewind()
+        i = 0
+        while (i < count) {
+          sum += data.getInt()
+          i += 1
+        }
+        data.rewind()
+      }
+    }
+
+    // Using unsafe memory
+    val unsafeBuffer = { i: Int =>
+      val data: Long = Platform.allocateMemory(count * 4)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var ptr = data
+        var i = 0
+        while (i < count) {
+          Platform.putInt(null, ptr, i)
+          ptr += 4
+          i += 1
+        }
+        ptr = data
+        i = 0
+        while (i < count) {
+          sum += Platform.getInt(null, ptr)
+          ptr += 4
+          i += 1
+        }
+      }
+    }
+
+    // Access through the column API with on heap memory
+    val columnOnHeap = { i: Int =>
+      val col = ColumnVector.allocate(count, IntegerType, false)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          col.putInt(i, i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += col.getInt(i)
+          i += 1
+        }
+      }
+      col.close
+    }
+
+    // Access through the column API with off heap memory
+    def columnOffHeap = { i: Int => {
+      val col = ColumnVector.allocate(count, IntegerType, true)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          col.putInt(i, i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          sum += col.getInt(i)
+          i += 1
+        }
+      }
+      col.close
+    }}
+
+    // Access by directly getting the buffer backing the column.
+    val columnOffheapDirect = { i: Int =>
+      val col = ColumnVector.allocate(count, IntegerType, true)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var addr = col.valuesNativeAddress()
+        var i = 0
+        while (i < count) {
+          Platform.putInt(null, addr, i)
+          addr += 4
+          i += 1
+        }
+        i = 0
+        addr = col.valuesNativeAddress()
+        while (i < count) {
+          sum += Platform.getInt(null, addr)
+          addr += 4
+          i += 1
+        }
+      }
+      col.close
+    }
+
+    // Access by going through a batch of unsafe rows.
+    val unsafeRowOnheap = { i: Int =>
+      val buffer = new Array[Byte](count * 16)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        val row = new UnsafeRow(1)
+        var i = 0
+        while (i < count) {
+          row.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * 16, 16)
+          row.setInt(0, i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          row.pointTo(buffer, Platform.BYTE_ARRAY_OFFSET + i * 16, 16)
+          sum += row.getInt(0)
+          i += 1
+        }
+      }
+    }
+
+    // Access by going through a batch of unsafe rows.
+    val unsafeRowOffheap = { i: Int =>
+      val buffer = Platform.allocateMemory(count * 16)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        val row = new UnsafeRow(1)
+        var i = 0
+        while (i < count) {
+          row.pointTo(null, buffer + i * 16, 16)
+          row.setInt(0, i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          row.pointTo(null, buffer + i * 16, 16)
+          sum += row.getInt(0)
+          i += 1
+        }
+      }
+      Platform.freeMemory(buffer)
+    }
+
+    /*
+    Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+    Int Read/Write:              Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    -------------------------------------------------------------------------
+    Java Array                          248.8          1317.04         1.00 X
+    ByteBuffer Unsafe                   435.6           752.25         0.57 X
+    ByteBuffer API                     1752.0           187.03         0.14 X
+    DirectByteBuffer                    595.4           550.35         0.42 X
+    Unsafe Buffer                       235.2          1393.20         1.06 X
+    Column(on heap)                     189.8          1726.45         1.31 X
+    Column(off heap)                    408.4           802.35         0.61 X
+    Column(off heap direct)             237.6          1379.12         1.05 X
+    UnsafeRow (on heap)                 414.6           790.35         0.60 X
+    UnsafeRow (off heap)                487.2           672.58         0.51 X
+    */
+    val benchmark = new Benchmark("Int Read/Write", count * iters)
+    benchmark.addCase("Java Array")(javaArray)
+    benchmark.addCase("ByteBuffer Unsafe")(byteBufferUnsafe)
+    benchmark.addCase("ByteBuffer API")(byteBufferApi)
+    benchmark.addCase("DirectByteBuffer")(directByteBuffer)
+    benchmark.addCase("Unsafe Buffer")(unsafeBuffer)
+    benchmark.addCase("Column(on heap)")(columnOnHeap)
+    benchmark.addCase("Column(off heap)")(columnOffHeap)
+    benchmark.addCase("Column(off heap direct)")(columnOffheapDirect)
+    benchmark.addCase("UnsafeRow (on heap)")(unsafeRowOnheap)
+    benchmark.addCase("UnsafeRow (off heap)")(unsafeRowOffheap)
+    benchmark.run()
+  }
+
+  def booleanAccess(iters: Int): Unit = {
+    val count = 8 * 1024
+    val benchmark = new Benchmark("Boolean Read/Write", iters * count)
+    benchmark.addCase("Bitset") { i: Int => {
+      val b = new BitSet(count)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          if (i % 2 == 0) b.set(i)
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          if (b.get(i)) sum += 1
+          i += 1
+        }
+      }
+    }}
+
+    benchmark.addCase("Byte Array") { i: Int => {
+      val b = new Array[Byte](count)
+      var sum = 0L
+      for (n <- 0L until iters) {
+        var i = 0
+        while (i < count) {
+          if (i % 2 == 0) b(i) = 1;
+          i += 1
+        }
+        i = 0
+        while (i < count) {
+          if (b(i) == 1) sum += 1
+          i += 1
+        }
+      }
+    }}
+    /*
+    Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+    Boolean Read/Write:          Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+    -------------------------------------------------------------------------
+    Bitset                             895.88           374.54         1.00 X
+    Byte Array                         578.96           579.56         1.55 X
+    */
+    benchmark.run()
+  }
+
+  def main(args: Array[String]): Unit = {
+    intAccess(1024 * 40)
+    booleanAccess(1024 * 40)
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/92470849/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
new file mode 100644
index 0000000..305a83e
--- /dev/null
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -0,0 +1,317 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.vectorized
+
+import scala.collection.mutable
+import scala.util.Random
+
+import org.apache.spark.SparkFunSuite
+import org.apache.spark.sql.Row
+import org.apache.spark.sql.catalyst.InternalRow
+import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
+import org.apache.spark.unsafe.Platform
+
+class ColumnarBatchSuite extends SparkFunSuite {
+  test("Null Apis") {
+    (false :: true :: Nil).foreach { offHeap => {
+      val reference = mutable.ArrayBuffer.empty[Boolean]
+
+      val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+      var idx = 0
+      assert(column.anyNullsSet() == false)
+
+      column.putNotNull(idx)
+      reference += false
+      idx += 1
+      assert(column.anyNullsSet() == false)
+
+      column.putNull(idx)
+      reference += true
+      idx += 1
+      assert(column.anyNullsSet() == true)
+      assert(column.numNulls() == 1)
+
+      column.putNulls(idx, 3)
+      reference += true
+      reference += true
+      reference += true
+      idx += 3
+      assert(column.anyNullsSet() == true)
+
+      column.putNotNulls(idx, 4)
+      reference += false
+      reference += false
+      reference += false
+      reference += false
+      idx += 4
+      assert(column.anyNullsSet() == true)
+      assert(column.numNulls() == 4)
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getIsNull(v._2))
+        if (offHeap) {
+          val addr = column.nullsNativeAddress()
+          assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
+        }
+      }
+      column.close
+    }}
+  }
+
+  test("Int Apis") {
+    (false :: true :: Nil).foreach { offHeap => {
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Int]
+
+      val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+      var idx = 0
+
+      val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
+      column.putInts(idx, 2, values, 0)
+      reference += 1
+      reference += 2
+      idx += 2
+
+      column.putInts(idx, 3, values, 2)
+      reference += 3
+      reference += 4
+      reference += 5
+      idx += 3
+
+      val littleEndian = new Array[Byte](8)
+      littleEndian(0) = 7
+      littleEndian(1) = 1
+      littleEndian(4) = 6
+      littleEndian(6) = 1
+
+      column.putIntsLittleEndian(idx, 1, littleEndian, 4)
+      column.putIntsLittleEndian(idx + 1, 1, littleEndian, 0)
+      reference += 6 + (1 << 16)
+      reference += 7 + (1 << 8)
+      idx += 2
+
+      column.putIntsLittleEndian(idx, 2, littleEndian, 0)
+      reference += 7 + (1 << 8)
+      reference += 6 + (1 << 16)
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextInt()
+          column.putInt(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          column.putInts(idx, n, n + 1)
+          var i = 0
+          while (i < n) {
+            reference += (n + 1)
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
+        if (offHeap) {
+          val addr = column.valuesNativeAddress()
+          assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
+        }
+      }
+      column.close
+    }}
+  }
+
+  test("Double APIs") {
+    (false :: true :: Nil).foreach { offHeap => {
+      val seed = System.currentTimeMillis()
+      val random = new Random(seed)
+      val reference = mutable.ArrayBuffer.empty[Double]
+
+      val column = ColumnVector.allocate(1024, DoubleType, offHeap)
+      var idx = 0
+
+      val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
+      column.putDoubles(idx, 2, values, 0)
+      reference += 1.0
+      reference += 2.0
+      idx += 2
+
+      column.putDoubles(idx, 3, values, 2)
+      reference += 3.0
+      reference += 4.0
+      reference += 5.0
+      idx += 3
+
+      val buffer = new Array[Byte](16)
+      Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET, 2.234)
+      Platform.putDouble(buffer, Platform.BYTE_ARRAY_OFFSET + 8, 1.123)
+
+      column.putDoubles(idx, 1, buffer, 8)
+      column.putDoubles(idx + 1, 1, buffer, 0)
+      reference += 1.123
+      reference += 2.234
+      idx += 2
+
+      column.putDoubles(idx, 2, buffer, 0)
+      reference += 2.234
+      reference += 1.123
+      idx += 2
+
+      while (idx < column.capacity) {
+        val single = random.nextBoolean()
+        if (single) {
+          val v = random.nextDouble()
+          column.putDouble(idx, v)
+          reference += v
+          idx += 1
+        } else {
+          val n = math.min(random.nextInt(column.capacity / 20), column.capacity - idx)
+          val v = random.nextDouble()
+          column.putDoubles(idx, n, v)
+          var i = 0
+          while (i < n) {
+            reference += v
+            i += 1
+          }
+          idx += n
+        }
+      }
+
+      reference.zipWithIndex.foreach { v =>
+        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
+        if (offHeap) {
+          val addr = column.valuesNativeAddress()
+          assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
+        }
+      }
+      column.close
+    }}
+  }
+
+  test("ColumnarBatch basic") {
+    (false :: true :: Nil).foreach { offHeap => {
+      val schema = new StructType()
+        .add("intCol", IntegerType)
+        .add("doubleCol", DoubleType)
+        .add("intCol2", IntegerType)
+
+      val batch = ColumnarBatch.allocate(schema, offHeap)
+      assert(batch.numCols() == 3)
+      assert(batch.numRows() == 0)
+      assert(batch.numValidRows() == 0)
+      assert(batch.capacity() > 0)
+      assert(batch.rowIterator().hasNext == false)
+
+      // Add a row [1, 1.1, NULL]
+      batch.column(0).putInt(0, 1)
+      batch.column(1).putDouble(0, 1.1)
+      batch.column(2).putNull(0)
+      batch.setNumRows(1)
+
+      // Verify the results of the row.
+      assert(batch.numCols() == 3)
+      assert(batch.numRows() == 1)
+      assert(batch.numValidRows() == 1)
+      assert(batch.rowIterator().hasNext == true)
+      assert(batch.rowIterator().hasNext == true)
+
+      assert(batch.column(0).getInt(0) == 1)
+      assert(batch.column(0).getIsNull(0) == false)
+      assert(batch.column(1).getDouble(0) == 1.1)
+      assert(batch.column(1).getIsNull(0) == false)
+      assert(batch.column(2).getIsNull(0) == true)
+
+      // Verify the iterator works correctly.
+      val it = batch.rowIterator()
+      assert(it.hasNext())
+      val row = it.next()
+      assert(row.getInt(0) == 1)
+      assert(row.isNullAt(0) == false)
+      assert(row.getDouble(1) == 1.1)
+      assert(row.isNullAt(1) == false)
+      assert(row.isNullAt(2) == true)
+      assert(it.hasNext == false)
+      assert(it.hasNext == false)
+
+      // Filter out the row.
+      row.markFiltered()
+      assert(batch.numRows() == 1)
+      assert(batch.numValidRows() == 0)
+      assert(batch.rowIterator().hasNext == false)
+
+      // Reset and add 3 rows
+      batch.reset()
+      assert(batch.numRows() == 0)
+      assert(batch.numValidRows() == 0)
+      assert(batch.rowIterator().hasNext == false)
+
+      // Add rows [NULL, 2.2, 2], [3, NULL, 3], [4, 4.4, 4]
+      batch.column(0).putNull(0)
+      batch.column(1).putDouble(0, 2.2)
+      batch.column(2).putInt(0, 2)
+
+      batch.column(0).putInt(1, 3)
+      batch.column(1).putNull(1)
+      batch.column(2).putInt(1, 3)
+
+      batch.column(0).putInt(2, 4)
+      batch.column(1).putDouble(2, 4.4)
+      batch.column(2).putInt(2, 4)
+      batch.setNumRows(3)
+
+      def rowEquals(x: InternalRow, y: Row): Unit = {
+        assert(x.isNullAt(0) == y.isNullAt(0))
+        if (!x.isNullAt(0)) assert(x.getInt(0) == y.getInt(0))
+
+        assert(x.isNullAt(1) == y.isNullAt(1))
+        if (!x.isNullAt(1)) assert(x.getDouble(1) == y.getDouble(1))
+
+        assert(x.isNullAt(2) == y.isNullAt(2))
+        if (!x.isNullAt(2)) assert(x.getInt(2) == y.getInt(2))
+      }
+      // Verify
+      assert(batch.numRows() == 3)
+      assert(batch.numValidRows() == 3)
+      val it2 = batch.rowIterator()
+      rowEquals(it2.next(), Row(null, 2.2, 2))
+      rowEquals(it2.next(), Row(3, null, 3))
+      rowEquals(it2.next(), Row(4, 4.4, 4))
+      assert(!it2.hasNext)
+
+      // Filter out some rows and verify
+      batch.markFiltered(1)
+      assert(batch.numValidRows() == 2)
+      val it3 = batch.rowIterator()
+      rowEquals(it3.next(), Row(null, 2.2, 2))
+      rowEquals(it3.next(), Row(4, 4.4, 4))
+      assert(!it3.hasNext)
+
+      batch.markFiltered(2)
+      assert(batch.numValidRows() == 1)
+      val it4 = batch.rowIterator()
+      rowEquals(it4.next(), Row(null, 2.2, 2))
+
+      batch.close
+    }}
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org