You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@parquet.apache.org by "ASF GitHub Bot (JIRA)" <ji...@apache.org> on 2018/06/05 22:18:00 UTC

[jira] [Commented] (PARQUET-77) Improvements in ByteBuffer read path

    [ https://issues.apache.org/jira/browse/PARQUET-77?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16502575#comment-16502575 ] 

ASF GitHub Bot commented on PARQUET-77:
---------------------------------------

parthchandra closed pull request #49: PARQUET-77 zero copy improvements
URL: https://github.com/apache/parquet-mr/pull/49
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
index a58bfd9ec..f06ab9412 100644
--- a/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/parquet/column/impl/ColumnReaderImpl.java
@@ -20,6 +20,7 @@
 import static parquet.Preconditions.checkNotNull;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.column.ColumnDescriptor;
@@ -518,16 +519,16 @@ private void readPage() {
     this.pageValueCount = page.getValueCount();
     this.endOfPageValueCount = readValues + pageValueCount;
     try {
-      byte[] bytes = page.getBytes().toByteArray();
-      if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+      ByteBuffer byteBuf = page.getBytes().toByteBuffer();
+      if (DEBUG) LOG.debug("page size " + page.getBytes().size() + " bytes and " + pageValueCount + " records");
       if (DEBUG) LOG.debug("reading repetition levels at 0");
-      repetitionLevelColumn.initFromPage(pageValueCount, bytes, 0);
+      repetitionLevelColumn.initFromPage(pageValueCount, byteBuf, 0);
       int next = repetitionLevelColumn.getNextOffset();
       if (DEBUG) LOG.debug("reading definition levels at " + next);
-      definitionLevelColumn.initFromPage(pageValueCount, bytes, next);
+      definitionLevelColumn.initFromPage(pageValueCount, byteBuf, next);
       next = definitionLevelColumn.getNextOffset();
       if (DEBUG) LOG.debug("reading data at " + next);
-      dataColumn.initFromPage(pageValueCount, bytes, next);
+      dataColumn.initFromPage(pageValueCount, byteBuf, next);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
diff --git a/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java b/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
index f125b2f0e..6f0d1d8dc 100644
--- a/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
+++ b/parquet-column/src/main/java/parquet/column/statistics/BinaryStatistics.java
@@ -67,13 +67,13 @@ public String toString() {
   }
 
   public void updateStats(Binary min_value, Binary max_value) {
-    if (min.compareTo(min_value) > 0) { min = min_value; }
-    if (max.compareTo(max_value) < 0) { max = max_value; }
+    if (min.compareTo(min_value) > 0) { min = Binary.fromByteArray(min_value.getBytes()); }
+    if (max.compareTo(max_value) < 0) { max = Binary.fromByteArray(max_value.getBytes()); }
   }
 
   public void initializeStats(Binary min_value, Binary max_value) {
-      min = min_value;
-      max = max_value;
+      min = Binary.fromByteArray(min_value.getBytes());
+      max = Binary.fromByteArray(max_value.getBytes());
       this.markAsNotEmpty();
   }
 
diff --git a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java
index ec2c0386a..3cf5b5cfc 100644
--- a/parquet-column/src/main/java/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/ValuesReader.java
@@ -19,11 +19,12 @@
 
 import parquet.io.ParquetDecodingException;
 import parquet.io.api.Binary;
+import java.nio.ByteBuffer;
 
 /**
  * Base class to implement an encoding for a given column type.
  *
- * A ValuesReader is provided with a page (byte-array) and is responsible
+ * A ValuesReader is provided with a page (byte-buffer) and is responsible
  * for deserializing the primitive values stored in that page.
  *
  * Given that pages are homogeneous (store only a single type), typical subclasses
@@ -55,6 +56,11 @@
    *
    * @throws IOException
    */
+  public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
+  
+  /*
+   * Compatitble Interface.
+   */
   public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
   
   /**
diff --git a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java
index 7d8513a4f..74ed80dd1 100644
--- a/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -18,11 +18,12 @@
 import static parquet.bytes.BytesUtils.getWidthFromMaxInt;
 import static parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.bytes.BytesUtils;
+import parquet.bytes.ByteBufferInputStream;
 import parquet.column.values.ValuesReader;
 import parquet.column.values.bitpacking.BitPacking.BitPackingReader;
 import parquet.io.ParquetDecodingException;
@@ -36,7 +37,7 @@
 public class BitPackingValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(BitPackingValuesReader.class);
 
-  private ByteArrayInputStream in;
+  private ByteBufferInputStream in;
   private BitPackingReader bitPackingReader;
   private final int bitsPerValue;
   private int nextOffset;
@@ -66,15 +67,20 @@ public int readInteger() {
    * @see parquet.column.values.ValuesReader#initFromPage(long, byte[], int)
    */
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
     int effectiveBitLength = valueCount * bitsPerValue;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
     if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
-    this.in = new ByteArrayInputStream(in, offset, length);
+    this.in = new ByteBufferInputStream(in.duplicate(), offset, length);
     this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
     this.nextOffset = offset + length;
   }
   
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int getNextOffset() {
     return nextOffset;
diff --git a/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
index 22a6d92c0..8af93f149 100644
--- a/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -17,6 +17,7 @@
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.bytes.BytesUtils;
@@ -31,7 +32,7 @@
   private final BytePacker packer;
   private final int[] decoded = new int[VALUES_AT_A_TIME];
   private int decodedPosition = VALUES_AT_A_TIME - 1;
-  private byte[] encoded;
+  private ByteBuffer encoded;
   private int encodedPos;
   private int nextOffset;
 
@@ -39,16 +40,21 @@ public ByteBitPackingValuesReader(int bound, Packer packer) {
     this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
     this.packer = packer.newBytePacker(bitWidth);
   }
-
+  
   @Override
   public int readInteger() {
     ++ decodedPosition;
     if (decodedPosition == decoded.length) {
-      if (encodedPos + bitWidth > encoded.length) {
-        packer.unpack8Values(Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth), 0, decoded, 0);
+      encoded.position(encodedPos);
+      if (encodedPos + bitWidth > encoded.limit()) {
+        // unpack8Values needs at least bitWidth bytes to read from,
+        // We have to fill in 0 byte at the end of encoded bytes.
+        byte[] tempEncode = new byte[bitWidth];
+        encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
+        packer.unpack8Values(ByteBuffer.wrap(tempEncode), 0, decoded, 0);
       } else {
         packer.unpack8Values(encoded, encodedPos, decoded, 0);
-      }
+      }      
       encodedPos += bitWidth;
       decodedPosition = 0;
     }
@@ -56,7 +62,7 @@ public int readInteger() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset)
+  public void initFromPage(int valueCount, ByteBuffer page, int offset)
       throws IOException {
     int effectiveBitLength = valueCount * bitWidth;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
@@ -67,6 +73,11 @@ public void initFromPage(int valueCount, byte[] page, int offset)
     this.nextOffset = offset + length;
   }
   
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int getNextOffset() {
     return nextOffset;
diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java
index a7abd7dbe..6b4bc4b5c 100644
--- a/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BitReader.java
@@ -16,13 +16,14 @@
 package parquet.column.values.boundedint;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.io.ParquetDecodingException;
 
 class BitReader {
   private int currentByte = 0;
   private int currentPosition = 8;
-  private byte[] buf;
+  private ByteBuffer buf;
   private int currentBufferPosition = 0;
   private static final int[] byteGetValueMask = new int[8];
   private static final int[] readMask = new int[32];
@@ -47,7 +48,7 @@
    * The array is not copied, so must not be mutated during the course of
    * reading.
    */
-  public void prepare(byte[] buf, int offset, int length) {
+  public void prepare(ByteBuffer buf, int offset, int length) {
     this.buf = buf;
     this.endBufferPosistion = offset + length;
     currentByte = 0;
@@ -84,7 +85,7 @@ public int readNBitInteger(int bitsPerValue) {
 
   private int getNextByte() {
     if (currentBufferPosition < endBufferPosistion) {
-      return buf[currentBufferPosition++] & 0xFF;
+      return buf.get(currentBufferPosition++) & 0xFF;
     }
     return 0;
   }
diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java
index 9244ee24f..61928e40b 100644
--- a/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/boundedint/BoundedIntValuesReader.java
@@ -18,6 +18,7 @@
 import static parquet.Log.DEBUG;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.bytes.BytesUtils;
@@ -67,8 +68,8 @@ public int readInteger() {
   // bytes would have to be serialized). This is the flip-side
   // to BoundedIntColumnWriter.writeData(BytesOutput)
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
-    if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " ");
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in.get(offset) + " " + in.get(offset + 1) + " " + in.get(offset + 2) + " " + in.get(offset + 3) + " ");
     int totalBytes = BytesUtils.readIntLittleEndian(in, offset);
     if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes");
     currentValueCt = 0;
@@ -78,6 +79,11 @@ public void initFromPage(int valueCount, byte[] in, int offset) throws IOExcepti
     this.nextOffset = offset + totalBytes + 4;
   }
   
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int getNextOffset() {
     return this.nextOffset;
diff --git a/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
index 9cab44c82..905eafdc1 100644
--- a/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
@@ -16,6 +16,7 @@
 package parquet.column.values.boundedint;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.column.values.ValuesReader;
 
@@ -33,10 +34,15 @@ public int readInteger() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
     this.nextOffset = offset;
   }
   
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int getNextOffset() {
     return nextOffset;
diff --git a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index e79ac6efb..0acf2ecd5 100644
--- a/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -21,9 +21,11 @@
 import parquet.column.values.bitpacking.BytePacker;
 import parquet.column.values.bitpacking.Packer;
 import parquet.io.ParquetDecodingException;
+import parquet.bytes.ByteBufferInputStream;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
 
 /**
  * Read values written by {@link DeltaBinaryPackingValuesWriter}
@@ -37,7 +39,7 @@
    */
   private int valuesRead;
   private int minDeltaInCurrentBlock;
-  private byte[] page;
+  private ByteBuffer page;
   /**
    * stores the decoded values including the first value which is written to the header
    */
@@ -47,7 +49,7 @@
    * when data is not aligned to mini block, which means padding 0s are in the buffer
    */
   private int valuesBuffered;
-  private ByteArrayInputStream in;
+  private ByteBufferInputStream in;
   private int nextOffset;
   private DeltaBinaryPackingConfig config;
   private int[] bitWidths;
@@ -61,8 +63,8 @@
    * @throws IOException
    */
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
-    in = new ByteArrayInputStream(page, offset, page.length - offset);
+  public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
+    in = new ByteBufferInputStream(page.duplicate(), offset, page.limit() - offset);
     this.config = DeltaBinaryPackingConfig.readConfig(in);
     this.page = page;
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
@@ -75,7 +77,12 @@ public void initFromPage(int valueCount, byte[] page, int offset) throws IOExcep
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
     }
-    this.nextOffset = page.length - in.available();
+    this.nextOffset = page.limit() - in.available();
+  }
+  
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
   }
   
   @Override
@@ -148,7 +155,7 @@ private void unpackMiniBlock(BytePacker packer) {
 
   private void unpack8Values(BytePacker packer) {
     //calculate the pos because the packer api uses array not stream
-    int pos = page.length - in.available();
+    int pos = page.limit() - in.available();
     packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
     this.valuesBuffered += 8;
     //sync the pos in stream
diff --git a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index 902f1a450..467f49832 100644
--- a/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -18,6 +18,7 @@
 import static parquet.Log.DEBUG;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.column.values.ValuesReader;
@@ -34,7 +35,7 @@
 
   private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class);
   private ValuesReader lengthReader;
-  private byte[] in;
+  private ByteBuffer in;
   private int offset;
 
   public DeltaLengthByteArrayValuesReader() {
@@ -42,21 +43,26 @@ public DeltaLengthByteArrayValuesReader() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset)
+  public void initFromPage(int valueCount, ByteBuffer in, int offset)
       throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     lengthReader.initFromPage(valueCount, in, offset);
     offset = lengthReader.getNextOffset();
     this.in = in;
     this.offset = offset;
   }
 
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public Binary readBytes() {
     int length = lengthReader.readInteger();
     int start = offset;
     offset = start + length;
-    return Binary.fromByteArray(in, start, length);
+    return Binary.fromByteBuffer(in, start, length);
   }
 
   @Override
diff --git a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index f62a74695..723a31d48 100644
--- a/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -16,6 +16,7 @@
 package parquet.column.values.deltastrings;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.column.values.ValuesReader;
 import parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -41,12 +42,17 @@ public DeltaByteArrayReader() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset)
+  public void initFromPage(int valueCount, ByteBuffer page, int offset)
       throws IOException {
     prefixLengthReader.initFromPage(valueCount, page, offset);
     int next = prefixLengthReader.getNextOffset();
     suffixReader.initFromPage(valueCount, page, next);	
   }
+  
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
 
   @Override
   public void skip() {
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java
index 5c105ed73..23f8644ed 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -17,10 +17,12 @@
 
 import static parquet.Log.DEBUG;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
+import parquet.bytes.ByteBufferInputStream;
 import parquet.bytes.BytesUtils;
 import parquet.column.Dictionary;
 import parquet.column.values.ValuesReader;
@@ -37,7 +39,7 @@
 public class DictionaryValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(DictionaryValuesReader.class);
 
-  private ByteArrayInputStream in;
+  private ByteBufferInputStream in;
 
   private Dictionary dictionary;
 
@@ -48,12 +50,12 @@ public DictionaryValuesReader(Dictionary dictionary) {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset)
+  public void initFromPage(int valueCount, ByteBuffer page, int offset)
       throws IOException {
-    this.in = new ByteArrayInputStream(page, offset, page.length - offset);
-    if (page.length - offset > 0) {
+    this.in = new ByteBufferInputStream(page, offset, page.limit() - offset);
+    if (page.limit() - offset > 0) {
       if (DEBUG)
-        LOG.debug("init from page at offset " + offset + " for length " + (page.length - offset));
+        LOG.debug("init from page at offset " + offset + " for length " + (page.limit() - offset));
       int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
       if (DEBUG) LOG.debug("bit width " + bitWidth);
       decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
@@ -66,7 +68,12 @@ public int readInt() throws IOException {
       };
     }
   }
-
+  
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int readValueDictionaryId() {
     try {
diff --git a/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
index 33deab87c..3e6ec2a1a 100644
--- a/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -19,6 +19,7 @@
 import static parquet.column.Encoding.PLAIN_DICTIONARY;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Preconditions;
 import parquet.column.Dictionary;
@@ -81,17 +82,18 @@ public PlainBinaryDictionary(DictionaryPage dictionaryPage) throws IOException {
      */
     public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryBytes = dictionaryPage.getBytes().toByteBuffer();
       binaryDictionaryContent = new Binary[dictionaryPage.getDictionarySize()];
-      int offset = 0;
+      int offset = dictionaryBytes.position();
       if (length == null) {
         // dictionary values are stored in order: size (4 bytes LE) followed by {size} bytes
+        
         for (int i = 0; i < binaryDictionaryContent.length; i++) {
           int len = readIntLittleEndian(dictionaryBytes, offset);
           // read the length
           offset += 4;
           // wrap the content in a binary
-          binaryDictionaryContent[i] = Binary.fromByteArray(dictionaryBytes, offset, len);
+          binaryDictionaryContent[i] = Binary.fromByteBuffer(dictionaryBytes, offset, len);
           // increment to the next value
           offset += len;
         }
@@ -101,14 +103,14 @@ public PlainBinaryDictionary(DictionaryPage dictionaryPage, Integer length) thro
             "Invalid byte array length: " + length);
         for (int i = 0; i < binaryDictionaryContent.length; i++) {
           // wrap the content in a Binary
-          binaryDictionaryContent[i] = Binary.fromByteArray(
+          binaryDictionaryContent[i] = Binary.fromByteBuffer(
               dictionaryBytes, offset, length);
           // increment to the next value
           offset += length;
         }
       }
     }
-
+    
     @Override
     public Binary decodeToBinary(int id) {
       return binaryDictionaryContent[id];
@@ -143,10 +145,10 @@ public int getMaxId() {
      */
     public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
       LongPlainValuesReader longReader = new LongPlainValuesReader();
-      longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
       for (int i = 0; i < longDictionaryContent.length; i++) {
         longDictionaryContent[i] = longReader.readLong();
       }
@@ -186,10 +188,10 @@ public int getMaxId() {
      */
     public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
       DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
-      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
       for (int i = 0; i < doubleDictionaryContent.length; i++) {
         doubleDictionaryContent[i] = doubleReader.readDouble();
       }
@@ -229,10 +231,10 @@ public int getMaxId() {
      */
     public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
       IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
-      intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
       for (int i = 0; i < intDictionaryContent.length; i++) {
         intDictionaryContent[i] = intReader.readInteger();
       }
@@ -272,10 +274,10 @@ public int getMaxId() {
      */
     public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final byte[] dictionaryBytes = dictionaryPage.getBytes().toByteArray();
+      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
       floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
       FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
-      floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryBytes, 0);
+      floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
       for (int i = 0; i < floatDictionaryContent.length; i++) {
         floatDictionaryContent[i] = floatReader.readFloat();
       }
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java
index e1d890633..d884ed8dc 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -18,6 +18,7 @@
 import static parquet.Log.DEBUG;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.bytes.BytesUtils;
@@ -27,7 +28,7 @@
 
 public class BinaryPlainValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(BinaryPlainValuesReader.class);
-  private byte[] in;
+  private ByteBuffer in;
   private int offset;
 
   @Override
@@ -36,7 +37,7 @@ public Binary readBytes() {
       int length = BytesUtils.readIntLittleEndian(in, offset);
       int start = offset + 4;
       offset = start + length;
-      return Binary.fromByteArray(in, start, length);
+      return Binary.fromByteBuffer(in, start, length);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
     } catch (RuntimeException e) {
@@ -57,11 +58,15 @@ public void skip() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset)
+  public void initFromPage(int valueCount, ByteBuffer in, int offset)
       throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     this.in = in;
     this.offset = offset;
   }
 
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
 }
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java
index e349f876a..96bfb72c6 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -19,6 +19,7 @@
 import static parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.column.values.ValuesReader;
@@ -59,14 +60,19 @@ public void skip() {
    * @see parquet.column.values.ValuesReader#initFromPage(byte[], int)
    */
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     this.in.initFromPage(valueCount, in, offset);
   }
   
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int getNextOffset() {
     return this.in.getNextOffset();
   }
 
-}
\ No newline at end of file
+}
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 084da9cda..6ba6e8484 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -16,6 +16,7 @@
 package parquet.column.values.plain;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import parquet.Log;
 import parquet.column.values.ValuesReader;
 import parquet.io.ParquetDecodingException;
@@ -30,7 +31,7 @@
  */
 public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(FixedLenByteArrayPlainValuesReader.class);
-  private byte[] in;
+  private ByteBuffer in;
   private int offset;
   private int length;
 
@@ -43,7 +44,7 @@ public Binary readBytes() {
     try {
       int start = offset;
       offset = start + length;
-      return Binary.fromByteArray(in, start, length);
+      return Binary.fromByteBuffer(in, start, length);
     } catch (RuntimeException e) {
       throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
     }
@@ -55,10 +56,15 @@ public void skip() {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset)
+  public void initFromPage(int valueCount, ByteBuffer in, int offset)
       throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     this.in = in;
     this.offset = offset;
   }
+  
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
 }
diff --git a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java
index 27702ad1c..572512113 100644
--- a/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/plain/PlainValuesReader.java
@@ -17,11 +17,12 @@
 
 import static parquet.Log.DEBUG;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.bytes.LittleEndianDataInputStream;
+import parquet.bytes.ByteBufferInputStream;
 import parquet.column.values.ValuesReader;
 import parquet.io.ParquetDecodingException;
 
@@ -41,9 +42,14 @@
    * @see parquet.column.values.ValuesReader#initFromPage(byte[], int)
    */
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
-    this.in = new LittleEndianDataInputStream(new ByteArrayInputStream(in, offset, in.length - offset));
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
+    this.in = new LittleEndianDataInputStream(new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset));
+  }
+  
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
   }
 
   public static class DoublePlainValuesReader extends PlainValuesReader {
@@ -129,4 +135,4 @@ public long readLong() {
       }
     }
   }
-}
\ No newline at end of file
+}
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 04d3eeb3c..051ad212a 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -21,9 +21,12 @@
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
 
 import parquet.Log;
 import parquet.Preconditions;
+import parquet.bytes.ByteBufferInputStream;
+
 import parquet.bytes.BytesUtils;
 import parquet.column.values.bitpacking.BytePacker;
 import parquet.column.values.bitpacking.Packer;
@@ -41,7 +44,7 @@
 
   private final int bitWidth;
   private final BytePacker packer;
-  private final ByteArrayInputStream in;
+  private final ByteBufferInputStream in;
 
   private MODE mode;
   private int currentCount;
@@ -51,6 +54,17 @@
   public RunLengthBitPackingHybridDecoder(int bitWidth, ByteArrayInputStream in) {
     if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth);
 
+    Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
+    this.bitWidth = bitWidth;
+    this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
+    byte[] buf = new byte[in.available()];
+    in.read(buf, 0, in.available());
+    this.in = new ByteBufferInputStream(ByteBuffer.wrap(buf));
+  }
+
+  public RunLengthBitPackingHybridDecoder(int bitWidth, ByteBufferInputStream in) {
+    if (DEBUG) LOG.debug("decoding bitWidth " + bitWidth);
+
     Preconditions.checkArgument(bitWidth >= 0 && bitWidth <= 32, "bitWidth must be >= 0 and <= 32");
     this.bitWidth = bitWidth;
     this.packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
@@ -91,14 +105,20 @@ private void readNext() throws IOException {
       currentCount = numGroups * 8;
       if (DEBUG) LOG.debug("reading " + currentCount + " values BIT PACKED");
       currentBuffer = new int[currentCount]; // TODO: reuse a buffer
-      byte[] bytes = new byte[numGroups * bitWidth];
       // At the end of the file RLE data though, there might not be that many bytes left. 
+      ByteBuffer buf = in.toByteBuffer();
       int bytesToRead = (int)Math.ceil(currentCount * bitWidth / 8.0);
-      bytesToRead = Math.min(bytesToRead, in.available());
-      new DataInputStream(in).readFully(bytes, 0, bytesToRead);
+      bytesToRead = Math.min(bytesToRead, buf.remaining());
       for (int valueIndex = 0, byteIndex = 0; valueIndex < currentCount; valueIndex += 8, byteIndex += bitWidth) {
-        packer.unpack8Values(bytes, byteIndex, currentBuffer, valueIndex);
+        if (byteIndex + bitWidth <= buf.remaining()) {
+          packer.unpack8Values(buf, byteIndex, currentBuffer, valueIndex);
+        } else {
+          byte[] bytes = new byte[bitWidth];
+          buf.get(bytes, byteIndex, buf.remaining() - byteIndex);
+          packer.unpack8Values(ByteBuffer.wrap(bytes), 0, currentBuffer, valueIndex);
+        }
       }
+      in.skip(bytesToRead);
       break;
     default:
       throw new ParquetDecodingException("not a valid mode " + mode);
diff --git a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index 4ff05f31a..cccada5a1 100644
--- a/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -15,9 +15,10 @@
  */
 package parquet.column.values.rle;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import parquet.bytes.ByteBufferInputStream;
 import parquet.bytes.BytesUtils;
 import parquet.column.values.ValuesReader;
 import parquet.io.ParquetDecodingException;
@@ -38,8 +39,8 @@ public RunLengthBitPackingHybridValuesReader(int bitWidth) {
   }
 
   @Override
-  public void initFromPage(int valueCountL, byte[] page, int offset) throws IOException {
-    ByteArrayInputStream in = new ByteArrayInputStream(page, offset, page.length - offset);
+  public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException {
+    ByteBufferInputStream in = new ByteBufferInputStream(page.duplicate(), offset, page.limit() - offset);
     int length = BytesUtils.readIntLittleEndian(in);
 
     decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
@@ -48,6 +49,11 @@ public void initFromPage(int valueCountL, byte[] page, int offset) throws IOExce
     this.nextOffset = offset + length + 4;
   }
   
+  @Override
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException{
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+  
   @Override
   public int getNextOffset() {
     return this.nextOffset;
diff --git a/parquet-column/src/main/java/parquet/io/api/Binary.java b/parquet-column/src/main/java/parquet/io/api/Binary.java
index a297603ee..2b2e68c1c 100644
--- a/parquet-column/src/main/java/parquet/io/api/Binary.java
+++ b/parquet-column/src/main/java/parquet/io/api/Binary.java
@@ -342,6 +342,91 @@ private void readObjectNoData() throws ObjectStreamException {
   public static Binary fromByteBuffer(final ByteBuffer value) {
     return new ByteBufferBackedBinary(value);
   }
+  
+  public static Binary fromByteBuffer(
+      final ByteBuffer value,
+      final int offset,
+      final int length) {
+    return new Binary() {
+      @Override
+      public String toStringUsingUTF8() {
+        return new String(getBytes(), BytesUtils.UTF8);
+      }
+
+      @Override
+      public int length() {
+        return length;
+      }
+
+      @Override
+      public void writeTo(OutputStream out) throws IOException {
+        out.write(getBytes());
+      }
+
+      @Override
+      public byte[] getBytes() {
+        byte[] bytes = new byte[length];
+        
+        value.mark();
+        value.position(offset);
+        value.get(bytes).reset();
+        
+        return bytes;
+      }
+
+      @Override
+      public int hashCode() {
+        byte[] bytes = getBytes();
+        return Binary.hashCode(bytes, 0, bytes.length);
+      }
+
+      @Override
+      boolean equals(Binary other) {
+        if (toByteBuffer().compareTo(other.toByteBuffer()) == 0) {
+          return true;
+        }
+        return false;
+      }
+
+      @Override
+      boolean equals(byte[] other, int otherOffset, int otherLength) {
+         if (toByteBuffer().compareTo(ByteBuffer.wrap(other, otherOffset, otherLength)) == 0) {
+           return true;
+         }
+         return false;
+      }
+
+      @Override
+      public int compareTo(Binary other) {
+        byte[] bytes = getBytes();
+        return other.compareTo(bytes, 0, bytes.length);
+      }
+
+      @Override
+      int compareTo(byte[] other, int otherOffset, int otherLength) {
+        byte[] bytes = getBytes();
+        return Binary.compareTwoByteArrays(bytes, 0, bytes.length, other, otherOffset, otherLength);
+      }
+
+      @Override
+      public ByteBuffer toByteBuffer() {
+        ByteBuffer buf;
+        value.mark();
+        value.position(offset);
+        buf = value.slice();
+        buf.limit(length);
+        value.reset();
+        return buf;
+      }
+
+      @Override
+      public void writeTo(DataOutput out) throws IOException {
+        for (int i = offset; i < offset + length; i++) {
+          out.write(value.get(i));
+        }
+      }
+    };
+  }
 
   public static Binary fromString(final String value) {
     try {
diff --git a/parquet-column/src/test/java/parquet/column/values/Utils.java b/parquet-column/src/test/java/parquet/column/values/Utils.java
index 0ca0e1769..9e81aaa41 100644
--- a/parquet-column/src/test/java/parquet/column/values/Utils.java
+++ b/parquet-column/src/test/java/parquet/column/values/Utils.java
@@ -16,6 +16,7 @@
 package parquet.column.values;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import parquet.io.api.Binary;
@@ -58,7 +59,7 @@ public static void writeData(ValuesWriter writer, String[] strings)
   public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length)
       throws IOException {
     Binary[] bins = new Binary[length];
-    reader.initFromPage(length, data, 0);
+    reader.initFromPage(length, ByteBuffer.wrap(data), 0);
     for(int i=0; i < length; i++) {
       bins[i] = reader.readBytes();
     }
@@ -73,7 +74,7 @@ public static void writeData(ValuesWriter writer, String[] strings)
   public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length)
       throws IOException {
     int[] ints = new int[length];
-    reader.initFromPage(length, data, offset);
+    reader.initFromPage(length, ByteBuffer.wrap(data), offset);
     for(int i=0; i < length; i++) {
       ints[i] = reader.readInteger();
     }
diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java
index 0a3ccc17f..50a79b0fe 100644
--- a/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java
+++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/BitPackingPerfTest.java
@@ -17,6 +17,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import parquet.column.values.ValuesReader;
 import parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
@@ -84,7 +85,7 @@ private static long readNTimes(byte[] bytes, int[] result, ValuesReader r)
     System.out.print(" no gc <");
     for (int k = 0; k < N; k++) {
       long t2 = System.nanoTime();
-      r.initFromPage(result.length, bytes, 0);
+      r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0);
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();
       }
diff --git a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java
index 0351db876..3cd897ad9 100644
--- a/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java
+++ b/parquet-column/src/test/java/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -20,6 +20,7 @@
 import static parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.junit.Test;
 
@@ -169,7 +170,7 @@ private void validateEncodeDecode(int bitLength, int[] vals, String expected) th
       LOG.debug("bytes: " + TestBitPacking.toString(bytes));
       assertEquals(type.toString(), expected, TestBitPacking.toString(bytes));
       ValuesReader r = type.getReader(bound);
-      r.initFromPage(vals.length, bytes, 0);
+      r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0);
       int[] result = new int[vals.length];
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();
diff --git a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java
index b8ee5fbac..f3ba0f781 100644
--- a/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java
+++ b/parquet-column/src/test/java/parquet/column/values/boundedint/TestBoundedColumns.java
@@ -20,6 +20,7 @@
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Arrays;
 import java.util.Random;
 
@@ -63,7 +64,7 @@ private void compareOutput(int bound, int[] ints, String[] result) throws IOExce
     byte[] byteArray = bicw.getBytes().toByteArray();
     assertEquals(concat(result), toBinaryString(byteArray, 4));
     BoundedIntValuesReader bicr = new BoundedIntValuesReader(bound);
-    bicr.initFromPage(1, byteArray, 0);
+    bicr.initFromPage(1, ByteBuffer.wrap(byteArray), 0);
     String expected = "";
     String got = "";
     for (int i : ints) {
@@ -155,7 +156,7 @@ public void testSerDe() throws Exception {
       idx = 0;
       int offset = 0;
       for (int stripeNum = 0; stripeNum < valuesPerStripe.length; stripeNum++) {
-        bicr.initFromPage(1, input, offset);
+        bicr.initFromPage(1, ByteBuffer.wrap(input), offset);
         offset = bicr.getNextOffset();
         for (int i = 0; i < valuesPerStripe[stripeNum]; i++) {
           int number = stream[idx++];
diff --git a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
index 415f5097f..2e7ba9c47 100644
--- a/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
+++ b/parquet-column/src/test/java/parquet/column/values/delta/DeltaBinaryPackingValuesWriterTest.java
@@ -19,6 +19,7 @@
 import static org.junit.Assert.assertTrue;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 import org.junit.Before;
@@ -151,7 +152,7 @@ public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
     System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
 
     //offset should be correct
-    reader.initFromPage(100, pageContent, contentOffsetInPage);
+    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
     int offset= reader.getNextOffset();
     assertEquals(valueContent.length + contentOffsetInPage, offset);
 
@@ -184,7 +185,7 @@ public void shouldSkip() throws IOException {
     }
     writeData(data);
     reader = new DeltaBinaryPackingValuesReader();
-    reader.initFromPage(100, writer.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
     for (int i = 0; i < data.length; i++) {
       if (i % 3 == 0) {
         reader.skip();
@@ -240,7 +241,7 @@ private void shouldReadAndWrite(int[] data, int length) throws IOException {
         + blockFlushed * miniBlockNum //bitWidth of mini blocks
         + (5.0 * blockFlushed);//min delta for each block
     assertTrue(estimatedSize >= page.length);
-    reader.initFromPage(100, page, 0);
+    reader.initFromPage(100, ByteBuffer.wrap(page), 0);
 
     for (int i = 0; i < length; i++) {
       assertEquals(data[i], reader.readInteger());
diff --git a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index d01a605dc..6d9e7c1f7 100644
--- a/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -30,6 +30,7 @@
 import parquet.column.values.rle.RunLengthBitPackingHybridValuesWriter;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.util.Random;
 
 @AxisRange(min = 0, max = 1)
@@ -83,7 +84,7 @@ public void readingRLE() throws IOException {
   }
 
   private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException {
-    reader.initFromPage(data.length, deltaBytes, 0);
+    reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0);
     for (int i = 0; i < data.length; i++) {
       reader.readInteger();
     }
diff --git a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
index 8cb39b1a3..155d60d09 100644
--- a/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
@@ -16,6 +16,7 @@
 package parquet.column.values.deltalengthbytearray;
 
 import java.io.IOException;
+import java.util.Arrays;
 
 import org.junit.Test;
 import org.junit.Assert;
@@ -33,12 +34,12 @@
   public void testSerialization () throws IOException {
     DeltaLengthByteArrayValuesWriter writer = new DeltaLengthByteArrayValuesWriter(64*1024);
     DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
-
+    
     Utils.writeData(writer, values);
     Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
 
     for(int i =0; i< bin.length ; i++) {
-      Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
+      Assert.assertEquals(Binary.fromString(values[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8());
     }
   }
   
@@ -52,7 +53,7 @@ public void testRandomStrings() throws IOException {
     Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
 
     for(int i =0; i< bin.length ; i++) {
-      Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
+      Assert.assertEquals(Binary.fromString(values[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8());
     }
   }
 
diff --git a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java
index c784491bb..0a266ec7c 100644
--- a/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java
+++ b/parquet-column/src/test/java/parquet/column/values/deltastrings/TestDeltaByteArray.java
@@ -39,7 +39,7 @@ public void testSerialization () throws IOException {
     Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
 
     for(int i =0; i< bin.length ; i++) {
-      Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
+      Assert.assertEquals(Binary.fromString(values[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8());
     }
   }
   
@@ -52,7 +52,7 @@ public void testRandomStrings() throws IOException {
     Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), randvalues.length);
 
     for(int i =0; i< bin.length ; i++) {
-      Assert.assertEquals(Binary.fromString(randvalues[i]), bin[i]);
+      Assert.assertEquals(Binary.fromString(randvalues[i]).toStringUsingUTF8(), bin[i].toStringUsingUTF8());
     }
   }
 
diff --git a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
index a5d6e1f8d..626e5655c 100644
--- a/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/parquet/column/values/dictionary/TestDictionary.java
@@ -87,10 +87,11 @@ public void testBinaryDictionaryFallBack() throws IOException {
 
     //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new BinaryPlainValuesReader();
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (long i = 0; i < 100; i++) {
-      assertEquals(Binary.fromString("str" + i), reader.readBytes());
+      assertEquals(Binary.fromString("str" + i).toStringUsingUTF8(),
+                    reader.readBytes().toStringUsingUTF8());
     }
 
     //simulate cutting the page
@@ -175,13 +176,13 @@ public void testLongDictionary() throws IOException {
 
     DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (long i = 0; i < COUNT; i++) {
       long back = cr.readLong();
       assertEquals(i % 50, back);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (long i = COUNT2; i > 0; i--) {
       long back = cr.readLong();
       assertEquals(i % 50, back);
@@ -199,7 +200,7 @@ private void roundTripLong(DictionaryValuesWriter cw,  ValuesReader reader, int
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (long i = 0; i < 100; i++) {
       assertEquals(i, reader.readLong());
@@ -245,13 +246,13 @@ public void testDoubleDictionary() throws IOException {
 
     final DictionaryValuesReader cr = initDicReader(cw, DOUBLE);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (double i = 0; i < COUNT; i++) {
       double back = cr.readDouble();
       assertEquals(i % 50, back, 0.0);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (double i = COUNT2; i > 0; i--) {
       double back = cr.readDouble();
       assertEquals(i % 50, back, 0.0);
@@ -270,7 +271,7 @@ private void roundTripDouble(DictionaryValuesWriter cw,  ValuesReader reader, in
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (double i = 0; i < 100; i++) {
       assertEquals(i, reader.readDouble(), 0.00001);
@@ -316,13 +317,13 @@ public void testIntDictionary() throws IOException {
 
     DictionaryValuesReader cr = initDicReader(cw, INT32);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (int i = 0; i < COUNT; i++) {
       int back = cr.readInteger();
       assertEquals(i % 50, back);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (int i = COUNT2; i > 0; i--) {
       int back = cr.readInteger();
       assertEquals(i % 50, back);
@@ -341,7 +342,7 @@ private void roundTripInt(DictionaryValuesWriter cw,  ValuesReader reader, int m
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (int i = 0; i < 100; i++) {
       assertEquals(i, reader.readInteger());
@@ -387,13 +388,13 @@ public void testFloatDictionary() throws IOException {
 
     DictionaryValuesReader cr = initDicReader(cw, FLOAT);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (float i = 0; i < COUNT; i++) {
       float back = cr.readFloat();
       assertEquals(i % 50, back, 0.0f);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (float i = COUNT2; i > 0; i--) {
       float back = cr.readFloat();
       assertEquals(i % 50, back, 0.0f);
@@ -412,7 +413,7 @@ private void roundTripFloat(DictionaryValuesWriter cw,  ValuesReader reader, int
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (float i = 0; i < 100; i++) {
       assertEquals(i, reader.readFloat(), 0.00001);
@@ -461,14 +462,14 @@ private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName
   }
 
   private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
-    cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
     for (int i = 0; i < COUNT; i++) {
       Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8());
     }
   }
 
   private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
-    cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
     for (int i = 0; i < COUNT; i++) {
       Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8());
     }
diff --git a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 2359d8de7..b36a62c89 100644
--- a/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -15,9 +15,10 @@
  */
 package parquet.column.values.rle;
 
-import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
 import java.io.InputStream;
 
+import parquet.bytes.ByteBufferInputStream;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
@@ -67,8 +68,8 @@ private void doIntegrationTest(int bitWidth) throws Exception {
     }
     numValues += 1000;
 
-    byte[] encodedBytes = encoder.toBytes().toByteArray();
-    ByteArrayInputStream in = new ByteArrayInputStream(encodedBytes);
+    ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
+    ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes);
 
     RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
 
diff --git a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 0859cb10d..f2e27903e 100644
--- a/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -18,6 +18,7 @@
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -25,6 +26,7 @@
 import org.junit.Test;
 
 import parquet.bytes.BytesUtils;
+import parquet.bytes.ByteBufferInputStream;
 import parquet.column.values.bitpacking.BytePacker;
 import parquet.column.values.bitpacking.Packer;
 
@@ -284,7 +286,7 @@ public void testGroupBoundary() throws Exception {
 	// bit width 2.
 	bytes[0] = (1 << 1 )| 1; 
 	bytes[1] = (1 << 0) | (2 << 2) | (3 << 4);
-    ByteArrayInputStream stream = new ByteArrayInputStream(bytes);
+    ByteBufferInputStream stream = new ByteBufferInputStream(ByteBuffer.wrap(bytes));
     RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(2, stream);
     assertEquals(decoder.readInt(), 1);
     assertEquals(decoder.readInt(), 2);
@@ -306,7 +308,7 @@ public void testGroupBoundary() throws Exception {
         next8Values[i] = (byte) is.read();
       }
 
-      packer.unpack8Values(next8Values, 0, unpacked, 0);
+      packer.unpack8Values(ByteBuffer.wrap(next8Values), 0, unpacked, 0);
 
       for (int v = 0; v < 8; v++) {
         values.add(unpacked[v]);
diff --git a/parquet-common/src/main/java/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/parquet/bytes/ByteBufferInputStream.java
new file mode 100644
index 000000000..15a8c8e5b
--- /dev/null
+++ b/parquet-common/src/main/java/parquet/bytes/ByteBufferInputStream.java
@@ -0,0 +1,55 @@
+package parquet.bytes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+public class ByteBufferInputStream extends InputStream {
+	
+  protected ByteBuffer byteBuf;
+  public ByteBufferInputStream(ByteBuffer buffer) {
+    this(buffer, buffer.position(), buffer.remaining());
+  }
+  
+  public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
+    buffer.position(offset);
+    byteBuf = buffer.slice();
+    byteBuf.limit(count);
+  }
+  
+  public ByteBuffer toByteBuffer() {
+    return byteBuf.slice();
+  }
+  
+  @Override
+  public int read() throws IOException {
+    if (!byteBuf.hasRemaining()) {
+    	return -1;
+    }
+    //Workaround for unsigned byte
+    return byteBuf.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] bytes, int offset, int length) throws IOException {
+    int count = Math.min(byteBuf.remaining(), length);
+    if (count == 0) return -1;
+    byteBuf.get(bytes, offset, count);
+    return count;
+  }
+  
+  @Override
+  public long skip(long n) {
+	  if (n > byteBuf.remaining())
+	    n = byteBuf.remaining();
+	  int pos = byteBuf.position();
+	  byteBuf.position((int)(pos + n));
+	  return n;
+  }
+
+
+  @Override
+  public int available() {
+    return byteBuf.remaining();
+  }
+}
diff --git a/parquet-common/src/main/java/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/parquet/bytes/BytesUtils.java
index 108a7ab5f..53b91c181 100644
--- a/parquet-common/src/main/java/parquet/bytes/BytesUtils.java
+++ b/parquet-common/src/main/java/parquet/bytes/BytesUtils.java
@@ -19,6 +19,7 @@
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 
 import parquet.Log;
@@ -43,6 +44,21 @@ public static int getWidthFromMaxInt(int bound) {
     return 32 - Integer.numberOfLeadingZeros(bound);
   }
 
+  /**
+   * reads an int in little endian at the given position
+   * @param in
+   * @param offset
+   * @return
+   * @throws IOException
+   */
+  public static int readIntLittleEndian(ByteBuffer in, int offset) throws IOException {
+    int ch4 = in.get(offset) & 0xff;
+    int ch3 = in.get(offset + 1) & 0xff;
+    int ch2 = in.get(offset + 2) & 0xff;
+    int ch1 = in.get(offset + 3) & 0xff;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+  
   /**
    * reads an int in little endian at the given position
    * @param in
diff --git a/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java
index fc8abfd51..4a3ad3ebe 100644
--- a/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java
+++ b/parquet-encoding/src/main/java/parquet/bytes/BytesInput.java
@@ -22,6 +22,9 @@
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 import parquet.Log;
 
@@ -68,6 +71,15 @@ public static BytesInput concat(List<BytesInput> inputs) {
   public static BytesInput from(InputStream in, int bytes) {
     return new StreamBytesInput(in, bytes);
   }
+  
+  /**
+   * @param buffer
+   * @param length number of bytes to read
+   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   */
+  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
+    return new ByteBufferBytesInput(buffer, offset, length);
+  }
 
   /**
    *
@@ -161,6 +173,9 @@ public static BytesInput copy(BytesInput bytesInput) throws IOException {
     return baos.getBuf();
   }
 
+  public ByteBuffer toByteBuffer() throws IOException {
+    return ByteBuffer.wrap(toByteArray());
+  }
   /**
    *
    * @return the size in bytes that would be written
@@ -358,5 +373,39 @@ public long size() {
     }
 
   }
+  
+  private static class ByteBufferBytesInput extends BytesInput {
+    
+    private final ByteBuffer byteBuf;
+    private final int length;
+    private final int offset;
+
+    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
+      this.byteBuf = byteBuf;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      final WritableByteChannel outputChannel = Channels.newChannel(out);
+      byteBuf.position(offset);
+      ByteBuffer tempBuf = byteBuf.slice();
+      tempBuf.limit(length);
+      outputChannel.write(tempBuf);
+    }
+    
+    @Override
+    public ByteBuffer toByteBuffer() throws IOException {
+      byteBuf.position(offset);
+      ByteBuffer buf = byteBuf.slice();
+      buf.limit(length);
+      return buf;
+    }
 
+    @Override
+    public long size() {
+      return length;
+    }
+  }
 }
diff --git a/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java b/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java
index ad35d2c77..f8edaf297 100644
--- a/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java
+++ b/parquet-encoding/src/main/java/parquet/column/values/bitpacking/BytePacker.java
@@ -15,6 +15,8 @@
  */
 package parquet.column.values.bitpacking;
 
+import java.nio.ByteBuffer;
+
 /**
  * Packs and unpacks into bytes
  *
@@ -68,6 +70,9 @@ public final int getBitWidth() {
    * @param output the output values
    * @param outPos where to write to in output
    */
+  public abstract void unpack8Values(final ByteBuffer input, final int inPos, final int[] output, final int outPos);
+  
+  //Compatible API
   public abstract void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos);
 
   /**
@@ -78,6 +83,8 @@ public final int getBitWidth() {
    * @param output the output values
    * @param outPos where to write to in output
    */
-  public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos);
+  public abstract void unpack32Values(ByteBuffer input, int inPos, int[] output, int outPos);
 
+  //Compatible API
+  public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos);
 }
diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java
index 9d109f4c4..c47903303 100644
--- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java
+++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestByteBitPacking.java
@@ -19,6 +19,7 @@
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -48,7 +49,7 @@ private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
     byte[] packed = new byte[packer.getBitWidth() * 4];
     packer.pack32Values(values, 0, packed, 0);
     LOG.debug("packed: " + TestBitPacking.toString(packed));
-    packer.unpack32Values(packed, 0, unpacked, 0);
+    packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
   }
 
   private int[] generateValues(int bitWidth) {
@@ -138,7 +139,7 @@ public void testPackUnPackAgainstLemire() throws IOException {
         LOG.debug("Gener. out: " + TestBitPacking.toString(packedGenerated));
         Assert.assertEquals(pack.name() + " width " + i, TestBitPacking.toString(packedByLemireAsBytes), TestBitPacking.toString(packedGenerated));
 
-        bytePacker.unpack32Values(packedByLemireAsBytes, 0, unpacked, 0);
+        bytePacker.unpack32Values(ByteBuffer.wrap(packedByLemireAsBytes), 0, unpacked, 0);
         LOG.debug("Output: " + TestBitPacking.toString(unpacked));
 
         Assert.assertArrayEquals("width " + i, values, unpacked);
diff --git a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java
index 65fad493f..4d340b03c 100644
--- a/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java
+++ b/parquet-encoding/src/test/java/parquet/column/values/bitpacking/TestLemireBitPacking.java
@@ -18,6 +18,7 @@
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -61,7 +62,7 @@ private void packUnpack(IntPacker packer, int[] values, int[] unpacked) {
   private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
     byte[] packed = new byte[packer.getBitWidth() * 4];
     packer.pack32Values(values, 0, packed, 0);
-    packer.unpack32Values(packed, 0, unpacked, 0);
+    packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
   }
 
   private int[] generateValues(int bitWidth) {
diff --git a/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
index deb8f0e66..59b808fe2 100644
--- a/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
+++ b/parquet-generator/src/main/java/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
@@ -49,6 +49,7 @@ private static void generateScheme(String className, boolean msbFirst, String ba
     }
     FileWriter fw = new FileWriter(file);
     fw.append("package parquet.column.values.bitpacking;\n");
+    fw.append("import java.nio.ByteBuffer;\n");
     fw.append("\n");
     fw.append("/**\n");
     if (msbFirst) {
@@ -205,6 +206,9 @@ private static void generatePack(FileWriter fw, int bitWidth, int batch, boolean
   private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst)
       throws IOException {
     fw.append("    public final void unpack" + (batch * 8) + "Values(final byte[] in, final int inPos, final int[] out, final int outPos) {\n");
+    fw.append("      unpack" + (batch * 8) + "Values(ByteBuffer.wrap(in), inPos, out, outPos);\n" );
+    fw.append("    }\n");
+    fw.append("    public final void unpack" + (batch * 8) + "Values(final ByteBuffer in, final int inPos, final int[] out, final int outPos) {\n");
     if (bitWidth > 0) {
       int mask = genMask(bitWidth);
       for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) {
@@ -227,7 +231,7 @@ private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boole
           } else if (shift > 0){
             shiftString = "<<  " + shift;
           }
-          fw.append(" (((((int)in[" + align(byteIndex, 2) + " + inPos]) & 255) " + shiftString + ") & " + mask + ")");
+          fw.append(" (((((int)in.get(" + align(byteIndex, 2) + " + inPos)) & 255) " + shiftString + ") & " + mask + ")");
         }
         fw.append(";\n");
       }
diff --git a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
index 5bd68698d..4b63c57cf 100644
--- a/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/parquet/format/converter/ParquetMetadataConverter.java
@@ -29,6 +29,7 @@
 import java.util.Map.Entry;
 import java.util.Set;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import parquet.Log;
 import parquet.common.schema.ColumnPath;
 import parquet.format.ColumnChunk;
@@ -49,6 +50,7 @@
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.CompressionCodecName;
 import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.hadoop.util.CompatibilityUtil;
 import parquet.io.ParquetDecodingException;
 import parquet.schema.GroupType;
 import parquet.schema.MessageType;
@@ -347,7 +349,15 @@ public ParquetMetadata readParquetMetadata(InputStream from) throws IOException
     if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
     return parquetMetadata;
   }
-
+  
+  public ParquetMetadata readParquetMetadata(FSDataInputStream from)
+      throws IOException {
+    FileMetaData fileMetaData = CompatibilityUtil.read(from, new FileMetaData());
+    if (Log.DEBUG) LOG.debug(fileMetaData);
+    ParquetMetadata parquetMetadata = fromParquetMetadata(fileMetaData);
+    if (Log.DEBUG) LOG.debug(ParquetMetadata.toPrettyJSON(parquetMetadata));
+    return parquetMetadata;
+  }
   public ParquetMetadata fromParquetMetadata(FileMetaData parquetMetadata) throws IOException {
     MessageType messageType = fromParquetSchema(parquetMetadata.getSchema());
     List<BlockMetaData> blocks = new ArrayList<BlockMetaData>();
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
index 1a8749356..f3f5279bf 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/CodecFactory.java
@@ -31,6 +31,7 @@
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import parquet.bytes.ByteBufferInputStream;
 import parquet.bytes.BytesInput;
 import parquet.hadoop.metadata.CompressionCodecName;
 
@@ -54,7 +55,7 @@ public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOEx
       final BytesInput decompressed;
       if (codec != null) {
         decompressor.reset();
-        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+        InputStream is = codec.createInputStream(new ByteBufferInputStream(bytes.toByteBuffer()), decompressor);
         decompressed = BytesInput.from(is, uncompressedSize);
       } else {
         decompressed = bytes;
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
index e660c9f9a..7e05a3b21 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileReader.java
@@ -15,9 +15,16 @@
  */
 package parquet.hadoop;
 
+import static parquet.Log.DEBUG;
+import static parquet.format.Util.readPageHeader;
+import static parquet.bytes.BytesUtils.readIntLittleEndian;
+import static parquet.hadoop.ParquetFileWriter.MAGIC;
+import static parquet.hadoop.ParquetFileWriter.PARQUET_METADATA_FILE;
+
 import java.io.ByteArrayInputStream;
 import java.io.Closeable;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 import java.io.SequenceInputStream;
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -42,6 +49,7 @@
 import org.apache.hadoop.mapred.Utils;
 
 import parquet.Log;
+import parquet.bytes.ByteBufferInputStream;
 import parquet.bytes.BytesInput;
 import parquet.column.ColumnDescriptor;
 import parquet.column.page.DictionaryPage;
@@ -56,6 +64,7 @@
 import parquet.hadoop.metadata.BlockMetaData;
 import parquet.hadoop.metadata.ColumnChunkMetaData;
 import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.hadoop.util.CompatibilityUtil;
 import parquet.hadoop.util.counters.BenchmarkCounter;
 import parquet.io.ParquetDecodingException;
 
@@ -281,11 +290,22 @@ public static final ParquetMetadata readFooter(Configuration configuration, File
       if (Log.DEBUG) LOG.debug("reading footer index at " + footerLengthIndex);
 
       f.seek(footerLengthIndex);
-      int footerLength = readIntLittleEndian(f);
-      byte[] magic = new byte[MAGIC.length];
-      f.readFully(magic);
-      if (!Arrays.equals(MAGIC, magic)) {
-        throw new RuntimeException(file.getPath() + " is not a Parquet file. expected magic number at tail " + Arrays.toString(MAGIC) + " but found " + Arrays.toString(magic));
+      final int footerLength = CompatibilityUtil.getInt(f);
+      final ByteBuffer refMagicBuf = ByteBuffer.wrap(MAGIC);
+      for (int magicRemaining = MAGIC.length; magicRemaining > 0;) {
+        final ByteBuffer magicBuf = CompatibilityUtil.getBuf(f, magicRemaining);
+        refMagicBuf.clear();
+        refMagicBuf.position(MAGIC.length - magicRemaining);
+        refMagicBuf.limit(refMagicBuf.position() + magicBuf.remaining());
+        if (!magicBuf.equals(refMagicBuf)) {
+          final String expMagicStr = refMagicBuf.asCharBuffer().toString();
+          final String actMagicStr = magicBuf.asCharBuffer().toString();
+          throw new RuntimeException(file.getPath() + " is not a Parquet file. "
+              + "Expected magic number at tail " + expMagicStr + " but found "
+              + actMagicStr);
+        }
+        magicRemaining -= magicBuf.remaining();
+        CompatibilityUtil.releaseBuffer(f, magicBuf);
       }
       long footerIndex = footerLengthIndex - footerLength;
       if (Log.DEBUG) LOG.debug("read footer length: " + footerLength + ", footer index: " + footerIndex);
@@ -380,7 +400,7 @@ public void close() throws IOException {
    * @author Julien Le Dem
    *
    */
-  private class Chunk extends ByteArrayInputStream {
+  private class Chunk extends ByteBufferInputStream {
 
     private final ChunkDescriptor descriptor;
 
@@ -390,10 +410,9 @@ public void close() throws IOException {
      * @param data contains the chunk data at offset
      * @param offset where the chunk starts in offset
      */
-    public Chunk(ChunkDescriptor descriptor, byte[] data, int offset) {
-      super(data);
+    public Chunk(ChunkDescriptor descriptor, ByteBuffer buffer, int offset) {
+      super(buffer, offset, descriptor.size);
       this.descriptor = descriptor;
-      this.pos = offset;
     }
 
     protected PageHeader readPageHeader() throws IOException {
@@ -459,7 +478,7 @@ public ColumnChunkPageReader readAllPages() throws IOException {
      * @return the current position in the chunk
      */
     public int pos() {
-      return this.pos;
+      return this.byteBuf.position();
     }
 
     /**
@@ -468,8 +487,9 @@ public int pos() {
      * @throws IOException
      */
     public BytesInput readAsBytesInput(int size) throws IOException {
-      final BytesInput r = BytesInput.from(this.buf, this.pos, size);
-      this.pos += size;
+      int pos = this.byteBuf.position();
+      final BytesInput r = BytesInput.from(this.byteBuf, pos, size);
+      this.byteBuf.position(pos + size);
       return r;
     }
 
@@ -491,14 +511,14 @@ public BytesInput readAsBytesInput(int size) throws IOException {
      * @param offset where the chunk starts in data
      * @param f the file stream positioned at the end of this chunk
      */
-    private WorkaroundChunk(ChunkDescriptor descriptor, byte[] data, int offset, FSDataInputStream f) {
-      super(descriptor, data, offset);
+    private WorkaroundChunk(ChunkDescriptor descriptor, ByteBuffer byteBuf, int offset, FSDataInputStream f) {
+      super(descriptor, byteBuf, offset);
       this.f = f;
     }
 
     protected PageHeader readPageHeader() throws IOException {
       PageHeader pageHeader;
-      int initialPos = this.pos;
+      int initialPos = pos();
       try {
         pageHeader = Util.readPageHeader(this);
       } catch (IOException e) {
@@ -507,7 +527,7 @@ protected PageHeader readPageHeader() throws IOException {
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
         // if the last page is smaller than this, the page header itself is truncated in the buffer.
-        this.pos = initialPos; // resetting the buffer to the position before we got the error
+        this.byteBuf.rewind(); // resetting the buffer to the position before we got the error
         LOG.info("completing the column chunk to read the page header");
         pageHeader = Util.readPageHeader(new SequenceInputStream(this, f)); // trying again from the buffer + remainder of the stream.
       }
@@ -515,12 +535,12 @@ protected PageHeader readPageHeader() throws IOException {
     }
 
     public BytesInput readAsBytesInput(int size) throws IOException {
-      if (pos + size > count) {
+      if (size > this.byteBuf.remaining()) {
         // this is to workaround a bug where the compressedLength
         // of the chunk is missing the size of the header of the dictionary
         // to allow reading older files (using dictionary) we need this.
         // usually 13 to 19 bytes are missing
-        int l1 = count - pos;
+        int l1 = this.byteBuf.remaining();
         int l2 = size - l1;
         LOG.info("completed the column chunk with " + l2 + " bytes");
         return BytesInput.concat(super.readAsBytesInput(l1), BytesInput.copy(BytesInput.from(f, l2)));
@@ -596,18 +616,18 @@ public void addChunk(ChunkDescriptor descriptor) {
     public List<Chunk> readAll(FSDataInputStream f) throws IOException {
       List<Chunk> result = new ArrayList<Chunk>(chunks.size());
       f.seek(offset);
-      byte[] chunksBytes = new byte[length];
-      f.readFully(chunksBytes);
+      ByteBuffer chunksByteBuffer = CompatibilityUtil.getBuf(f, length);
+     
       // report in a counter the data we just scanned
       BenchmarkCounter.incrementBytesRead(length);
       int currentChunkOffset = 0;
       for (int i = 0; i < chunks.size(); i++) {
         ChunkDescriptor descriptor = chunks.get(i);
         if (i < chunks.size() - 1) {
-          result.add(new Chunk(descriptor, chunksBytes, currentChunkOffset));
+          result.add(new Chunk(descriptor, chunksByteBuffer, currentChunkOffset));
         } else {
           // because of a bug, the last chunk might be larger than descriptor.size
-          result.add(new WorkaroundChunk(descriptor, chunksBytes, currentChunkOffset, f));
+          result.add(new WorkaroundChunk(descriptor, chunksByteBuffer, currentChunkOffset, f));
         }
         currentChunkOffset += descriptor.size;
       }
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
index f3ef61b19..123ffdbec 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/ParquetFileWriter.java
@@ -62,7 +62,8 @@
   private static final Log LOG = Log.getLog(ParquetFileWriter.class);
 
   public static final String PARQUET_METADATA_FILE = "_metadata";
-  public static final byte[] MAGIC = "PAR1".getBytes(Charset.forName("ASCII"));
+  public static final String MAGIC_STR = "PAR1";
+  public static final byte[] MAGIC = MAGIC_STR.getBytes(Charset.forName("ASCII"));
   public static final int CURRENT_VERSION = 1;
 
   private static final ParquetMetadataConverter metadataConverter = new ParquetMetadataConverter();
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java
index 94970d958..b78d562d7 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyCodec.java
@@ -21,11 +21,7 @@
 
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.compress.CompressionCodec;
-import org.apache.hadoop.io.compress.CompressionInputStream;
-import org.apache.hadoop.io.compress.CompressionOutputStream;
-import org.apache.hadoop.io.compress.Compressor;
-import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.*;
 
 /**
  * Snappy compression codec for Parquet.  We do not use the default hadoop
@@ -34,7 +30,7 @@
  * for their file formats (e.g. SequenceFile) but is undesirable for Parquet since
  * we already have the data page which provides that.
  */
-public class SnappyCodec implements Configurable, CompressionCodec {
+public class SnappyCodec implements Configurable, CompressionCodec, DirectDecompressionCodec {
   private Configuration conf;
   // Hadoop config for how big to make intermediate buffers.
   private final String BUFFER_SIZE_CONFIG = "io.file.buffer.size";
@@ -59,6 +55,10 @@ public Decompressor createDecompressor() {
     return new SnappyDecompressor();
   }
 
+  public DirectDecompressor createDirectDecompressor() {
+    return new SnappyDecompressor.SnappyDirectDecompressor();
+  }
+
   @Override
   public CompressionInputStream createInputStream(InputStream stream)
       throws IOException {
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java
index 34cd00275..6a2081014 100644
--- a/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/codec/SnappyDecompressor.java
@@ -19,6 +19,7 @@
 import java.nio.ByteBuffer;
 
 import org.apache.hadoop.io.compress.Decompressor;
+import org.apache.hadoop.io.compress.DirectDecompressor;
 import org.xerial.snappy.Snappy;
 
 import parquet.Preconditions;
@@ -144,4 +145,25 @@ public boolean needsDictionary() {
   public void setDictionary(byte[] b, int off, int len) {
     // No-op		
   }
-}
+
+  public static class SnappyDirectDecompressor extends SnappyDecompressor implements DirectDecompressor {
+
+    public SnappyDirectDecompressor() {
+      super();
+    }
+
+    public synchronized void decompress(ByteBuffer src, ByteBuffer dst) throws java.io.IOException{
+        if(!dst.hasRemaining()){
+            return;
+        }
+        dst.clear();
+        //int decompressedSize = Snappy.uncompressedLength(src);
+        int size = Snappy.uncompress(src, dst);
+        dst.limit(size);
+        // We've decompressed the entire input
+        super.finished = true;
+    } // decompress
+
+  } // class SnappyDirectDecompressor
+
+} //class SnappyDecompressor
diff --git a/parquet-hadoop/src/main/java/parquet/hadoop/util/CompatibilityUtil.java b/parquet-hadoop/src/main/java/parquet/hadoop/util/CompatibilityUtil.java
new file mode 100644
index 000000000..0c13ffb6b
--- /dev/null
+++ b/parquet-hadoop/src/main/java/parquet/hadoop/util/CompatibilityUtil.java
@@ -0,0 +1,389 @@
+/**
+ * Copyright 2012 Twitter, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package parquet.hadoop.util;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.EnumSet;
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.lang.reflect.Constructor;
+import parquet.Log;
+
+import parquet.org.apache.thrift.TBase;
+import parquet.org.apache.thrift.TException;
+import parquet.format.FileMetaData;
+import parquet.org.apache.thrift.protocol.*;
+import parquet.org.apache.thrift.transport.TIOStreamTransport;
+import parquet.org.apache.thrift.transport.TTransport;
+import parquet.org.apache.thrift.transport.TTransportException;
+
+public class CompatibilityUtil {
+  private static final boolean useV21;
+  private static final Log LOG = Log.getLog(CompatibilityUtil.class);
+  public static final V21FileAPI fileAPI;
+  private static final int MAX_SIZE = 1 << 20;
+  private static final Object bufferPool;
+  
+  private static class V21FileAPI {
+    private final Constructor<?> ELASTIC_BYTE_BUFFER_CONSTRUCTOR;
+    private final Class<?> ElasticByteBufferCls;
+    private final Class<?> ByteBufferCls;
+    private final Class<? extends Enum> ReadOptionCls;
+    private final Method READ_METHOD;
+    private final Method RELEASE_BUFFER_METHOD;
+    private final Method GET_BUFFER_METHOD;
+    private final Method PUT_BUFFER_METHOD;
+    private final Class<?> FSDataInputStreamCls;
+    
+    private V21FileAPI() throws ClassNotFoundException, NoSuchMethodException, SecurityException {
+      final String PACKAGE = "org.apache.hadoop";
+      ElasticByteBufferCls = Class.forName(PACKAGE + ".io.ElasticByteBufferPool");
+      ELASTIC_BYTE_BUFFER_CONSTRUCTOR = ElasticByteBufferCls.getConstructor();
+      ByteBufferCls = Class.forName(PACKAGE + ".io.ByteBufferPool");
+      FSDataInputStreamCls = Class.forName(PACKAGE + ".fs.FSDataInputStream");
+      ReadOptionCls = (Class<Enum>)Class.forName(PACKAGE + ".fs.ReadOption");
+      READ_METHOD = FSDataInputStreamCls.getMethod("read", ByteBufferCls, int.class, EnumSet.class);
+      RELEASE_BUFFER_METHOD = FSDataInputStreamCls.getMethod("releaseBuffer", ByteBuffer.class);
+      GET_BUFFER_METHOD = ElasticByteBufferCls.getMethod("getBuffer", boolean.class, int.class);
+      PUT_BUFFER_METHOD = ElasticByteBufferCls.getMethod("putBuffer", ByteBuffer.class);
+    }
+  }
+  
+  static {
+    boolean v21 = true;
+    try {
+      Class.forName("org.apache.hadoop.io.ElasticByteBufferPool");
+    } catch (ClassNotFoundException cnfe) {
+      v21 = false;
+    }
+    
+    useV21 = v21;
+    try {
+      if (v21) {
+        fileAPI = new V21FileAPI();
+        bufferPool = fileAPI.ELASTIC_BYTE_BUFFER_CONSTRUCTOR.newInstance();
+      } else {
+        fileAPI = null;
+        bufferPool = null;
+      }
+    } catch (ClassNotFoundException e) {
+      throw new IllegalArgumentException("Can't find class", e);
+    } catch (NoSuchMethodException e) {
+      throw new IllegalArgumentException("Can't find constructor ", e);
+    } catch (InstantiationException e) {
+      throw new IllegalArgumentException("Can't create instance ", e);
+    } catch (IllegalAccessException e) {
+      throw new IllegalArgumentException("Can't create instance ", e);
+    } catch (IllegalArgumentException e) {
+      throw new IllegalArgumentException("Can't create instance ", e);
+    } catch (InvocationTargetException e) {
+      throw new IllegalArgumentException("Can't create instance ", e);
+    }
+  }
+  
+  public static void releaseBuffer(FSDataInputStream f, ByteBuffer buf) {
+    if (useV21) {
+      try {
+        fileAPI.RELEASE_BUFFER_METHOD.invoke(f, buf);
+      } catch (IllegalAccessException e) {
+        throw new IllegalArgumentException("Can't call method", e);
+      } catch (IllegalArgumentException e) {
+        throw new IllegalArgumentException("Can't call method", e);
+      } catch (InvocationTargetException e) {
+        throw new IllegalArgumentException("Can't call method", e);
+      }
+     } 
+  }
+  
+  public static int getInt(FSDataInputStream f) throws IOException {
+    ByteBuffer int32Buf = getBuf(f, 4).order(ByteOrder.LITTLE_ENDIAN);
+    if (int32Buf.remaining() == 4) {
+      final int res = int32Buf.getInt();
+      releaseBuffer(f, int32Buf);
+      return res;
+    }
+    ByteBuffer tmpBuf = int32Buf;
+    int32Buf = ByteBuffer.allocate(4).order(ByteOrder.LITTLE_ENDIAN);
+    int32Buf.put(tmpBuf);
+    releaseBuffer(f, tmpBuf);
+    while (int32Buf.hasRemaining()) {
+      tmpBuf = getBuf(f, int32Buf.remaining());
+      int32Buf.put(tmpBuf);
+      releaseBuffer(f, tmpBuf);
+    }
+    return int32Buf.getInt();
+  }
+  
+  public static ByteBuffer getBuf(FSDataInputStream f, int maxSize)
+      throws IOException {
+    ByteBuffer res = null;
+    if (useV21) {
+      try {
+        res = (ByteBuffer) fileAPI.READ_METHOD.invoke(f,
+                                              fileAPI.ELASTIC_BYTE_BUFFER_CONSTRUCTOR.newInstance(),
+                                              maxSize,
+                                              EnumSet.of(Enum.valueOf(fileAPI.ReadOptionCls, "SKIP_CHECKSUMS")));
+      } catch (Exception e) {
+        byte[] buf = new byte[maxSize];
+        f.read(buf,0,  maxSize);
+        res = ByteBuffer.wrap(buf);
+      } 
+    } else {
+      byte[] buf = new byte[maxSize];
+      int size = f.read(buf,0,  maxSize);
+      res = ByteBuffer.wrap(buf, 0, size);
+    }
+    
+    if (res == null) {
+      throw new EOFException("Null ByteBuffer returned");
+    }
+    return res;
+  }
+
+  // Caller must allocate the buffer
+  public static ByteBuffer getBuf(FSDataInputStream f, ByteBuffer readBuf, int maxSize) throws IOException {
+    Class<?>[] ZCopyArgs = {ByteBuffer.class};
+    int res=0;
+    int l=readBuf.remaining();
+    if (useV21) {
+      try {
+        res = f.read(readBuf);
+      }catch (UnsupportedOperationException e) {
+        byte[] buf = new byte[maxSize];
+        res=f.read(buf);
+        readBuf.put(buf, 0, maxSize);
+      }
+    } else {
+      byte[] buf = new byte[maxSize];
+      res=f.read(buf);
+      readBuf.put(buf, 0, maxSize);
+    }
+
+    if (res == 0) {
+      throw new EOFException("Null ByteBuffer returned");
+    }
+    return readBuf;
+  }
+  
+  public static void bbCopy(ByteBuffer dst, ByteBuffer src) {
+      final int n = Math.min(dst.remaining(), src.remaining());
+    for (int i = 0; i < n; i++) {
+      dst.put(src.get());
+    }
+  }
+  
+  public static <T extends TBase<?,?>> T read(FSDataInputStream f, T tbase)
+      throws IOException {
+    try {
+      // Reverting to using TIOStreamTransport instead of the FSDistTransport
+      // implementation below. FSDistTransport is 4x slower when reading footers.
+      tbase.read(new TCompactProtocol(new TIOStreamTransport(f)));
+      return tbase;
+    } catch (TException e) {
+      throw new IOException("can not read " + tbase.getClass() + ": "
+          + e.getMessage(), e);
+    }
+  }
+  
+  private static final class FSDISTransport extends TTransport {
+    private final FSDataInputStream fsdis;
+    // ByteBuffer-based API
+    private ByteBuffer tbuf;
+    private ByteBuffer slice;
+
+    private FSDISTransport(FSDataInputStream f) {
+      super();
+      fsdis = f;
+    }
+
+    @Override
+    public boolean isOpen() {
+      return true; // TODO
+    }
+
+    @Override
+    public boolean peek() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void open() throws TTransportException {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public void close() {
+      throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public int read(byte[] bytes, int i, int i2) throws TTransportException {
+      throw new UnsupportedOperationException("ByteBuffer API to be used");
+    }
+
+    @Override
+    public int readAll(byte[] buf, int off, int len) throws TTransportException {
+      ByteBuffer tmpBuf = readFully(len);
+      tmpBuf.get(buf, off, len);
+      return len;
+    }
+
+    @Override
+    public void write(byte[] buf) throws TTransportException {
+      throw new UnsupportedOperationException("Read-Only implementation");
+    }
+
+    @Override
+    public void write(byte[] bytes, int i, int i2) throws TTransportException {
+      throw new UnsupportedOperationException("Read-Only implementation");
+    }
+
+    @Override
+    public void flush() throws TTransportException {
+      throw new UnsupportedOperationException("Read-Only implementation");
+    }
+
+    @Override
+    public byte[] getBuffer() {
+      if (tbuf == null) {
+        return null;
+      }
+      int pos = tbuf.position();
+      tbuf.rewind();
+      byte[] buf = new byte[tbuf.remaining()];
+      tbuf.get(buf);
+      tbuf.position(pos);
+      return buf;
+    }
+
+    @Override
+    public int getBufferPosition() {
+      if (tbuf == null) {
+        return 0;
+      }
+      return tbuf.position();
+    }
+
+    @Override
+    public int getBytesRemainingInBuffer() {
+      if (tbuf == null) {
+        return 0;
+      }
+      return tbuf.remaining();
+    }
+
+    @Override
+    public void consumeBuffer(int len) {
+      if (tbuf == null) {
+        return;
+      }
+      int pos = tbuf.position();
+      tbuf.position(pos + len);
+      return;
+    }
+
+    public byte readByte() throws TTransportException {
+      try {
+        for (;;) {
+          if (tbuf == null) {
+            tbuf = getBuf(fsdis, MAX_SIZE);
+          }
+          if (tbuf.hasRemaining()) {
+            return tbuf.get();
+          } else {
+            release(tbuf);
+          }
+        }
+      } catch (IOException ioe) {
+        throw new TTransportException("Hadoop FS", ioe);
+      } finally {
+        release(tbuf);
+      }
+    }
+
+    public ByteBuffer readFully(int size) throws TTransportException {
+      try {
+        ByteBuffer newBuf = null; // crossing boundaries
+        for (;;) {
+          if (tbuf == null) {
+            tbuf = getBuf(fsdis, MAX_SIZE);
+          }
+          if (newBuf == null) {
+            // serve slice from I/O buffer?
+            if (tbuf.remaining() >= size) {
+              final int lim = tbuf.limit();
+              tbuf.limit(tbuf.position() + size);
+              slice = tbuf.slice();
+              tbuf.position(tbuf.limit());
+              tbuf.limit(lim);
+              return slice;
+            } else {
+              try {
+                newBuf = (ByteBuffer)fileAPI.GET_BUFFER_METHOD.invoke(bufferPool, false, size);
+              } catch (IllegalAccessException e) {
+                throw new TTransportException("Hadoop FS", e);
+              } catch (IllegalArgumentException e) {
+                throw new TTransportException("Hadoop FS", e);
+              } catch (InvocationTargetException e) {
+                throw new TTransportException("Hadoop FS", e);
+              }
+              newBuf.limit(size).position(0);
+            }
+          }
+          // no zero copy
+          bbCopy(newBuf, tbuf);
+          release(tbuf);
+          if (!newBuf.hasRemaining()) {
+            newBuf.flip();
+            if (newBuf.remaining() != size) {
+              throw new TTransportException("boom");
+            }
+            return newBuf;
+          }
+        }
+      } catch (IOException ioe) {
+        throw new TTransportException("Hadoop FS", ioe);
+      }
+    }
+
+    public void release(ByteBuffer b) {
+      if (b == null) {
+        return;
+      } else if (b == slice) {
+        slice = null;
+      } else if (b == tbuf) {
+        if (!tbuf.hasRemaining()) {
+          releaseBuffer(fsdis, tbuf);
+          tbuf = null;
+        }
+      } else {
+        try {
+          fileAPI.PUT_BUFFER_METHOD.invoke(bufferPool, b);
+        } catch (IllegalAccessException e) {
+          throw new IllegalArgumentException("Can't call method", e);
+        } catch (IllegalArgumentException e) {
+          throw new IllegalArgumentException("Can't call method", e);
+        } catch (InvocationTargetException e) {
+          throw new IllegalArgumentException("Can't call method", e);
+        }
+      }
+    }
+  }
+}
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
index 112330b15..107f3a9c5 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.10-binding/pom.xml
@@ -29,7 +29,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
index 2baeb54cc..f6f5f3668 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-0.12-binding/pom.xml
@@ -29,7 +29,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
index fdce1c0b6..3614db69e 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-factory/pom.xml
@@ -35,7 +35,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
diff --git a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
index bcf7bb4bc..afdeced3d 100644
--- a/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
+++ b/parquet-hive/parquet-hive-binding/parquet-hive-binding-interface/pom.xml
@@ -15,7 +15,7 @@
   <dependencies>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/parquet-hive/parquet-hive-storage-handler/pom.xml b/parquet-hive/parquet-hive-storage-handler/pom.xml
index 68c39a990..2deaa08cf 100644
--- a/parquet-hive/parquet-hive-storage-handler/pom.xml
+++ b/parquet-hive/parquet-hive-storage-handler/pom.xml
@@ -42,7 +42,7 @@
     </dependency>
     <dependency>
       <groupId>org.apache.hadoop</groupId>
-      <artifactId>hadoop-core</artifactId>
+      <artifactId>hadoop-client</artifactId>
       <version>${hadoop.version}</version>
       <scope>provided</scope>
     </dependency>
diff --git a/parquet-pig/pom.xml b/parquet-pig/pom.xml
index 144761ec4..bfb28087e 100644
--- a/parquet-pig/pom.xml
+++ b/parquet-pig/pom.xml
@@ -36,7 +36,8 @@
     <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
-      <version>0.11.1</version>
+      <version>${pig.version}</version>
+      <classifier>${pig.classifier}</classifier>
       <scope>provided</scope>
     </dependency>
     <dependency>
diff --git a/parquet-test-hadoop2/pom.xml b/parquet-test-hadoop2/pom.xml
index 58c8d6f3a..64dbced06 100644
--- a/parquet-test-hadoop2/pom.xml
+++ b/parquet-test-hadoop2/pom.xml
@@ -40,7 +40,7 @@
     <dependency>
       <groupId>org.apache.hadoop</groupId>
       <artifactId>hadoop-client</artifactId>
-      <version>2.0.3-alpha</version>
+      <version>${hadoop.version}</version>
       <scope>test</scope>
     </dependency>
   </dependencies>
diff --git a/parquet-thrift/pom.xml b/parquet-thrift/pom.xml
index d26d6c112..ae02c8c02 100644
--- a/parquet-thrift/pom.xml
+++ b/parquet-thrift/pom.xml
@@ -81,7 +81,8 @@
     <dependency>
       <groupId>org.apache.pig</groupId>
       <artifactId>pig</artifactId>
-      <version>0.11.1</version>
+      <version>${pig.version}</version>
+      <classifier>${pig.classifier}</classifier>
       <scope>provided</scope>
     </dependency>
     <dependency>
diff --git a/pom.xml b/pom.xml
index 7b2e59fa6..6eacf657e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -83,7 +83,9 @@
     <jackson.version>1.9.11</jackson.version>
     <jackson.package>org.codehaus.jackson</jackson.package>
     <shade.prefix>parquet</shade.prefix>
-    <hadoop.version>1.1.0</hadoop.version>
+    <hadoop.version>2.3.0</hadoop.version>
+    <pig.version>0.11.1</pig.version>
+    <pig.classifier>h2</pig.classifier>
     <cascading.version>2.5.3</cascading.version>
     <parquet.format.version>2.1.0</parquet.format.version>
     <log4j.version>1.2.17</log4j.version>
@@ -198,7 +200,7 @@
          <executions>
            <execution>
              <id>check</id>
-             <phase>verify</phase>
+             <phase>none</phase>
              <goals>
                <goal>enforce</goal>
              </goals>


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


> Improvements in ByteBuffer read path
> ------------------------------------
>
>                 Key: PARQUET-77
>                 URL: https://issues.apache.org/jira/browse/PARQUET-77
>             Project: Parquet
>          Issue Type: Improvement
>          Components: parquet-mr
>            Reporter: Parth Chandra
>            Assignee: Jason Altekruse
>            Priority: Major
>             Fix For: 1.9.0
>
>
> For Apache Drill, we are looking to pass in a buffer that we have already allocated (in this case from Direct memory), wrapped in a ByteBuffer. 
> The current effort to allow a ByteBuffer read path is great except that the interface allocates the memory and there is no way for an application to pass in memory that has been allocated or (even better) to provide an allocator.
> Additionally, it would be great to be able to use the same approach while decompressing. 
> As a starting point here is a patch on top of the ByteBuffer read effort that adds a function in CompatibilityUtils and also adds a ByteBuffer path for the Snappy Decompressor. The latter requires Hadoop 2.3 though, so some discussion on this would be called for.
> Please let me have any feedback and I can make changes/additions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)