You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@spark.apache.org by rx...@apache.org on 2016/01/16 02:40:29 UTC

spark git commit: [SPARK-12644][SQL] Update parquet reader to be vectorized.

Repository: spark
Updated Branches:
  refs/heads/master 3b5ccb12b -> 9039333c0


[SPARK-12644][SQL] Update parquet reader to be vectorized.

This inlines a few of the Parquet decoders and adds vectorized APIs to support decoding in batch.
There are a few particulars in the Parquet encodings that make this much more efficient. In
particular, RLE encodings are very well suited for batch decoding. The Parquet 2.0 encodings are
also very well suited for this.

This is a work in progress and does not affect the current execution. In subsequent patches, we will
support more encodings and types before enabling this.

Simple benchmarks indicate this can decode single ints more than 3x faster.

Author: Nong Li <no...@databricks.com>
Author: Nong <no...@gmail.com>

Closes #10593 from nongli/spark-12644.


Project: http://git-wip-us.apache.org/repos/asf/spark/repo
Commit: http://git-wip-us.apache.org/repos/asf/spark/commit/9039333c
Tree: http://git-wip-us.apache.org/repos/asf/spark/tree/9039333c
Diff: http://git-wip-us.apache.org/repos/asf/spark/diff/9039333c

Branch: refs/heads/master
Commit: 9039333c0a0ce4bea32f012b81c1e82e31246fc1
Parents: 3b5ccb1
Author: Nong Li <no...@databricks.com>
Authored: Fri Jan 15 17:40:26 2016 -0800
Committer: Reynold Xin <rx...@databricks.com>
Committed: Fri Jan 15 17:40:26 2016 -0800

----------------------------------------------------------------------
 .../scala/org/apache/spark/util/Benchmark.scala |   6 +-
 .../parquet/UnsafeRowParquetRecordReader.java   | 146 +++++++++-
 .../parquet/VectorizedPlainValuesReader.java    |  66 +++++
 .../parquet/VectorizedRleValuesReader.java      | 274 +++++++++++++++++++
 .../parquet/VectorizedValuesReader.java         |  37 +++
 .../sql/execution/vectorized/ColumnVector.java  |   9 +-
 .../sql/execution/vectorized/ColumnarBatch.java |  13 +-
 .../vectorized/OffHeapColumnVector.java         |   2 +
 .../vectorized/OnHeapColumnVector.java          |   1 +
 .../parquet/ParquetReadBenchmark.scala          |  93 ++++++-
 .../vectorized/ColumnarBatchBenchmark.scala     |   7 +-
 .../vectorized/ColumnarBatchSuite.scala         |  27 +-
 12 files changed, 625 insertions(+), 56 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/core/src/main/scala/org/apache/spark/util/Benchmark.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/org/apache/spark/util/Benchmark.scala b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
index 457a1a0..d484cec 100644
--- a/core/src/main/scala/org/apache/spark/util/Benchmark.scala
+++ b/core/src/main/scala/org/apache/spark/util/Benchmark.scala
@@ -62,10 +62,10 @@ private[spark] class Benchmark(
     val firstRate = results.head.avgRate
     // The results are going to be processor specific so it is useful to include that.
     println(Benchmark.getProcessorName())
-    printf("%-24s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
-    println("-------------------------------------------------------------------------")
+    printf("%-30s %16s %16s %14s\n", name + ":", "Avg Time(ms)", "Avg Rate(M/s)", "Relative Rate")
+    println("-------------------------------------------------------------------------------")
     results.zip(benchmarks).foreach { r =>
-      printf("%-24s %16s %16s %14s\n",
+      printf("%-30s %16s %16s %14s\n",
         r._2.name,
         "%10.2f" format r._1.avgMs,
         "%10.2f" format r._1.avgRate,

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
index 47818c0..80805f1 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/UnsafeRowParquetRecordReader.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.List;
 
-import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.Encoding;
@@ -35,9 +35,12 @@ import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Type;
 
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow;
 import org.apache.spark.sql.catalyst.expressions.codegen.BufferHolder;
 import org.apache.spark.sql.catalyst.expressions.codegen.UnsafeRowWriter;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.sql.execution.vectorized.ColumnarBatch;
 import org.apache.spark.sql.types.Decimal;
 import org.apache.spark.unsafe.Platform;
 import org.apache.spark.unsafe.types.UTF8String;
@@ -103,6 +106,25 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
   private static final int DEFAULT_VAR_LEN_SIZE = 32;
 
   /**
+   * columnarBatch object that is used for batch decoding. This is created on first use and triggers
+   * batched decoding. It is not valid to interleave calls to the batched interface with the row
+   * by row RecordReader APIs.
+   * This is only enabled with additional flags for development. This is still a work in progress
+   * and currently unsupported cases will fail with potentially difficult to diagnose errors.
+   * This should only be turned on for development to work on this feature.
+   *
+   * TODOs:
+   *  - Implement all the encodings to support vectorized.
+   *  - Implement v2 page formats (just make sure we create the correct decoders).
+   */
+  private ColumnarBatch columnarBatch;
+
+  /**
+   * The default config on whether columnarBatch should be offheap.
+   */
+  private static final MemoryMode DEFAULT_MEMORY_MODE = MemoryMode.ON_HEAP;
+
+  /**
    * Tries to initialize the reader for this split. Returns true if this reader supports reading
    * this split and false otherwise.
    */
@@ -136,6 +158,15 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
   }
 
   @Override
+  public void close() throws IOException {
+    if (columnarBatch != null) {
+      columnarBatch.close();
+      columnarBatch = null;
+    }
+    super.close();
+  }
+
+  @Override
   public boolean nextKeyValue() throws IOException, InterruptedException {
     if (batchIdx >= numBatched) {
       if (!loadBatch()) return false;
@@ -154,6 +185,46 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     return (float) rowsReturned / totalRowCount;
   }
 
+  /**
+   * Returns the ColumnarBatch object that will be used for all rows returned by this reader.
+   * This object is reused. Calling this enables the vectorized reader. This should be called
+   * before any calls to nextKeyValue/nextBatch.
+   */
+  public ColumnarBatch resultBatch() {
+    return resultBatch(DEFAULT_MEMORY_MODE);
+  }
+
+  public ColumnarBatch resultBatch(MemoryMode memMode) {
+    if (columnarBatch == null) {
+      columnarBatch = ColumnarBatch.allocate(sparkSchema, memMode);
+    }
+    return columnarBatch;
+  }
+
+  /**
+   * Advances to the next batch of rows. Returns false if there are no more.
+   */
+  public boolean nextBatch() throws IOException {
+    assert(columnarBatch != null);
+    columnarBatch.reset();
+    if (rowsReturned >= totalRowCount) return false;
+    checkEndOfRowGroup();
+
+    int num = (int)Math.min((long) columnarBatch.capacity(), totalRowCount - rowsReturned);
+    for (int i = 0; i < columnReaders.length; ++i) {
+      switch (columnReaders[i].descriptor.getType()) {
+        case INT32:
+          columnReaders[i].readIntBatch(num, columnarBatch.column(i));
+          break;
+        default:
+          throw new IOException("Unsupported type: " + columnReaders[i].descriptor.getType());
+      }
+    }
+    rowsReturned += num;
+    columnarBatch.setNumRows(num);
+    return true;
+  }
+
   private void initializeInternal() throws IOException {
     /**
      * Check that the requested schema is supported.
@@ -382,7 +453,7 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
    *
    * Decoder to return values from a single column.
    */
-  private static final class ColumnReader {
+  private final class ColumnReader {
     /**
      * Total number of values read.
      */
@@ -416,6 +487,10 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     private IntIterator definitionLevelColumn;
     private ValuesReader dataColumn;
 
+    // Only set if vectorized decoding is enabled. This is used instead of the row by row decoding
+    // with `definitionLevelColumn`.
+    private VectorizedRleValuesReader defColumn;
+
     /**
      * Total number of values in this column (in this row group).
      */
@@ -521,6 +596,35 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       return definitionLevelColumn.nextInt() == maxDefLevel;
     }
 
+    /**
+     * Reads `total` values from this columnReader into column.
+     * TODO: implement the other encodings.
+     */
+    private void readIntBatch(int total, ColumnVector column) throws IOException {
+      int rowId = 0;
+      while (total > 0) {
+        // Compute the number of values we want to read in this page.
+        int leftInPage = (int)(endOfPageValueCount - valuesRead);
+        if (leftInPage == 0) {
+          readPage();
+          leftInPage = (int)(endOfPageValueCount - valuesRead);
+        }
+        int num = Math.min(total, leftInPage);
+        defColumn.readIntegers(
+            num, column, rowId, maxDefLevel, (VectorizedValuesReader)dataColumn, 0);
+
+        // Remap the values if it is dictionary encoded.
+        if (useDictionary) {
+          for (int i = rowId; i < rowId + num; ++i) {
+            column.putInt(i, dictionary.decodeToInt(column.getInt(i)));
+          }
+        }
+        valuesRead += num;
+        rowId += num;
+        total -= num;
+      }
+    }
+
     private void readPage() throws IOException {
       DataPage page = pageReader.readPage();
       // TODO: Why is this a visitor?
@@ -547,21 +651,28 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
       });
     }
 
-    private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount)
-        throws IOException {
-      this.pageValueCount = valueCount;
+    private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset)throws IOException {
       this.endOfPageValueCount = valuesRead + pageValueCount;
       if (dataEncoding.usesDictionary()) {
+        this.dataColumn = null;
         if (dictionary == null) {
           throw new IOException(
               "could not read page in col " + descriptor +
                   " as the dictionary was missing for encoding " + dataEncoding);
         }
-        this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
-            descriptor, VALUES, dictionary);
+        if (columnarBatch != null && dataEncoding == Encoding.PLAIN_DICTIONARY) {
+          this.dataColumn = new VectorizedRleValuesReader();
+        } else {
+          this.dataColumn = dataEncoding.getDictionaryBasedValuesReader(
+              descriptor, VALUES, dictionary);
+        }
         this.useDictionary = true;
       } else {
-        this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+        if (columnarBatch != null && dataEncoding == Encoding.PLAIN) {
+          this.dataColumn = new VectorizedPlainValuesReader(4);
+        } else {
+          this.dataColumn = dataEncoding.getValuesReader(descriptor, VALUES);
+        }
         this.useDictionary = false;
       }
 
@@ -573,8 +684,19 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
     }
 
     private void readPageV1(DataPageV1 page) throws IOException {
+      this.pageValueCount = page.getValueCount();
       ValuesReader rlReader = page.getRlEncoding().getValuesReader(descriptor, REPETITION_LEVEL);
-      ValuesReader dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+      ValuesReader dlReader;
+
+      // Initialize the decoders. Use custom ones if vectorized decoding is enabled.
+      if (columnarBatch != null && page.getDlEncoding() == Encoding.RLE) {
+        int bitWidth = BytesUtils.getWidthFromMaxInt(descriptor.getMaxDefinitionLevel());
+        assert(bitWidth != 0); // not implemented
+        this.defColumn = new VectorizedRleValuesReader(bitWidth);
+        dlReader = this.defColumn;
+      } else {
+        dlReader = page.getDlEncoding().getValuesReader(descriptor, DEFINITION_LEVEL);
+      }
       this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
       this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
       try {
@@ -583,20 +705,20 @@ public class UnsafeRowParquetRecordReader extends SpecificParquetRecordReaderBas
         int next = rlReader.getNextOffset();
         dlReader.initFromPage(pageValueCount, bytes, next);
         next = dlReader.getNextOffset();
-        initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+        initDataReader(page.getValueEncoding(), bytes, next);
       } catch (IOException e) {
         throw new IOException("could not read page " + page + " in col " + descriptor, e);
       }
     }
 
     private void readPageV2(DataPageV2 page) throws IOException {
+      this.pageValueCount = page.getValueCount();
       this.repetitionLevelColumn = createRLEIterator(descriptor.getMaxRepetitionLevel(),
           page.getRepetitionLevels(), descriptor);
       this.definitionLevelColumn = createRLEIterator(descriptor.getMaxDefinitionLevel(),
           page.getDefinitionLevels(), descriptor);
       try {
-        initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0,
-            page.getValueCount());
+        initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0);
       } catch (IOException e) {
         throw new IOException("could not read page " + page + " in col " + descriptor, e);
       }

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
new file mode 100644
index 0000000..dac0c52
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedPlainValuesReader.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import java.io.IOException;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+import org.apache.spark.unsafe.Platform;
+
+import org.apache.parquet.column.values.ValuesReader;
+
+/**
+ * An implementation of the Parquet PLAIN decoder that supports the vectorized interface.
+ */
+public class VectorizedPlainValuesReader extends ValuesReader implements VectorizedValuesReader {
+  private byte[] buffer;
+  private int offset;
+  private final int byteSize;
+
+  public VectorizedPlainValuesReader(int byteSize) {
+    this.byteSize = byteSize;
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] bytes, int offset) throws IOException {
+    this.buffer = bytes;
+    this.offset = offset + Platform.BYTE_ARRAY_OFFSET;
+  }
+
+  @Override
+  public void skip() {
+    offset += byteSize;
+  }
+
+  @Override
+  public void skip(int n) {
+    offset += n * byteSize;
+  }
+
+  @Override
+  public void readIntegers(int total, ColumnVector c, int rowId) {
+    c.putIntsLittleEndian(rowId, total, buffer, offset - Platform.BYTE_ARRAY_OFFSET);
+    offset += 4 * total;
+  }
+
+  @Override
+  public int readInteger() {
+    int v = Platform.getInt(buffer, offset);
+    offset += 4;
+    return v;
+  }
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
new file mode 100644
index 0000000..493ec9d
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedRleValuesReader.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.BytesUtils;
+import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.column.values.bitpacking.BytePacker;
+import org.apache.parquet.column.values.bitpacking.Packer;
+import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * A values reader for Parquet's run-length encoded data. This is based off of the version in
+ * parquet-mr with these changes:
+ *  - Supports the vectorized interface.
+ *  - Works on byte arrays(byte[]) instead of making byte streams.
+ *
+ * This encoding is used in multiple places:
+ *  - Definition/Repetition levels
+ *  - Dictionary ids.
+ */
+public final class VectorizedRleValuesReader extends ValuesReader {
+  // Current decoding mode. The encoded data contains groups of either run length encoded data
+  // (RLE) or bit packed data. Each group contains a header that indicates which group it is and
+  // the number of values in the group.
+  // More details here: https://github.com/Parquet/parquet-format/blob/master/Encodings.md
+  private enum MODE {
+    RLE,
+    PACKED
+  }
+
+  // Encoded data.
+  private byte[] in;
+  private int end;
+  private int offset;
+
+  // bit/byte width of decoded data and utility to batch unpack them.
+  private int bitWidth;
+  private int bytesWidth;
+  private BytePacker packer;
+
+  // Current decoding mode and values
+  private MODE mode;
+  private int currentCount;
+  private int currentValue;
+
+  // Buffer of decoded values if the values are PACKED.
+  private int[] currentBuffer = new int[16];
+  private int currentBufferIdx = 0;
+
+  // If true, the bit width is fixed. This decoder is used in different places and this also
+  // controls if we need to read the bitwidth from the beginning of the data stream.
+  private final boolean fixedWidth;
+
+  public VectorizedRleValuesReader() {
+    fixedWidth = false;
+  }
+
+  public VectorizedRleValuesReader(int bitWidth) {
+    fixedWidth = true;
+    init(bitWidth);
+  }
+
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int start) {
+    this.offset = start;
+    this.in = page;
+    if (fixedWidth) {
+      int length = readIntLittleEndian();
+      this.end = this.offset + length;
+    } else {
+      this.end = page.length;
+      if (this.end != this.offset) init(page[this.offset++] & 255);
+    }
+    this.currentCount = 0;
+  }
+
+  /**
+   * Initializes the internal state for decoding ints of `bitWidth`.
+   */
+  private void init(int bitWidth) {
+    Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+    this.bitWidth = bitWidth;
+    this.bytesWidth = BytesUtils.paddedByteCountFromBits(bitWidth);
+    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+  }
+
+  @Override
+  public int getNextOffset() {
+    return this.end;
+  }
+
+  @Override
+  public boolean readBoolean() {
+    return this.readInteger() != 0;
+  }
+
+  @Override
+  public void skip() {
+    this.readInteger();
+  }
+
+  @Override
+  public int readValueDictionaryId() {
+    return readInteger();
+  }
+
+  @Override
+  public int readInteger() {
+    if (this.currentCount == 0) { this.readNextGroup(); }
+
+    this.currentCount--;
+    switch (mode) {
+      case RLE:
+        return this.currentValue;
+      case PACKED:
+        return this.currentBuffer[currentBufferIdx++];
+    }
+    throw new RuntimeException("Unreachable");
+  }
+
+  /**
+   * Reads `total` ints into `c` filling them in starting at `c[rowId]`. This reader
+   * reads the definition levels and then will read from `data` for the non-null values.
+   * If the value is null, c is marked null (bit-packed runs also store `nullValue`).
+   *
+   * This is a batched version of this logic:
+   *  if (this.readInteger() == level) {
+   *    c[rowId] = data.readInteger();
+   *  } else {
+   *    c[rowId] = nullValue;
+   *  }
+   */
+  public void readIntegers(int total, ColumnVector c, int rowId, int level,
+      VectorizedValuesReader data, int nullValue) {
+    int left = total;
+    while (left > 0) {
+      if (this.currentCount == 0) this.readNextGroup();
+      int n = Math.min(left, this.currentCount);
+      switch (mode) {
+        case RLE:
+          if (currentValue == level) {
+            data.readIntegers(n, c, rowId);
+            c.putNotNulls(rowId, n);
+          } else {
+            c.putNulls(rowId, n);
+          }
+          break;
+        case PACKED:
+          for (int i = 0; i < n; ++i) {
+            if (currentBuffer[currentBufferIdx++] == level) {
+              c.putInt(rowId + i, data.readInteger());
+              c.putNotNull(rowId + i);
+            } else {
+              c.putInt(rowId + i, nullValue);
+              c.putNull(rowId + i);
+            }
+          }
+          break;
+      }
+      rowId += n;
+      left -= n;
+      currentCount -= n;
+    }
+  }
+
+  /**
+   * Reads the next varint encoded int.
+   */
+  private int readUnsignedVarInt() {
+    int value = 0;
+    int shift = 0;
+    int b;
+    do {
+      b = in[offset++] & 255;
+      value |= (b & 0x7F) << shift;
+      shift += 7;
+    } while ((b & 0x80) != 0);
+    return value;
+  }
+
+  /**
+   * Reads the next 4 byte little endian int.
+   */
+  private int readIntLittleEndian() {
+    int ch4 = in[offset] & 255;
+    int ch3 = in[offset + 1] & 255;
+    int ch2 = in[offset + 2] & 255;
+    int ch1 = in[offset + 3] & 255;
+    offset += 4;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+
+  /**
+   * Reads the next little endian int that is `bytesWidth` bytes wide.
+   */
+  private int readIntLittleEndianPaddedOnBitWidth() {
+    switch (bytesWidth) {
+      case 0:
+        return 0;
+      case 1:
+        return in[offset++] & 255;
+      case 2: {
+        int ch2 = in[offset] & 255;
+        int ch1 = in[offset + 1] & 255;
+        offset += 2;
+        return (ch1 << 8) + ch2;
+      }
+      case 3: {
+        int ch3 = in[offset] & 255;
+        int ch2 = in[offset + 1] & 255;
+        int ch1 = in[offset + 2] & 255;
+        offset += 3;
+        return (ch1 << 16) + (ch2 << 8) + (ch3 << 0);
+      }
+      case 4: {
+        return readIntLittleEndian();
+      }
+    }
+    throw new RuntimeException("Unreachable");
+  }
+
+  /**
+   * Reads the next group.
+   */
+  private void readNextGroup()  {
+    Preconditions.checkArgument(this.offset < this.end,
+        "Reading past RLE/BitPacking stream. offset=" + this.offset + " end=" + this.end);
+    int header = readUnsignedVarInt();
+    this.mode = (header & 1) == 0 ? MODE.RLE : MODE.PACKED;
+    switch (mode) {
+      case RLE:
+        this.currentCount = header >>> 1;
+        this.currentValue = readIntLittleEndianPaddedOnBitWidth();
+        return;
+      case PACKED:
+        int numGroups = header >>> 1;
+        this.currentCount = numGroups * 8;
+
+        if (this.currentBuffer.length < this.currentCount) {
+          this.currentBuffer = new int[this.currentCount];
+        }
+        currentBufferIdx = 0;
+        int bytesToRead = (int)Math.ceil((double)(this.currentCount * this.bitWidth) / 8.0D);
+
+        bytesToRead = Math.min(bytesToRead, this.end - this.offset);
+        int valueIndex = 0;
+        for (int byteIndex = offset; valueIndex < this.currentCount; byteIndex += this.bitWidth) {
+          this.packer.unpack8Values(in, byteIndex, this.currentBuffer, valueIndex);
+          valueIndex += 8;
+        }
+        offset += bytesToRead;
+        return;
+      default:
+        throw new ParquetDecodingException("not a valid mode " + this.mode);
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
new file mode 100644
index 0000000..49a9ed8
--- /dev/null
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/VectorizedValuesReader.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.spark.sql.execution.datasources.parquet;
+
+import org.apache.spark.sql.execution.vectorized.ColumnVector;
+
+/**
+ * Interface for value decoding that supports vectorized (aka batched) decoding.
+ * TODO: merge this into parquet-mr.
+ */
+public interface VectorizedValuesReader {
+  int readInteger();
+
+  /*
+   * Reads `total` values into `c` starting at `c[rowId]`
+   */
+  void readIntegers(int total, ColumnVector c, int rowId);
+
+  // TODO: add all the other parquet types.
+
+  void skip(int n);
+}

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
index d9dde92..8550975 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnVector.java
@@ -16,6 +16,7 @@
  */
 package org.apache.spark.sql.execution.vectorized;
 
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.types.DataType;
 
 /**
@@ -33,8 +34,8 @@ public abstract class ColumnVector {
   /**
    * Allocates a column with each element of size `width` either on or off heap.
    */
-  public static ColumnVector allocate(int capacity, DataType type, boolean offHeap) {
-    if (offHeap) {
+  public static ColumnVector allocate(int capacity, DataType type, MemoryMode mode) {
+    if (mode == MemoryMode.OFF_HEAP) {
       return new OffHeapColumnVector(capacity, type);
     } else {
       return new OnHeapColumnVector(capacity, type);
@@ -111,7 +112,7 @@ public abstract class ColumnVector {
   public abstract void putInts(int rowId, int count, int[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
    * The data in src must be 4-byte little endian ints.
    */
   public abstract void putIntsLittleEndian(int rowId, int count, byte[] src, int srcIndex);
@@ -138,7 +139,7 @@ public abstract class ColumnVector {
   public abstract void putDoubles(int rowId, int count, double[] src, int srcIndex);
 
   /**
-   * Sets values from [rowId, rowId + count) to [src + srcIndex, src + srcIndex + count)
+   * Sets values from [rowId, rowId + count) to [src[srcIndex], src[srcIndex + count])
    * The data in src must be IEEE formatted doubles.
    */
   public abstract void putDoubles(int rowId, int count, byte[] src, int srcIndex);

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
index 47defac..2c55f85 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/ColumnarBatch.java
@@ -19,6 +19,7 @@ package org.apache.spark.sql.execution.vectorized;
 import java.util.Arrays;
 import java.util.Iterator;
 
+import org.apache.spark.memory.MemoryMode;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.catalyst.util.ArrayData;
 import org.apache.spark.sql.catalyst.util.MapData;
@@ -59,12 +60,12 @@ public final class ColumnarBatch {
   // Total number of rows that have been filtered.
   private int numRowsFiltered = 0;
 
-  public static ColumnarBatch allocate(StructType schema, boolean offHeap) {
-    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, offHeap);
+  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode) {
+    return new ColumnarBatch(schema, DEFAULT_BATCH_SIZE, memMode);
   }
 
-  public static ColumnarBatch allocate(StructType schema, boolean offHeap, int maxRows) {
-    return new ColumnarBatch(schema, maxRows, offHeap);
+  public static ColumnarBatch allocate(StructType schema, MemoryMode memMode, int maxRows) {
+    return new ColumnarBatch(schema, maxRows, memMode);
   }
 
   /**
@@ -282,7 +283,7 @@ public final class ColumnarBatch {
     ++numRowsFiltered;
   }
 
-  private ColumnarBatch(StructType schema, int maxRows, boolean offHeap) {
+  private ColumnarBatch(StructType schema, int maxRows, MemoryMode memMode) {
     this.schema = schema;
     this.capacity = maxRows;
     this.columns = new ColumnVector[schema.size()];
@@ -290,7 +291,7 @@ public final class ColumnarBatch {
 
     for (int i = 0; i < schema.fields().length; ++i) {
       StructField field = schema.fields()[i];
-      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), offHeap);
+      columns[i] = ColumnVector.allocate(maxRows, field.dataType(), memMode);
     }
   }
 }

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
index 2a9a2d1..6180dd3 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OffHeapColumnVector.java
@@ -49,6 +49,7 @@ public final class OffHeapColumnVector extends ColumnVector {
     } else {
       throw new RuntimeException("Unhandled " + type);
     }
+    anyNullsSet = true;
     reset();
   }
 
@@ -98,6 +99,7 @@ public final class OffHeapColumnVector extends ColumnVector {
 
   @Override
   public final void putNotNulls(int rowId, int count) {
+    if (!anyNullsSet) return;
     long offset = nulls + rowId;
     for (int i = 0; i < count; ++i, ++offset) {
       Platform.putByte(null, offset, (byte) 0);

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
----------------------------------------------------------------------
diff --git a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
index a7b3add..76d9956 100644
--- a/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
+++ b/sql/core/src/main/java/org/apache/spark/sql/execution/vectorized/OnHeapColumnVector.java
@@ -97,6 +97,7 @@ public final class OnHeapColumnVector extends ColumnVector {
 
   @Override
   public final void putNotNulls(int rowId, int count) {
+    if (!anyNullsSet) return;
     for (int i = 0; i < count; ++i) {
       nulls[rowId + i] = (byte)0;
     }

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
index ae95b50..14be9ee 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetReadBenchmark.scala
@@ -59,24 +59,31 @@ object ParquetReadBenchmark {
   }
 
   def intScanBenchmark(values: Int): Unit = {
+    // Benchmarks running through spark sql.
+    val sqlBenchmark = new Benchmark("SQL Single Int Column Scan", values)
+    // Benchmarks driving reader component directly.
+    val parquetReaderBenchmark = new Benchmark("Parquet Reader Single Int Column Scan", values)
+
     withTempPath { dir =>
-      sqlContext.range(values).write.parquet(dir.getCanonicalPath)
-      withTempTable("tempTable") {
+      withTempTable("t1", "tempTable") {
+        sqlContext.range(values).registerTempTable("t1")
+        sqlContext.sql("select cast(id as INT) as id from t1")
+            .write.parquet(dir.getCanonicalPath)
         sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
-        val benchmark = new Benchmark("Single Int Column Scan", values)
 
-        benchmark.addCase("SQL Parquet Reader") { iter =>
+        sqlBenchmark.addCase("SQL Parquet Reader") { iter =>
           sqlContext.sql("select sum(id) from tempTable").collect()
         }
 
-        benchmark.addCase("SQL Parquet MR") { iter =>
+        sqlBenchmark.addCase("SQL Parquet MR") { iter =>
           withSQLConf(SQLConf.PARQUET_UNSAFE_ROW_RECORD_READER_ENABLED.key -> "false") {
             sqlContext.sql("select sum(id) from tempTable").collect()
           }
         }
 
         val files = SpecificParquetRecordReaderBase.listDirectory(dir).toArray
-        benchmark.addCase("ParquetReader") { num =>
+        // Driving the parquet reader directly without Spark.
+        parquetReaderBenchmark.addCase("ParquetReader") { num =>
           var sum = 0L
           files.map(_.asInstanceOf[String]).foreach { p =>
             val reader = new UnsafeRowParquetRecordReader
@@ -87,26 +94,82 @@ object ParquetReadBenchmark {
               if (!record.isNullAt(0)) sum += record.getInt(0)
             }
             reader.close()
-        }}
+          }
+        }
+
+        // Driving the parquet reader in batch mode directly.
+        parquetReaderBenchmark.addCase("ParquetReader(Batched)") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("id" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              val col = batch.column(0)
+              while (reader.nextBatch()) {
+                val numRows = batch.numRows()
+                var i = 0
+                while (i < numRows) {
+                  if (!col.getIsNull(i)) sum += col.getInt(i)
+                  i += 1
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
+
+        // Decoding in vectorized mode but having the reader return rows.
+        parquetReaderBenchmark.addCase("ParquetReader(Batch -> Row)") { num =>
+          var sum = 0L
+          files.map(_.asInstanceOf[String]).foreach { p =>
+            val reader = new UnsafeRowParquetRecordReader
+            try {
+              reader.initialize(p, ("id" :: Nil).asJava)
+              val batch = reader.resultBatch()
+              while (reader.nextBatch()) {
+                val it = batch.rowIterator()
+                while (it.hasNext) {
+                  val record = it.next()
+                  if (!record.isNullAt(0)) sum += record.getInt(0)
+                }
+              }
+            } finally {
+              reader.close()
+            }
+          }
+        }
 
         /*
-          Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
-          Single Int Column Scan:      Avg Time(ms)    Avg Rate(M/s)  Relative Rate
-          -------------------------------------------------------------------------
-          SQL Parquet Reader                 1910.0            13.72         1.00 X
-          SQL Parquet MR                     2330.0            11.25         0.82 X
-          ParquetReader                      1252.6            20.93         1.52 X
+        Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+        Single Int Column Scan:            Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+        -------------------------------------------------------------------------------
+        SQL Parquet Reader                       1682.6            15.58         1.00 X
+        SQL Parquet MR                           2379.6            11.02         0.71 X
         */
-        benchmark.run()
+        sqlBenchmark.run()
+
+        /*
+        Intel(R) Core(TM) i7-4870HQ CPU @ 2.50GHz
+        Parquet Reader Single Int Column Scan:     Avg Time(ms)    Avg Rate(M/s)  Relative Rate
+        -------------------------------------------------------------------------------
+        ParquetReader                            610.40            25.77         1.00 X
+        ParquetReader(Batched)                   172.66            91.10         3.54 X
+        ParquetReader(Batch -> Row)              192.28            81.80         3.17 X
+        */
+        parquetReaderBenchmark.run()
       }
     }
   }
 
   def intStringScanBenchmark(values: Int): Unit = {
+    val benchmark = new Benchmark("Int and String Scan", values)
+
     withTempPath { dir =>
       withTempTable("t1", "tempTable") {
         sqlContext.range(values).registerTempTable("t1")
-        sqlContext.sql("select id as c1, cast(id as STRING) as c2 from t1")
+        sqlContext.sql("select cast(id as INT) as c1, cast(id as STRING) as c2 from t1")
             .write.parquet(dir.getCanonicalPath)
         sqlContext.read.parquet(dir.getCanonicalPath).registerTempTable("tempTable")
 

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
index e28153d..bfe944d 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchBenchmark.scala
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.datasources.parquet
 
 import java.nio.ByteBuffer
 
+import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.catalyst.expressions.UnsafeRow
 import org.apache.spark.sql.execution.vectorized.ColumnVector
 import org.apache.spark.sql.types.IntegerType
@@ -136,7 +137,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with on heap memory
     val columnOnHeap = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, false)
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.ON_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -155,7 +156,7 @@ object ColumnarBatchBenchmark {
 
     // Access through the column API with off heap memory
     def columnOffHeap = { i: Int => {
-      val col = ColumnVector.allocate(count, IntegerType, true)
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var i = 0
@@ -174,7 +175,7 @@ object ColumnarBatchBenchmark {
 
     // Access by directly getting the buffer backing the column.
     val columnOffheapDirect = { i: Int =>
-      val col = ColumnVector.allocate(count, IntegerType, true)
+      val col = ColumnVector.allocate(count, IntegerType, MemoryMode.OFF_HEAP)
       var sum = 0L
       for (n <- 0L until iters) {
         var addr = col.valuesNativeAddress()

http://git-wip-us.apache.org/repos/asf/spark/blob/9039333c/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
----------------------------------------------------------------------
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
index 305a83e..d5e517c 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/vectorized/ColumnarBatchSuite.scala
@@ -21,6 +21,7 @@ import scala.collection.mutable
 import scala.util.Random
 
 import org.apache.spark.SparkFunSuite
+import org.apache.spark.memory.MemoryMode
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.types.{DoubleType, IntegerType, StructType}
@@ -28,10 +29,10 @@ import org.apache.spark.unsafe.Platform
 
 class ColumnarBatchSuite extends SparkFunSuite {
   test("Null Apis") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val reference = mutable.ArrayBuffer.empty[Boolean]
 
-      val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+      val column = ColumnVector.allocate(1024, IntegerType, memMode)
       var idx = 0
       assert(column.anyNullsSet() == false)
 
@@ -64,7 +65,7 @@ class ColumnarBatchSuite extends SparkFunSuite {
 
       reference.zipWithIndex.foreach { v =>
         assert(v._1 == column.getIsNull(v._2))
-        if (offHeap) {
+        if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.nullsNativeAddress()
           assert(v._1 == (Platform.getByte(null, addr + v._2) == 1), "index=" + v._2)
         }
@@ -74,12 +75,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   test("Int Apis") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Int]
 
-      val column = ColumnVector.allocate(1024, IntegerType, offHeap)
+      val column = ColumnVector.allocate(1024, IntegerType, memMode)
       var idx = 0
 
       val values = (1 :: 2 :: 3 :: 4 :: 5 :: Nil).toArray
@@ -131,8 +132,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
-        if (offHeap) {
+        assert(v._1 == column.getInt(v._2), "Seed = " + seed + " Mem Mode=" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.valuesNativeAddress()
           assert(v._1 == Platform.getInt(null, addr + 4 * v._2))
         }
@@ -142,12 +143,12 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   test("Double APIs") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val seed = System.currentTimeMillis()
       val random = new Random(seed)
       val reference = mutable.ArrayBuffer.empty[Double]
 
-      val column = ColumnVector.allocate(1024, DoubleType, offHeap)
+      val column = ColumnVector.allocate(1024, DoubleType, memMode)
       var idx = 0
 
       val values = (1.0 :: 2.0 :: 3.0 :: 4.0 :: 5.0 :: Nil).toArray
@@ -198,8 +199,8 @@ class ColumnarBatchSuite extends SparkFunSuite {
       }
 
       reference.zipWithIndex.foreach { v =>
-        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " Off Heap=" + offHeap)
-        if (offHeap) {
+        assert(v._1 == column.getDouble(v._2), "Seed = " + seed + " MemMode=" + memMode)
+        if (memMode == MemoryMode.OFF_HEAP) {
           val addr = column.valuesNativeAddress()
           assert(v._1 == Platform.getDouble(null, addr + 8 * v._2))
         }
@@ -209,13 +210,13 @@ class ColumnarBatchSuite extends SparkFunSuite {
   }
 
   test("ColumnarBatch basic") {
-    (false :: true :: Nil).foreach { offHeap => {
+    (MemoryMode.ON_HEAP :: MemoryMode.OFF_HEAP :: Nil).foreach { memMode => {
       val schema = new StructType()
         .add("intCol", IntegerType)
         .add("doubleCol", DoubleType)
         .add("intCol2", IntegerType)
 
-      val batch = ColumnarBatch.allocate(schema, offHeap)
+      val batch = ColumnarBatch.allocate(schema, memMode)
       assert(batch.numCols() == 3)
       assert(batch.numRows() == 0)
       assert(batch.numValidRows() == 0)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@spark.apache.org
For additional commands, e-mail: commits-help@spark.apache.org