You are viewing a plain text version of this content; the link to the canonical version was not preserved in this copy.
Posted to commits@parquet.apache.org by ju...@apache.org on 2015/11/04 18:57:35 UTC

[3/4] parquet-mr git commit: PARQUET-77: ByteBuffer use in read and write paths

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
index eb9fdd9..86edd79 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesWriter.java
@@ -36,9 +36,11 @@ import it.unimi.dsi.fastutil.objects.Object2IntMap;
 import it.unimi.dsi.fastutil.objects.ObjectIterator;
 
 import java.io.IOException;
-import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.Iterator;
+import java.util.List;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
@@ -92,17 +94,28 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
   /* dictionary encoded values */
   protected IntList encodedValues = new IntList();
 
+  /** indicates if this is the first page being processed */
+  protected boolean firstPage = true;
+
+  protected ByteBufferAllocator allocator;
+  /* Track the encoders created by this writer so they can be closed when necessary
+     (currently needed for off-heap memory, which is not garbage collected) */
+  private List<RunLengthBitPackingHybridEncoder> encoders = new ArrayList<RunLengthBitPackingHybridEncoder>();
+
   /**
    * @param maxDictionaryByteSize
    */
-  protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
+  protected DictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+    this.allocator = allocator;
     this.maxDictionaryByteSize = maxDictionaryByteSize;
     this.encodingForDataPage = encodingForDataPage;
     this.encodingForDictionaryPage = encodingForDictionaryPage;
   }
 
-  protected DictionaryPage dictPage(ValuesWriter dictionaryEncoder) {
-    return new DictionaryPage(dictionaryEncoder.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
+  protected DictionaryPage dictPage(ValuesWriter dictPageWriter) {
+    DictionaryPage ret = new DictionaryPage(dictPageWriter.getBytes(), lastUsedDictionarySize, encodingForDictionaryPage);
+    dictPageWriter.close();
+    return ret;
   }
 
   @Override
@@ -147,12 +160,12 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     int maxDicId = getDictionarySize() - 1;
     if (DEBUG) LOG.debug("max dic id " + maxDicId);
     int bitWidth = BytesUtils.getWidthFromMaxInt(maxDicId);
-
     int initialSlabSize =
         CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_INITIAL_SLAB_SIZE, maxDictionaryByteSize, 10);
 
     RunLengthBitPackingHybridEncoder encoder =
-        new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize);
+        new RunLengthBitPackingHybridEncoder(bitWidth, initialSlabSize, maxDictionaryByteSize, this.allocator);
+    encoders.add(encoder);
     IntIterator iterator = encodedValues.iterator();
     try {
       while (iterator.hasNext()) {
@@ -179,10 +192,20 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
   @Override
   public void reset() {
+    close();
     encodedValues = new IntList();
   }
 
   @Override
+  public void close() {
+    encodedValues = null;
+    for (RunLengthBitPackingHybridEncoder encoder : encoders) {
+      encoder.close();
+    }
+    encoders.clear();
+  }
+
+  @Override
   public void resetDictionary() {
     lastUsedDictionaryByteSize = 0;
     lastUsedDictionarySize = 0;
@@ -225,8 +248,8 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     /**
      * @param maxDictionaryByteSize
      */
-    public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
-      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+    public PlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
       binaryDictionaryContent.defaultReturnValue(-1);
     }
 
@@ -243,10 +266,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     }
 
     @Override
-    public DictionaryPage createDictionaryPage() {
+    public DictionaryPage toDictPageAndClose() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
         Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -294,10 +317,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
     /**
      * @param maxDictionaryByteSize
-     * @param initialSize
      */
-    public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
-      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+    public PlainFixedLenArrayDictionaryValuesWriter(int maxDictionaryByteSize, int length, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
       this.length = length;
     }
 
@@ -313,10 +335,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     }
 
     @Override
-    public DictionaryPage createDictionaryPage() {
+    public DictionaryPage toDictPageAndClose() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize);
+        FixedLenByteArrayPlainValuesWriter dictionaryEncoder = new FixedLenByteArrayPlainValuesWriter(length, lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
         Iterator<Binary> binaryIterator = binaryDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -339,10 +361,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
     /**
      * @param maxDictionaryByteSize
-     * @param initialSize
      */
-    public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
-      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+    public PlainLongDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
       longDictionaryContent.defaultReturnValue(-1);
     }
 
@@ -358,10 +379,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     }
 
     @Override
-    public DictionaryPage createDictionaryPage() {
+    public DictionaryPage toDictPageAndClose() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
         LongIterator longIterator = longDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -411,10 +432,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
     /**
      * @param maxDictionaryByteSize
-     * @param initialSize
      */
-    public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
-      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+    public PlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
       doubleDictionaryContent.defaultReturnValue(-1);
     }
 
@@ -430,10 +450,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     }
 
     @Override
-    public DictionaryPage createDictionaryPage() {
+    public DictionaryPage toDictPageAndClose() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
         DoubleIterator doubleIterator = doubleDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -483,10 +503,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
     /**
      * @param maxDictionaryByteSize
-     * @param initialSize
      */
-    public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
-      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+    public PlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
       intDictionaryContent.defaultReturnValue(-1);
     }
 
@@ -502,10 +521,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     }
 
     @Override
-    public DictionaryPage createDictionaryPage() {
+    public DictionaryPage toDictPageAndClose() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
         it.unimi.dsi.fastutil.ints.IntIterator intIterator = intDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {
@@ -555,10 +574,9 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
 
     /**
      * @param maxDictionaryByteSize
-     * @param initialSize
      */
-    public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage) {
-      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage);
+    public PlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, Encoding encodingForDataPage, Encoding encodingForDictionaryPage, ByteBufferAllocator allocator) {
+      super(maxDictionaryByteSize, encodingForDataPage, encodingForDictionaryPage, allocator);
       floatDictionaryContent.defaultReturnValue(-1);
     }
 
@@ -574,10 +592,10 @@ public abstract class DictionaryValuesWriter extends ValuesWriter implements Req
     }
 
     @Override
-    public DictionaryPage createDictionaryPage() {
+    public DictionaryPage toDictPageAndClose() {
       if (lastUsedDictionarySize > 0) {
         // return a dictionary only if we actually used it
-        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize);
+        PlainValuesWriter dictionaryEncoder = new PlainValuesWriter(lastUsedDictionaryByteSize, maxDictionaryByteSize, allocator);
         FloatIterator floatIterator = floatDictionaryContent.keySet().iterator();
         // write only the part of the dict that we used
         for (int i = 0; i < lastUsedDictionarySize; i++) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
index e671310..0fa6cc6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -23,6 +23,7 @@ import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
 import static org.apache.parquet.column.Encoding.PLAIN;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.column.Dictionary;
@@ -86,9 +87,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryBytes = dictionaryPage.getBytes().toByteBuffer();
       binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()];
-      int offset = 0;
+      // start at the buffer's current position rather than assuming the data begins at index 0
+      int offset = dictionaryBytes.position();
       if (length == null) {
         // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes
         for (int i = 0; i < binaryDictionaryContent.length; i++) {
@@ -96,7 +98,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
           // read the length
           offset += 4;
           // wrap the content in a binary
-          binaryDictionaryContent[i] = Binary.fromConstantByteArray(dictionaryBytes, offset, len);
+          binaryDictionaryContent[i] = Binary.fromConstantByteBuffer(dictionaryBytes, offset, len);
           // increment to the next value
           offset += len;
         }
@@ -106,7 +108,7 @@ public abstract class PlainValuesDictionary extends Dictionary {
             "Invalid byte array length: " + length);
         for (int i = 0; i < binaryDictionaryContent.length; i++) {
           // wrap the content in a Binary
-          binaryDictionaryContent[i] = Binary.fromConstantByteArray(
+          binaryDictionaryContent[i] = Binary.fromConstantByteBuffer(
               dictionaryBytes, offset, length);
           // increment to the next value
           offset += length;
@@ -148,10 +150,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
       LongPlainValuesReader longReader = new LongPlainValuesReader();
-      longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
       for (int i = 0; i < longDictionaryContent.length; i++) {
         longDictionaryContent[i] = longReader.readLong();
       }
@@ -191,10 +193,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
       DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
-      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0);
       for (int i = 0; i < doubleDictionaryContent.length; i++) {
         doubleDictionaryContent[i] = doubleReader.readDouble();
       }
@@ -234,10 +236,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
       IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
-      intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0);
       for (int i = 0; i < intDictionaryContent.length; i++) {
         intDictionaryContent[i] = intReader.readInteger();
       }
@@ -277,10 +279,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
       FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
-      floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
       for (int i = 0; i < floatDictionaryContent.length; i++) {
         floatDictionaryContent[i] = floatReader.readFloat();
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
index f66c7c9..19fed7d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/fallback/FallbackValuesWriter.java
@@ -97,11 +97,17 @@ public class FallbackValuesWriter<I extends ValuesWriter & RequiresFallback, F e
     currentWriter.reset();
   }
 
-  public DictionaryPage createDictionaryPage() {
+  @Override
+  public void close() {
+    initialWriter.close();
+    fallBackWriter.close();
+  }
+
+  public DictionaryPage toDictPageAndClose() {
     if (initialUsedAndHadDictionary) {
-      return initialWriter.createDictionaryPage();
+      return initialWriter.toDictPageAndClose();
     } else {
-      return currentWriter.createDictionaryPage();
+      return currentWriter.toDictPageAndClose();
     }
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index 4346e02..26f5e29 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.plain;
 import static org.apache.parquet.Log.DEBUG;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesUtils;
@@ -30,7 +31,7 @@ import org.apache.parquet.io.api.Binary;
 
 public class BinaryPlainValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class);
-  private byte[] in;
+  private ByteBuffer in;
   private int offset;
 
   @Override
@@ -39,7 +40,7 @@ public class BinaryPlainValuesReader extends ValuesReader {
       int length = BytesUtils.readIntLittleEndian(in, offset);
       int start = offset + 4;
       offset = start + length;
-      return Binary.fromConstantByteArray(in, start, length);
+      return Binary.fromConstantByteBuffer(in, start, length);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
     } catch (RuntimeException e) {
@@ -60,11 +61,10 @@ public class BinaryPlainValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset)
+  public void initFromPage(int valueCount, ByteBuffer in, int offset)
       throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     this.in = in;
     this.offset = offset;
   }
-
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
index 31e711f..c330490 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.Log.DEBUG;
 import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.values.ValuesReader;
@@ -62,8 +63,8 @@ public class BooleanPlainValuesReader extends ValuesReader {
    * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int)
    */
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     this.in.initFromPage(valueCount, in, offset);
   }
   

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
index 78920f0..c3e88ea 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.plain;
 
+
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
 import org.apache.parquet.bytes.BytesInput;
@@ -61,6 +62,11 @@ public class BooleanPlainValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    bitPackingWriter.close();
+  }
+
+  @Override
   public long getAllocatedSize() {
     return bitPackingWriter.getAllocatedSize();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 098a486..8496e7e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import org.apache.parquet.Log;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -33,7 +34,7 @@ import static org.apache.parquet.Log.DEBUG;
  */
 public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class);
-  private byte[] in;
+  private ByteBuffer in;
   private int offset;
   private int length;
 
@@ -46,7 +47,7 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
     try {
       int start = offset;
       offset = start + length;
-      return Binary.fromConstantByteArray(in, start, length);
+      return Binary.fromConstantByteBuffer(in, start, length);
     } catch (RuntimeException e) {
       throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
     }
@@ -58,9 +59,9 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset)
+  public void initFromPage(int valueCount, ByteBuffer in, int offset)
       throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     this.in = in;
     this.offset = offset;
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
index 986ae0b..6ab2dea 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesWriter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -40,10 +41,13 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
   private CapacityByteArrayOutputStream arrayOut;
   private LittleEndianDataOutputStream out;
   private int length;
+  private ByteBufferAllocator allocator;
+  
 
-  public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize) {
+  public FixedLenByteArrayPlainValuesWriter(int length, int initialSize, int pageSize, ByteBufferAllocator allocator) {
     this.length = length;
-    this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+    this.allocator = allocator;
+    this.arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, this.allocator);
     this.out = new LittleEndianDataOutputStream(arrayOut);
   }
 
@@ -82,6 +86,11 @@ public class FixedLenByteArrayPlainValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    arrayOut.close();
+  }
+
+  @Override
   public long getAllocatedSize() {
     return arrayOut.getCapacity();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index bd938ee..c8fb303 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -20,9 +20,10 @@ package org.apache.parquet.column.values.plain;
 
 import static org.apache.parquet.Log.DEBUG;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.LittleEndianDataInputStream;
 import org.apache.parquet.column.values.ValuesReader;
@@ -41,12 +42,16 @@ abstract public class PlainValuesReader extends ValuesReader {
 
   /**
    * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(byte[], int)
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
    */
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
-    this.in = new LittleEndianDataInputStream(new ByteArrayInputStream(in, offset, in.length - offset));
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
+    this.in = new LittleEndianDataInputStream(toInputStream(in, offset));
+  }
+
+  private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) {
+    return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset);
   }
 
   public static class DoublePlainValuesReader extends PlainValuesReader {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
index f33bd81..add5495 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesWriter.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.plain;
 import java.io.IOException;
 import java.nio.charset.Charset;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -44,8 +45,8 @@ public class PlainValuesWriter extends ValuesWriter {
   private CapacityByteArrayOutputStream arrayOut;
   private LittleEndianDataOutputStream out;
 
-  public PlainValuesWriter(int initialSize, int pageSize) {
-    arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+  public PlainValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
+    arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
     out = new LittleEndianDataOutputStream(arrayOut);
   }
 
@@ -126,6 +127,12 @@ public class PlainValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    // Closes both streams created in the constructor: the backing buffer
+    // stream first, then the little-endian wrapper built on top of it.
+    // NOTE(review): `out` wraps `arrayOut`, so if out.close() propagates to the
+    // wrapped stream, `arrayOut` is closed twice - confirm that a second close
+    // is harmless (no double release of allocator-owned buffers).
+    arrayOut.close();
+    out.close();
+  }
+
+  @Override
   public long getAllocatedSize() {
     return arrayOut.getCapacity();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 38eb354..1280e8d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -20,10 +20,12 @@ package org.apache.parquet.column.values.rle;
 
 import static org.apache.parquet.Log.DEBUG;
 
-import java.io.ByteArrayInputStream;
 import java.io.DataInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.Log;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
@@ -43,14 +45,14 @@ public class RunLengthBitPackingHybridDecoder {
 
   private final int bitWidth;
   private final BytePacker packer;
-  private final ByteArrayInputStream in;
+  private final InputStream in;
 
   private MODE mode;
   private int currentCount;
   private int currentValue;
   private int[] currentBuffer;
 
-  public RunLengthBitPackingHybridDecoder(int bitWidth, ByteArrayInputStream in) {
+  public RunLengthBitPackingHybridDecoder(int bitWidth, InputStream in) {
     if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth);
 
     Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
index 9d37574..001d3f6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridEncoder.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesInput;
@@ -116,7 +117,7 @@ public class RunLengthBitPackingHybridEncoder {
 
   private boolean toBytesCalled;
 
-  public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize) {
+  public RunLengthBitPackingHybridEncoder(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
     if (DEBUG) {
       LOG.debug(String.format("Encoding: RunLengthBitPackingHybridEncoder with "
         + "bithWidth: %d initialCapacity %d", bitWidth, initialCapacity));
@@ -125,7 +126,7 @@ public class RunLengthBitPackingHybridEncoder {
     Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
 
     this.bitWidth = bitWidth;
-    this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+    this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
     this.packBuffer = new byte[bitWidth];
     this.bufferedValues = new int[8];
     this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
@@ -281,6 +282,11 @@ public class RunLengthBitPackingHybridEncoder {
     reset(true);
   }
 
+  public void close() {
+    reset(false);
+    baos.close();
+  }
+
   public long getBufferedSize() {
     return baos.size();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index bd4e11d..4ccf2b8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -18,9 +18,10 @@
  */
 package org.apache.parquet.column.values.rle;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -41,8 +42,8 @@ public class RunLengthBitPackingHybridValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCountL, byte[] page, int offset) throws IOException {
-    ByteArrayInputStream in = new ByteArrayInputStream(page, offset, page.length - offset);
+  public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException {
+    ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset);
     int length = BytesUtils.readIntLittleEndian(in);
 
     decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
index bccfd34..14ef161 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesWriter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Ints;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
@@ -32,8 +33,8 @@ import org.apache.parquet.io.ParquetEncodingException;
 public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
   private final RunLengthBitPackingHybridEncoder encoder;
 
-  public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize) {
-    this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize);
+  public RunLengthBitPackingHybridValuesWriter(int bitWidth, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+    this.encoder = new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity, pageSize, allocator);
   }
 
   @Override
@@ -82,6 +83,11 @@ public class RunLengthBitPackingHybridValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    encoder.close();
+  }
+
+  @Override
   public String memUsageString(String prefix) {
     return String.format("%s RunLengthBitPackingHybrid %d bytes", prefix, getAllocatedSize());
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
index f88d740..ff833ec 100644
--- a/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/io/api/Binary.java
@@ -62,12 +62,16 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   abstract boolean equals(byte[] bytes, int offset, int length);
 
+  abstract boolean equals(ByteBuffer bytes, int offset, int length);
+
   abstract boolean equals(Binary other);
 
   abstract public int compareTo(Binary other);
 
   abstract int compareTo(byte[] bytes, int offset, int length);
 
+  abstract int compareTo(ByteBuffer bytes, int offset, int length);
+
   abstract public ByteBuffer toByteBuffer();
 
   @Override
@@ -174,6 +178,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
 
     @Override
+    boolean equals(ByteBuffer bytes, int otherOffset, int otherLength) {
+      return Binary.equals(value, offset, length, bytes, otherOffset, otherLength);
+    }
+
+    @Override
     public int compareTo(Binary other) {
       return other.compareTo(value, offset, length);
     }
@@ -184,6 +193,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
 
     @Override
+    int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
+      return Binary.compareByteArrayToByteBuffer(value, offset, length, bytes, otherOffset, otherLength);
+    }
+
+    @Override
     public ByteBuffer toByteBuffer() {
       return ByteBuffer.wrap(value, offset, length);
     }
@@ -292,6 +306,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
 
     @Override
+    boolean equals(ByteBuffer bytes, int otherOffset, int otherLength) {
+      return Binary.equals(value, 0, value.length, bytes, otherOffset, otherLength);
+    }
+
+    @Override
     public int compareTo(Binary other) {
       return other.compareTo(value, 0, value.length);
     }
@@ -302,6 +321,11 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     }
 
     @Override
+    int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
+      return Binary.compareByteArrayToByteBuffer(value, 0, value.length, bytes, otherOffset, otherLength);
+    }
+
+    @Override
     public ByteBuffer toByteBuffer() {
       return ByteBuffer.wrap(value);
     }
@@ -330,36 +354,58 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
   }
 
   private static class ByteBufferBackedBinary extends Binary {
-    private transient ByteBuffer value;
-    private transient byte[] cachedBytes;
+    private ByteBuffer value;
+    private byte[] cachedBytes;
+    private final int offset;
+    private final int length;
 
-    public ByteBufferBackedBinary(ByteBuffer value, boolean isBackingBytesReused) {
+    public ByteBufferBackedBinary(ByteBuffer value, int offset, int length, boolean isBackingBytesReused) {
       this.value = value;
+      this.offset = offset;
+      this.length = length;
       this.isBackingBytesReused = isBackingBytesReused;
     }
 
     @Override
     public String toStringUsingUTF8() {
-      return UTF8.decode(value).toString();
+      int limit = value.limit();
+      value.limit(offset+length);
+      int position = value.position();
+      value.position(offset);
+      // no corresponding interface to read a subset of a buffer, would have to slice it
+      // which creates another ByteBuffer object or do what is done here to adjust the
+      // limit/offset and set them back after
+      String ret = UTF8.decode(value).toString();
+      value.limit(limit);
+      value.position(position);
+      return ret;
     }
 
     @Override
     public int length() {
-      return value.remaining();
+      return length;
     }
 
     @Override
     public void writeTo(OutputStream out) throws IOException {
-      // TODO: should not have to materialize those bytes
-      out.write(getBytesUnsafe());
+      if (value.hasArray()) {
+        out.write(value.array(), value.arrayOffset() + offset, length);
+      } else {
+        out.write(getBytesUnsafe(), 0, length);
+      }
     }
 
     @Override
     public byte[] getBytes() {
-      byte[] bytes = new byte[value.remaining()];
+      byte[] bytes = new byte[length];
 
-      value.mark();
-      value.get(bytes).reset();
+      int limit = value.limit();
+      value.limit(offset + length);
+      int position = value.position();
+      value.position(offset);
+      value.get(bytes);
+      value.limit(limit);
+      value.position(position);
       if (!isBackingBytesReused) { // backing buffer might change
         cachedBytes = bytes;
       }
@@ -375,60 +421,68 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     public Binary slice(int start, int length) {
       return Binary.fromConstantByteArray(getBytesUnsafe(), start, length);
     }
-
     @Override
     public int hashCode() {
       if (value.hasArray()) {
-        return Binary.hashCode(value.array(), value.arrayOffset() + value.position(),
-            value.arrayOffset() + value.remaining());
+        return Binary.hashCode(value.array(), value.arrayOffset() + offset, length);
+      } else {
+        return Binary.hashCode(value, offset, length);
       }
-      byte[] bytes = getBytesUnsafe();
-      return Binary.hashCode(bytes, 0, bytes.length);
     }
 
     @Override
     boolean equals(Binary other) {
       if (value.hasArray()) {
-        return other.equals(value.array(), value.arrayOffset() + value.position(),
-            value.arrayOffset() + value.remaining());
+        return other.equals(value.array(), value.arrayOffset() + offset, length);
+      } else {
+        return other.equals(value, offset, length);
       }
-      byte[] bytes = getBytesUnsafe();
-      return other.equals(bytes, 0, bytes.length);
     }
 
     @Override
     boolean equals(byte[] other, int otherOffset, int otherLength) {
       if (value.hasArray()) {
-        return Binary.equals(value.array(), value.arrayOffset() + value.position(),
-            value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
+        return Binary.equals(value.array(), value.arrayOffset() + offset, length, other, otherOffset, otherLength);
+      } else {
+        return Binary.equals(other, otherOffset, otherLength, value, offset, length);
       }
-      byte[] bytes = getBytesUnsafe();
-      return Binary.equals(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    boolean equals(ByteBuffer otherBytes, int otherOffset, int otherLength) {
+      // Compare the `length` bytes of this binary, which start at absolute index
+      // `offset` of the backing buffer (not at index 0), with the given region of
+      // the other buffer. Absolute reads leave both buffers' positions untouched.
+      return Binary.equals(value, offset, length, otherBytes, otherOffset, otherLength);
     }
 
     @Override
     public int compareTo(Binary other) {
       if (value.hasArray()) {
-        return other.compareTo(value.array(), value.arrayOffset() + value.position(),
-            value.arrayOffset() + value.remaining());
+        return other.compareTo(value.array(), value.arrayOffset() + offset, length);
+      } else {
+        return other.compareTo(value, offset, length);
       }
-      byte[] bytes = getBytesUnsafe();
-      return other.compareTo(bytes, 0, bytes.length);
     }
 
     @Override
     int compareTo(byte[] other, int otherOffset, int otherLength) {
+      // Heap-backed buffers are compared directly on their backing array;
+      // other buffers are read through absolute ByteBuffer.get(int) calls.
       if (value.hasArray()) {
-        return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + value.position(),
-            value.arrayOffset() + value.remaining(), other, otherOffset, otherLength);
+        return Binary.compareTwoByteArrays(value.array(), value.arrayOffset() + offset, length,
+            other, otherOffset, otherLength);
+      } else {
+        return Binary.compareByteBufferToByteArray(value, offset, length, other, otherOffset, otherLength);
       }
-      byte[] bytes = getBytesUnsafe();
-      return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+    }
+
+    @Override
+    int compareTo(ByteBuffer bytes, int otherOffset, int otherLength) {
+      return Binary.compareTwoByteBuffers(value, offset, length, bytes, otherOffset, otherLength);
     }
 
     @Override
     public ByteBuffer toByteBuffer() {
-      return value;
+      // Return an independent view positioned on this binary's bytes.
+      // duplicate() keeps the backing buffer's absolute indexing, which is what
+      // `offset` refers to; slice() would rebase index 0 to the buffer's current
+      // position and shift the window whenever that position is non-zero.
+      ByteBuffer ret = value.duplicate();
+      // Set the limit first so position(offset) cannot exceed a smaller inherited limit.
+      ret.limit(offset + length);
+      ret.position(offset);
+      return ret;
     }
 
     @Override
@@ -456,12 +510,20 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
 
   }
 
+  public static Binary fromReusedByteBuffer(final ByteBuffer value, int offset, int length) {
+    return new ByteBufferBackedBinary(value, offset, length, true);
+  }
+
+  public static Binary fromConstantByteBuffer(final ByteBuffer value, int offset, int length) {
+    return new ByteBufferBackedBinary(value, offset, length, false);
+  }
+
   public static Binary fromReusedByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value, true);
+    return new ByteBufferBackedBinary(value, value.position(), value.remaining(), true);
   }
 
   public static Binary fromConstantByteBuffer(final ByteBuffer value) {
-    return new ByteBufferBackedBinary(value, false);
+    return new ByteBufferBackedBinary(value, value.position(), value.remaining(), false);
   }
 
   @Deprecated
@@ -492,6 +554,39 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     return result;
   }
 
+  private static final int hashCode(ByteBuffer buf, int offset, int length) {
+    int result = 1;
+    for (int i = offset; i < offset + length; i++) {
+      byte b = buf.get(i);
+      result = 31 * result + b;
+    }
+    return result;
+  }
+
+  private static final boolean equals(ByteBuffer buf1, int offset1, int length1, ByteBuffer buf2, int offset2, int length2) {
+    if (buf1 == null && buf2 == null) return true;
+    if (buf1 == null || buf2 == null) return false;
+    if (length1 != length2) return false;
+    for (int i = 0; i < length1; i++) {
+      if (buf1.get(i + offset1) != buf2.get(i + offset2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  private static final boolean equals(byte[] array1, int offset1, int length1, ByteBuffer buf, int offset2, int length2) {
+    if (array1 == null && buf == null) return true;
+    if (array1 == null || buf == null) return false;
+    if (length1 != length2) return false;
+    for (int i = 0; i < length1; i++) {
+      if (array1[i + offset1] != buf.get(i + offset2)) {
+        return false;
+      }
+    }
+    return true;
+  }
+
   /**
    * @see {@link Arrays#equals(byte[], byte[])}
    * @param array1
@@ -515,6 +610,47 @@ abstract public class Binary implements Comparable<Binary>, Serializable {
     return true;
   }
 
+  /**
+   * Compares a region of a ByteBuffer with a region of a byte array by
+   * delegating to {@code compareByteArrayToByteBuffer} with the operands
+   * swapped and negating the result.
+   *
+   * @param buf     buffer holding the first operand
+   * @param offset1 absolute index of the first byte in {@code buf}
+   * @param length1 number of bytes of the first operand
+   * @param array   array holding the second operand
+   * @param offset2 index of the first byte in {@code array}
+   * @param length2 number of bytes of the second operand
+   * @return the negated result of comparing {@code array} to {@code buf}
+   */
+  private static final int compareByteBufferToByteArray(ByteBuffer buf, int offset1, int length1,
+                                                        byte[] array, int offset2, int length2) {
+    // Each offset/length pair must travel with its own container:
+    // (offset2, length2) describe `array`, (offset1, length1) describe `buf`.
+    return -1 * Binary.compareByteArrayToByteBuffer(array, offset2, length2, buf, offset1, length1);
+  }
+
+  /**
+   * Compares a region of a byte array with a region of a ByteBuffer, byte by
+   * byte, using signed byte comparison. The buffer is read with absolute
+   * {@code get(int)} calls.
+   *
+   * The sign is reversed with respect to the usual comparator convention:
+   * the result is 1 when the array region orders before the buffer region and
+   * -1 when it orders after it. When one region is a prefix of the other, the
+   * shorter one orders first.
+   * NOTE(review): the reversed sign presumably matches callers that pass their
+   * own bytes as the second operand (double dispatch via other.compareTo(...))
+   * - confirm before reusing this helper elsewhere.
+   *
+   * @param array1  array holding the first operand
+   * @param offset1 index of the first byte in {@code array1}
+   * @param length1 number of bytes of the first operand
+   * @param buf     buffer holding the second operand
+   * @param offset2 absolute index of the first byte in {@code buf}
+   * @param length2 number of bytes of the second operand
+   * @return 1, 0 or -1 as described above
+   */
+  private static final int compareByteArrayToByteBuffer(byte[] array1, int offset1, int length1,
+                                                        ByteBuffer buf, int offset2, int length2) {
+    // Only the both-null case is handled; a single null operand is not checked here.
+    if (array1 == null && buf == null) return 0;
+    int min_length = (length1 < length2) ? length1 : length2;
+    for (int i = 0; i < min_length; i++) {
+      if (array1[i + offset1] < buf.get(i + offset2)) {
+        return 1;
+      }
+      if (array1[i + offset1] > buf.get(i + offset2)) {
+        return -1;
+      }
+    }
+    // check remainder
+    if (length1 == length2) { return 0; }
+    else if (length1 < length2) { return 1;}
+    else { return -1; }
+  }
+
+  /**
+   * Compares regions of two ByteBuffers byte by byte, using signed byte
+   * comparison and absolute {@code get(int)} reads on both buffers.
+   *
+   * The sign is reversed with respect to the usual comparator convention:
+   * the result is 1 when the first region orders before the second and -1
+   * when it orders after it. When one region is a prefix of the other, the
+   * shorter one orders first.
+   *
+   * @param buf1    buffer holding the first operand
+   * @param offset1 absolute index of the first byte in {@code buf1}
+   * @param length1 number of bytes of the first operand
+   * @param buf2    buffer holding the second operand
+   * @param offset2 absolute index of the first byte in {@code buf2}
+   * @param length2 number of bytes of the second operand
+   * @return 1, 0 or -1 as described above
+   */
+  private static final int compareTwoByteBuffers(ByteBuffer buf1, int offset1, int length1,
+                                                        ByteBuffer buf2, int offset2, int length2) {
+    // Only the both-null case is handled; a single null operand is not checked here.
+    if (buf1 == null && buf2 == null) return 0;
+    int min_length = (length1 < length2) ? length1 : length2;
+    for (int i = 0; i < min_length; i++) {
+      if (buf1.get(i + offset1) < buf2.get(i + offset2)) {
+        return 1;
+      }
+      if (buf1.get(i + offset1) > buf2.get(i + offset2)) {
+        return -1;
+      }
+    }
+    // check remainder
+    if (length1 == length2) { return 0; }
+    else if (length1 < length2) { return 1;}
+    else { return -1; }
+  }
+
   private static final int compareTwoByteArrays(byte[] array1, int offset1, int length1,
                                                 byte[] array2, int offset2, int length2) {
     if (array1 == null && array2 == null) return 0;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
index 7988f4a..5c6e460 100644
--- a/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
+++ b/parquet-column/src/main/java/org/apache/parquet/schema/PrimitiveType.java
@@ -333,10 +333,9 @@ public final class PrimitiveType extends Type {
    * @param decimalMeta (optional) metadata about the decimal type
    * @param id the id of the field
    */
-  PrimitiveType(
-      Repetition repetition, PrimitiveTypeName primitive,
-      int length, String name, OriginalType originalType,
-      DecimalMetadata decimalMeta, ID id) {
+  public PrimitiveType(Repetition repetition, PrimitiveTypeName primitive,
+                       int length, String name, OriginalType originalType,
+                       DecimalMetadata decimalMeta, ID id) {
     super(name, repetition, originalType, id);
     this.primitive = primitive;
     this.length = length;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
index a1820e6..6792361 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestColumnReaderImpl.java
@@ -25,6 +25,7 @@ import java.util.List;
 
 import org.apache.parquet.Version;
 import org.apache.parquet.VersionParser;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.junit.Test;
 
 import org.apache.parquet.column.ColumnDescriptor;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
index 0327948..9bb2759 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
@@ -36,8 +36,10 @@ import org.apache.parquet.io.api.PrimitiveConverter;
 import org.apache.parquet.schema.PrimitiveType;
 import org.junit.Assert;
 import org.junit.Test;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 
 import java.lang.reflect.Field;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
@@ -74,16 +76,20 @@ public class TestCorruptDeltaByteArrays {
     assertFalse(CorruptDeltaByteArrays.requiresSequentialReads(fixed, Encoding.DELTA_BYTE_ARRAY));
   }
 
+  private DeltaByteArrayWriter getDeltaByteArrayWriter() {
+    return new DeltaByteArrayWriter(10, 100, new HeapByteBufferAllocator());
+  }
+
   @Test
   public void testReassemblyWithCorruptPage() throws Exception {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+    DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
 
     String lastValue = null;
     for (int i = 0; i < 10; i += 1) {
       lastValue = str(i);
       writer.writeBytes(Binary.fromString(lastValue));
     }
-    byte[] firstPageBytes = writer.getBytes().toByteArray();
+    ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer();
 
     writer.reset(); // sets previous to new byte[0]
     corruptWriter(writer, lastValue);
@@ -91,7 +97,7 @@ public class TestCorruptDeltaByteArrays {
     for (int i = 10; i < 20; i += 1) {
       writer.writeBytes(Binary.fromString(str(i)));
     }
-    byte[] corruptPageBytes = writer.getBytes().toByteArray();
+    ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer();
 
     DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
     firstPageReader.initFromPage(10, firstPageBytes, 0);
@@ -119,19 +125,19 @@ public class TestCorruptDeltaByteArrays {
 
   @Test
   public void testReassemblyWithoutCorruption() throws Exception {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+    DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
 
     for (int i = 0; i < 10; i += 1) {
       writer.writeBytes(Binary.fromString(str(i)));
     }
-    byte[] firstPageBytes = writer.getBytes().toByteArray();
+    ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer();
 
     writer.reset(); // sets previous to new byte[0]
 
     for (int i = 10; i < 20; i += 1) {
       writer.writeBytes(Binary.fromString(str(i)));
     }
-    byte[] secondPageBytes = writer.getBytes().toByteArray();
+    ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
 
     DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
     firstPageReader.initFromPage(10, firstPageBytes, 0);
@@ -150,19 +156,19 @@ public class TestCorruptDeltaByteArrays {
 
   @Test
   public void testOldReassemblyWithoutCorruption() throws Exception {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+    DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
 
     for (int i = 0; i < 10; i += 1) {
       writer.writeBytes(Binary.fromString(str(i)));
     }
-    byte[] firstPageBytes = writer.getBytes().toByteArray();
+    ByteBuffer firstPageBytes = writer.getBytes().toByteBuffer();
 
     writer.reset(); // sets previous to new byte[0]
 
     for (int i = 10; i < 20; i += 1) {
       writer.writeBytes(Binary.fromString(str(i)));
     }
-    byte[] secondPageBytes = writer.getBytes().toByteArray();
+    ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
 
     DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
     firstPageReader.initFromPage(10, firstPageBytes, 0);
@@ -185,15 +191,16 @@ public class TestCorruptDeltaByteArrays {
     MemPageStore pages = new MemPageStore(0);
     PageWriter memWriter = pages.getPageWriter(column);
 
+    ParquetProperties parquetProps = new ParquetProperties(0, ParquetProperties.WriterVersion.PARQUET_1_0, false, new HeapByteBufferAllocator());
+
     // get generic repetition and definition level bytes to use for pages
-    ValuesWriter rdValues = ParquetProperties
-        .getColumnDescriptorValuesWriter(0, 10, 100);
+    ValuesWriter rdValues = parquetProps.getColumnDescriptorValuesWriter(0, 10, 100);
     for (int i = 0; i < 10; i += 1) {
       rdValues.writeInteger(0);
     }
     // use a byte array backed BytesInput because it is reused
     BytesInput rd = BytesInput.from(rdValues.getBytes().toByteArray());
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(10, 100);
+    DeltaByteArrayWriter writer = getDeltaByteArrayWriter();
     String lastValue = null;
     List<String> values = new ArrayList<String>();
     for (int i = 0; i < 10; i += 1) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
index 135123f..044fe2a 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/mem/TestMemColumn.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.mem;
 
 import static org.junit.Assert.assertEquals;
 
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.junit.Test;
 
 import org.apache.parquet.Log;
@@ -160,6 +161,6 @@ public class TestMemColumn {
   }
 
   private ColumnWriteStoreV1 newColumnWriteStoreImpl(MemPageStore memPageStore) {
-    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0);
+    return new ColumnWriteStoreV1(memPageStore, 2048, 2048, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
index d5bfe22..ddab636 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/page/mem/MemPageWriter.java
@@ -109,5 +109,4 @@ public class MemPageWriter implements PageWriter {
     return String.format("%s %,d bytes", prefix, memSize);
 
   }
-
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
index c9a62b4..8caad2b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.apache.parquet.io.api.Binary;
@@ -61,7 +62,7 @@ public class Utils {
   public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length)
       throws IOException {
     Binary[] bins = new Binary[length];
-    reader.initFromPage(length, data, 0);
+    reader.initFromPage(length, ByteBuffer.wrap(data), 0);
     for(int i=0; i < length; i++) {
       bins[i] = reader.readBytes();
     }
@@ -76,7 +77,7 @@ public class Utils {
   public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length)
       throws IOException {
     int[] ints = new int[length];
-    reader.initFromPage(length, data, offset);
+    reader.initFromPage(length, ByteBuffer.wrap(data), offset);
     for(int i=0; i < length; i++) {
       ints[i] = reader.readInteger();
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
index e74e787..2733b72 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.bitpacking;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
@@ -87,7 +88,7 @@ public class BitPackingPerfTest {
     System.out.print(" no gc <");
     for (int k = 0; k < N; k++) {
       long t2 = System.nanoTime();
-      r.initFromPage(result.length, bytes, 0);
+      r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0);
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
index 2f311ec..aef259c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -23,9 +23,11 @@ import static org.junit.Assert.assertEquals;
 import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.ValuesWriter;
@@ -172,7 +174,7 @@ public class TestBitPackingColumn {
       LOG.debug("bytes: " + TestBitPacking.toString(bytes));
       assertEquals(type.toString(), expected, TestBitPacking.toString(bytes));
       ValuesReader r = type.getReader(bound);
-      r.initFromPage(vals.length, bytes, 0);
+      r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0);
       int[] result = new int[vals.length];
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();
@@ -188,7 +190,7 @@ public class TestBitPackingColumn {
         return new BitPackingValuesReader(bound);
       }
       public ValuesWriter getWriter(final int bound) {
-        return new BitPackingValuesWriter(bound, 32*1024, 64*1024);
+        return new BitPackingValuesWriter(bound, 32*1024, 64*1024, new DirectByteBufferAllocator());
       }
     }
     ,

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
index ba979b7..d1e43d2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/boundedint/TestBoundedColumns.java
@@ -23,11 +23,14 @@ import static org.junit.Assert.assertTrue;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 
 import org.junit.Test;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
+
 public class TestBoundedColumns {
   private final Random r = new Random(42L);
 
@@ -54,7 +57,7 @@ public class TestBoundedColumns {
   }
 
   private void compareOutput(int bound, int[] ints, String[] result) throws IOException {
-    BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024);
+    BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64*1024, 64*1024, new DirectByteBufferAllocator());
     for (int i : ints) {
       bicw.writeInteger(i);
     }
@@ -63,7 +66,7 @@ public class TestBoundedColumns {
     byte[] byteArray = bicw.getBytes().toByteArray();
     assertEquals(concat(result), toBinaryString(byteArray, 4));
     BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound);
-    bicr.initFromPage(1, byteArray, 0);
+    bicr.initFromPage(1, ByteBuffer.wrap(byteArray), 0);
     String expected = "";
     String got = "";
     for (int i : ints) {
@@ -123,7 +126,7 @@ public class TestBoundedColumns {
       ByteArrayOutputStream tmp = new ByteArrayOutputStream();
 
       int[] stream = new int[totalValuesInStream];
-      BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024);
+      BoundedIntValuesWriter bicw = new BoundedIntValuesWriter(bound, 64 * 1024, 64*1024, new DirectByteBufferAllocator());
       int idx = 0;
       for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
         int next = 0;
@@ -155,7 +158,7 @@ public class TestBoundedColumns {
       idx = 0;
       int offset = 0;
       for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
-        bicr.initFromPage(1, input, offset);
+        bicr.initFromPage(1, ByteBuffer.wrap(input), offset);
         offset = bicr.getNextOffset();
         for (int i = 0; i < valuesPerStripe[stripeNum]; i++) {
           int number = stream[idx++];

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
index d428fbf..6308e47 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
@@ -22,11 +22,13 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.junit.Before;
 import org.junit.Test;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -42,13 +44,13 @@ public class DeltaBinaryPackingValuesWriterTest {
   public void setUp() {
     blockSize = 128;
     miniBlockNum = 4;
-    writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200);
+    writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 200, new DirectByteBufferAllocator());
     random = new Random();
   }
 
   @Test(expected = IllegalArgumentException.class)
   public void miniBlockSizeShouldBeMultipleOf8() {
-    new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100);
+    new DeltaBinaryPackingValuesWriter(1281, 4, 100, 100, new DirectByteBufferAllocator());
   }
 
   /* When data size is multiple of Block*/
@@ -154,7 +156,7 @@ public class DeltaBinaryPackingValuesWriterTest {
     System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
 
     //offset should be correct
-    reader.initFromPage(100, pageContent, contentOffsetInPage);
+    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
     int offset= reader.getNextOffset();
     assertEquals(valueContent.length + contentOffsetInPage, offset);
 
@@ -187,7 +189,7 @@ public class DeltaBinaryPackingValuesWriterTest {
     }
     writeData(data);
     reader = new DeltaBinaryPackingValuesReader();
-    reader.initFromPage(100, writer.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
     for (int i = 0; i < data.length; i++) {
       if (i % 3 == 0) {
         reader.skip();
@@ -243,7 +245,7 @@ public class DeltaBinaryPackingValuesWriterTest {
         + blockFlushed * miniBlockNum //bitWidth of mini blocks
         + (5.0 * blockFlushed);//min delta for each block
     assertTrue(estimatedSize >= page.length);
-    reader.initFromPage(100, page, 0);
+    reader.initFromPage(100, ByteBuffer.wrap(page), 0);
 
     for (int i = 0; i < length; i++) {
       assertEquals(data[i], reader.readInteger());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
index dc69fcc..40f6bfc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkIntegerOutputSize.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.delta.benchmark;
 
 import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import java.util.Random;
@@ -77,8 +78,8 @@ public class BenchmarkIntegerOutputSize {
   }
 
   public void testRandomIntegers(IntFunc func,int bitWidth) {
-    DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000);
-    RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000);
+    DeltaBinaryPackingValuesWriter delta=new DeltaBinaryPackingValuesWriter(blockSize,miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+    RunLengthBitPackingHybridValuesWriter rle= new RunLengthBitPackingHybridValuesWriter(bitWidth, 100, 20000, new DirectByteBufferAllocator());
     for (int i = 0; i < dataSize; i++) {
       int v = func.getIntValue();
       delta.writeInteger(v);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index 24b007f..4ad5dad 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -25,6 +25,7 @@ import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -33,6 +34,7 @@ import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesReade
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 @AxisRange(min = 0, max = 1)
@@ -54,8 +56,8 @@ public class BenchmarkReadingRandomIntegers {
       data[i] = random.nextInt(100) - 200;
     }
 
-    ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
-    ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000);
+    ValuesWriter delta = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
+    ValuesWriter rle = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator());
 
     for (int i = 0; i < data.length; i++) {
       delta.writeInteger(data[i]);
@@ -86,7 +88,7 @@ public class BenchmarkReadingRandomIntegers {
   }
 
   private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException {
-    reader.initFromPage(data.length, deltaBytes, 0);
+    reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0);
     for (int i = 0; i < data.length; i++) {
       reader.readInteger();
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
index 50c97cf..80e6533 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/RandomWritingBenchmarkTest.java
@@ -25,6 +25,7 @@ import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
 import org.junit.BeforeClass;
 import org.junit.Rule;
 import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
@@ -50,21 +51,21 @@ public class RandomWritingBenchmarkTest extends BenchMarkTest{
   @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
   @Test
   public void writeDeltaPackingTest(){
-    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
     runWriteTest(writer);
   }
 
   @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
   @Test
   public void writeRLETest(){
-    ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000);
+    ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(32, 100, 20000, new DirectByteBufferAllocator());
     runWriteTest(writer);
   }
 
   @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
   @Test
   public void writeDeltaPackingTest2(){
-    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000);
+    DeltaBinaryPackingValuesWriter writer = new DeltaBinaryPackingValuesWriter(blockSize, miniBlockNum, 100, 20000, new DirectByteBufferAllocator());
     runWriteTest(writer);
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
index 3141fd7..0dc7cb0 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/SmallRangeWritingBenchmarkTest.java
@@ -23,6 +23,7 @@ import com.carrotsearch.junitbenchmarks.annotation.AxisRange;
 import com.carrotsearch.junitbenchmarks.annotation.BenchmarkMethodChart;
 import org.junit.BeforeClass;
 import org.junit.Test;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesWriter;
 import org.apache.parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 import java.util.Random;
@@ -42,7 +43,7 @@ public class SmallRangeWritingBenchmarkTest extends RandomWritingBenchmarkTest {
   @BenchmarkOptions(benchmarkRounds = 10, warmupRounds = 2)
   @Test
   public void writeRLEWithSmallBitWidthTest(){
-    ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000);
+    ValuesWriter writer = new RunLengthBitPackingHybridValuesWriter(2, 100, 20000, new DirectByteBufferAllocator());
     runWriteTest(writer);
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
index aaae064..d7ebee5 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.junit.Test;
 import org.junit.Assert;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.Utils;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -32,11 +33,15 @@ public class TestDeltaLengthByteArray {
 
   String[] values = { "parquet", "hadoop", "mapreduce"};
 
+  private DeltaLengthByteArrayValuesWriter getDeltaLengthByteArrayValuesWriter() {
+    return new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
+  }
+
   @Test
   public void testSerialization () throws IOException {
-    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter();
     DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
-
+    
     Utils.writeData(writer, values);
     Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
 
@@ -47,7 +52,7 @@ public class TestDeltaLengthByteArray {
 
   @Test
   public void testRandomStrings() throws IOException {
-    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter();
     DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
 
     String[] values = Utils.getRandomStringSamples(1000, 32);
@@ -61,7 +66,7 @@ public class TestDeltaLengthByteArray {
 
   @Test
   public void testLengths() throws IOException {
-    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    DeltaLengthByteArrayValuesWriter writer = getDeltaLengthByteArrayValuesWriter();
     ValuesReader reader = new DeltaBinaryPackingValuesReader();
 
     Utils.writeData(writer, values);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
index f5f9d76..69c5e15 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.Utils;
 import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesReader;
 import org.apache.parquet.column.values.deltalengthbytearray.DeltaLengthByteArrayValuesWriter;
@@ -47,7 +48,7 @@ public class BenchmarkDeltaLengthByteArray {
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
   @Test
   public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException {
-    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
 
     Utils.writeData(writer, values);
@@ -59,7 +60,7 @@ public class BenchmarkDeltaLengthByteArray {
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
   @Test
   public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
-    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024);
+    DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
 
     Utils.writeData(writer, values);