You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by bl...@apache.org on 2018/02/21 17:40:13 UTC

[2/2] parquet-mr git commit: PARQUET-787: Limit read allocation size

PARQUET-787: Limit read allocation size

WIP: This updates the `ParquetFileReader` to use multiple buffers when reading a row group, instead of a single humongous allocation. As a consequence, many classes needed to be updated to accept a stream backed by multiple buffers, instead of using a single buffer directly. Assuming a single contiguous buffer would require too many copies.

Author: Ryan Blue <bl...@apache.org>

Closes #390 from rdblue/PARQUET-787-limit-read-allocation-size and squashes the following commits:

4abba3e7a [Ryan Blue] PARQUET-787: Update byte buffer input streams for review comments.
e7c6c5dd2 [Ryan Blue] PARQUET-787: Fix problems from Zoltan's review.
be52b59fa [Ryan Blue] PARQUET-787: Update tests for both ByteBufferInputStreams.
b0b614748 [Ryan Blue] PARQUET-787: Update encodings to use ByteBufferInputStream.
a4fa05ac5 [Ryan Blue] Refactor ByteBufferInputStream implementations.
56b22a6a1 [Ryan Blue] Make allocation size configurable.
103ed3d86 [Ryan Blue] Add tests for ByteBufferInputStream and fix bugs.
614a2bbc8 [Ryan Blue] Limit allocation size to 8MB chunks for better garbage collection.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/8bbc6cb9
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/8bbc6cb9
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/8bbc6cb9

Branch: refs/heads/master
Commit: 8bbc6cb95fd9b4b9e86c924ca1e40fd555ecac1d
Parents: ad80bfe
Author: Ryan Blue <bl...@apache.org>
Authored: Wed Feb 21 09:40:07 2018 -0800
Committer: Ryan Blue <bl...@apache.org>
Committed: Wed Feb 21 09:40:07 2018 -0800

----------------------------------------------------------------------
 .../parquet/column/impl/ColumnReaderImpl.java   |  30 +-
 .../parquet/column/values/ValuesReader.java     |  36 +-
 .../bitpacking/BitPackingValuesReader.java      |  15 +-
 .../bitpacking/ByteBitPackingValuesReader.java  |  48 +-
 .../delta/DeltaBinaryPackingValuesReader.java   |  40 +-
 .../DeltaLengthByteArrayValuesReader.java       |  30 +-
 .../deltastrings/DeltaByteArrayReader.java      |  11 +-
 .../dictionary/DictionaryValuesReader.java      |   9 +-
 .../dictionary/PlainValuesDictionary.java       |  17 +-
 .../values/plain/BinaryPlainValuesReader.java   |  31 +-
 .../values/plain/BooleanPlainValuesReader.java  |  16 +-
 .../FixedLenByteArrayPlainValuesReader.java     |  29 +-
 .../column/values/plain/PlainValuesReader.java  |  15 +-
 .../rle/RunLengthBitPackingHybridDecoder.java   |   2 -
 .../RunLengthBitPackingHybridValuesReader.java  |  19 +-
 .../values/rle/ZeroIntegerValuesReader.java     |  11 +-
 .../column/impl/TestCorruptDeltaByteArrays.java |  17 +-
 .../org/apache/parquet/column/values/Utils.java |  21 +-
 .../values/bitpacking/BitPackingPerfTest.java   |   3 +-
 .../values/bitpacking/TestBitPackingColumn.java |   3 +-
 ...BinaryPackingValuesWriterForIntegerTest.java |  17 +-
 ...ltaBinaryPackingValuesWriterForLongTest.java |  15 +-
 .../BenchmarkReadingRandomIntegers.java         |   3 +-
 .../TestDeltaLengthByteArray.java               |   6 +-
 .../BenchmarkDeltaLengthByteArray.java          |   9 +-
 .../values/deltastrings/TestDeltaByteArray.java |  10 +-
 .../benchmark/BenchmarkDeltaByteArray.java      |  17 +-
 .../values/dictionary/TestDictionary.java       |  36 +-
 ...unLengthBitPackingHybridIntegrationTest.java |   2 +-
 .../TestRunLengthBitPackingHybridEncoder.java   |   2 -
 .../parquet/bytes/ByteBufferInputStream.java    |  86 ++-
 .../org/apache/parquet/bytes/BytesInput.java    |  98 ++-
 .../parquet/bytes/MultiBufferInputStream.java   | 382 ++++++++++++
 .../parquet/bytes/SingleBufferInputStream.java  | 177 ++++++
 .../bytes/TestByteBufferInputStreams.java       | 597 +++++++++++++++++++
 .../bytes/TestMultiBufferInputStream.java       | 141 +++++
 .../bytes/TestSingleBufferInputStream.java      | 130 ++++
 .../org/apache/parquet/HadoopReadOptions.java   |   9 +-
 .../org/apache/parquet/ParquetReadOptions.java  |  50 +-
 .../org/apache/parquet/hadoop/CodecFactory.java |   2 +-
 .../parquet/hadoop/DirectCodecFactory.java      |   4 +-
 .../parquet/hadoop/ParquetFileReader.java       |  91 +--
 .../parquet/hadoop/TestDirectCodecFactory.java  |   6 +-
 43 files changed, 1852 insertions(+), 441 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index 931b4b1..8b47977 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -24,12 +24,11 @@ import static org.apache.parquet.column.ValuesType.DEFINITION_LEVEL;
 import static org.apache.parquet.column.ValuesType.REPETITION_LEVEL;
 import static org.apache.parquet.column.ValuesType.VALUES;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -549,7 +548,7 @@ public class ColumnReaderImpl implements ColumnReader {
     });
   }
 
-  private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
+  private void initDataReader(Encoding dataEncoding, ByteBufferInputStream in, int valueCount) {
     ValuesReader previousReader = this.dataColumn;
 
     this.currentEncoding = dataEncoding;
@@ -565,13 +564,15 @@ public class ColumnReaderImpl implements ColumnReader {
     } else {
       this.dataColumn = dataEncoding.getValuesReader(path, VALUES);
     }
+
     if (dataEncoding.usesDictionary() && converter.hasDictionarySupport()) {
       bindToDictionary(dictionary);
     } else {
       bind(path.getType());
     }
+
     try {
-      dataColumn.initFromPage(pageValueCount, bytes, offset);
+      dataColumn.initFromPage(pageValueCount, in);
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page in col " + path, e);
     }
@@ -589,16 +590,15 @@ public class ColumnReaderImpl implements ColumnReader {
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
-      ByteBuffer bytes = page.getBytes().toByteBuffer();
-      LOG.debug("page size {} bytes and {} records", bytes.remaining(), pageValueCount);
+      BytesInput bytes = page.getBytes();
+      LOG.debug("page size {} bytes and {} records", bytes.size(), pageValueCount);
       LOG.debug("reading repetition levels at 0");
-      rlReader.initFromPage(pageValueCount, bytes, 0);
-      int next = rlReader.getNextOffset();
-      LOG.debug("reading definition levels at {}", next);
-      dlReader.initFromPage(pageValueCount, bytes, next);
-      next = dlReader.getNextOffset();
-      LOG.debug("reading data at {}", next);
-      initDataReader(page.getValueEncoding(), bytes, next, page.getValueCount());
+      ByteBufferInputStream in = bytes.toInputStream();
+      rlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading definition levels at {}", in.position());
+      dlReader.initFromPage(pageValueCount, in);
+      LOG.debug("reading data at {}", in.position());
+      initDataReader(page.getValueEncoding(), in, page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
@@ -607,9 +607,9 @@ public class ColumnReaderImpl implements ColumnReader {
   private void readPageV2(DataPageV2 page) {
     this.repetitionLevelColumn = newRLEIterator(path.getMaxRepetitionLevel(), page.getRepetitionLevels());
     this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
+    LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
     try {
-      LOG.debug("page data size {} bytes and {} records", page.getData().size(), pageValueCount);
-      initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
+      initDataReader(page.getDataEncoding(), page.getData().toInputStream(), page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index 03aa2f8..b2ec2a5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -20,8 +20,7 @@ package org.apache.parquet.column.values;
 
 import java.io.IOException;
 
-import java.nio.ByteBuffer;
-import org.apache.parquet.io.ParquetDecodingException;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.io.api.Binary;
 
 /**
@@ -40,8 +39,9 @@ public abstract class ValuesReader {
   /**
    * Called to initialize the column reader from a part of a page.
    *
-   * The underlying implementation knows how much data to read, so a length
-   * is not provided.
+   * Implementations must consume all bytes from the input stream, leaving the
+   * stream ready to read the next section of data. The underlying
+   * implementation knows how much data to read, so a length is not provided.
    *
    * Each page may contain several sections:
    * <ul>
@@ -50,36 +50,12 @@ public abstract class ValuesReader {
    *  <li> data column
    * </ul>
    *
-   * This function is called with 'offset' pointing to the beginning of one of these sections,
-   * and should return the offset to the section following it.
-   *
    * @param valueCount count of values in this page
-   * @param page the array to read from containing the page data (repetition levels, definition levels, data)
-   * @param offset where to start reading from in the page
+   * @param in an input stream containing the page data at the correct offset
    *
    * @throws IOException
    */
-  public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
-
-  /**
-   * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[].
-   *
-   * This method is only provided for backward compatibility and will be removed in a future release.
-   * Please update any code using it as soon as possible.
-   * @see #initFromPage(int, ByteBuffer, int)
-   */
-  @Deprecated
-  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
-    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
-  }
-
-  /**
-   * Called to return offset of the next section
-   * @return offset of the next section
-   */
-  public int getNextOffset() {
-    throw new ParquetDecodingException("Unsupported: cannot get offset of the next section.");
-  }
+  public abstract void initFromPage(int valueCount, ByteBufferInputStream in) throws IOException;
 
   /**
    * usable when the encoding is dictionary based

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
index a5608cb..bcc828b 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -22,7 +22,6 @@ import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -44,7 +43,6 @@ public class BitPackingValuesReader extends ValuesReader {
   private ByteBufferInputStream in;
   private BitPackingReader bitPackingReader;
   private final int bitsPerValue;
-  private int nextOffset;
 
   /**
    * @param bound the maximum value stored by this column
@@ -68,21 +66,16 @@ public class BitPackingValuesReader extends ValuesReader {
 
   /**
    * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
     int effectiveBitLength = valueCount * bitsPerValue;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
     LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitsPerValue);
-    this.in = new ByteBufferInputStream(in, offset, length);
-    this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
-    this.nextOffset = offset + length;
-  }
 
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
+    this.in = stream.sliceStream(length);
+    this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
index 7c19340..0445d25 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -19,11 +19,12 @@
 package org.apache.parquet.column.values.bitpacking;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -36,9 +37,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   private final BytePacker packer;
   private final int[] decoded = new int[VALUES_AT_A_TIME];
   private int decodedPosition = VALUES_AT_A_TIME - 1;
-  private ByteBuffer encoded;
-  private int encodedPos;
-  private int nextOffset;
+  private ByteBufferInputStream in;
 
   public ByteBitPackingValuesReader(int bound, Packer packer) {
     this.bitWidth = BytesUtils.getWidthFromMaxInt(bound);
@@ -49,37 +48,38 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   public int readInteger() {
     ++ decodedPosition;
     if (decodedPosition == decoded.length) {
-      encoded.position(encodedPos);
-      if (encodedPos + bitWidth > encoded.limit()) {
-        // unpack8Values needs at least bitWidth bytes to read from,
-        // We have to fill in 0 byte at the end of encoded bytes.
-        byte[] tempEncode = new byte[bitWidth];
-        encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
-        packer.unpack8Values(tempEncode, 0, decoded, 0);
-      } else {
-        packer.unpack8Values(encoded, encodedPos, decoded, 0);
+      try {
+        if (in.available() < bitWidth) {
+          // unpack8Values needs at least bitWidth bytes to read from,
+          // We have to fill in 0 byte at the end of encoded bytes.
+          byte[] tempEncode = new byte[bitWidth];
+          in.read(tempEncode, 0, in.available());
+          packer.unpack8Values(tempEncode, 0, decoded, 0);
+        } else {
+          ByteBuffer encoded = in.slice(bitWidth);
+          packer.unpack8Values(encoded, encoded.position(), decoded, 0);
+        }
+      } catch (IOException e) {
+        throw new ParquetDecodingException("Failed to read packed values", e);
       }
-      encodedPos += bitWidth;
       decodedPosition = 0;
     }
     return decoded[decodedPosition];
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
     int effectiveBitLength = valueCount * bitWidth;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil
-    LOG.debug("reading {} bytes for {} values of size {} bits.", length, valueCount, bitWidth);
-    this.encoded = page;
-    this.encodedPos = offset;
+    LOG.debug("reading {} bytes for {} values of size {} bits.",
+        length, valueCount, bitWidth);
+    // work-around for null values. this will not happen for repetition or
+    // definition levels (never null), but will happen when valueCount has not
+    // been adjusted for null values in the data.
+    length = Math.min(length, stream.available());
+    this.in = stream.sliceStream(length);
     this.decodedPosition = VALUES_AT_A_TIME - 1;
-    this.nextOffset = offset + length;
-  }
-  
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index a3355d2..bf53846 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -18,7 +18,6 @@
  */
 package org.apache.parquet.column.values.delta;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
@@ -28,7 +27,6 @@ import org.apache.parquet.column.values.bitpacking.BytePackerForLong;
 import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 
-import java.io.IOException;
 import java.nio.ByteBuffer;
 
 /**
@@ -43,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesRead;
   private long minDeltaInCurrentBlock;
-  private ByteBuffer page;
+
   /**
    * stores the decoded values including the first value which is written to the header
    */
@@ -54,23 +52,16 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesBuffered;
   private ByteBufferInputStream in;
-  private int nextOffset;
   private DeltaBinaryPackingConfig config;
   private int[] bitWidths;
 
   /**
-   * eagerly load all the data into memory
-   *
-   * @param valueCount count of values in this page
-   * @param page       the array to read from containing the page data (repetition levels, definition levels, data)
-   * @param offset     where to start reading from in the page
-   * @throws IOException
+   * eagerly loads all the data into memory
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
-    in = new ByteBufferInputStream(page, offset, page.limit() - offset);
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+    this.in = stream;
     this.config = DeltaBinaryPackingConfig.readConfig(in);
-    this.page = page;
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
     allocateValuesBuffer();
     bitWidths = new int[config.miniBlockNumInABlock];
@@ -81,14 +72,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
     }
-    this.nextOffset = page.limit() - in.available();
   }
-  
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
-  }
-  
+
   /**
    * the value buffer is allocated so that the size of it is multiple of mini block
    * because when writing, data is flushed on a mini block basis
@@ -123,7 +108,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
     }
   }
 
-  private void loadNewBlockToBuffer() {
+  private void loadNewBlockToBuffer() throws IOException {
     try {
       minDeltaInCurrentBlock = BytesUtils.readZigZagVarLong(in);
     } catch (IOException e) {
@@ -152,19 +137,18 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    *
    * @param packer the packer created from bitwidth of current mini block
    */
-  private void unpackMiniBlock(BytePackerForLong packer) {
+  private void unpackMiniBlock(BytePackerForLong packer) throws IOException {
     for (int j = 0; j < config.miniBlockSizeInValues; j += 8) {
       unpack8Values(packer);
     }
   }
 
-  private void unpack8Values(BytePackerForLong packer) {
-    //calculate the pos because the packer api uses array not stream
-    int pos = page.limit() - in.available();
-    packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
+  private void unpack8Values(BytePackerForLong packer) throws IOException {
+    // get a single buffer of 8 values. most of the time, this won't require a copy
+    // TODO: update the packer to consume from an InputStream
+    ByteBuffer buffer = in.slice(packer.getBitWidth());
+    packer.unpack8Values(buffer, buffer.position(), valuesBuffer, valuesBuffered);
     this.valuesBuffered += 8;
-    //sync the pos in stream
-    in.skip(packer.getBitWidth());
   }
 
   private void readBitWidthsForMiniBlocks() {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index d810ba8..e6ee1fd 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -22,8 +22,10 @@ package org.apache.parquet.column.values.deltalengthbytearray;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
+import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,34 +40,38 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
 
   private static final Logger LOG = LoggerFactory.getLogger(DeltaLengthByteArrayValuesReader.class);
   private ValuesReader lengthReader;
-  private ByteBuffer in;
-  private int offset;
+  private ByteBufferInputStream in;
 
   public DeltaLengthByteArrayValuesReader() {
     this.lengthReader = new DeltaBinaryPackingValuesReader();
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
-    LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
-    lengthReader.initFromPage(valueCount, in, offset);
-    offset = lengthReader.getNextOffset();
-    this.in = in;
-    this.offset = offset;
+    LOG.debug("init from page at offset {} for length {}",
+        stream.position(), stream.available());
+    lengthReader.initFromPage(valueCount, stream);
+    this.in = stream.remainingStream();
   }
 
   @Override
   public Binary readBytes() {
     int length = lengthReader.readInteger();
-    int start = offset;
-    offset = start + length;
-    return Binary.fromConstantByteBuffer(in, start, length);
+    try {
+      return Binary.fromConstantByteBuffer(in.slice(length));
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to read " + length + " bytes");
+    }
   }
 
   @Override
   public void skip() {
     int length = lengthReader.readInteger();
-    offset = offset + length;
+    try {
+      in.skipFully(length);
+    } catch (IOException e) {
+      throw new ParquetDecodingException("Failed to skip " + length + " bytes");
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index 742b515..7a01627 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -21,6 +21,8 @@ package org.apache.parquet.column.values.deltastrings;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -46,13 +48,12 @@ public class DeltaByteArrayReader extends ValuesReader implements RequiresPrevio
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
-    prefixLengthReader.initFromPage(valueCount, page, offset);
-    int next = prefixLengthReader.getNextOffset();
-    suffixReader.initFromPage(valueCount, page, next);	
+    prefixLengthReader.initFromPage(valueCount, stream);
+    suffixReader.initFromPage(valueCount, stream);
   }
-  
+
   @Override
   public void skip() {
     // read the next value to skip so that previous is correct.

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
index 19ff47c..87edda6 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -52,11 +52,12 @@ public class DictionaryValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer page, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
-    this.in = new ByteBufferInputStream(page, offset, page.limit() - offset);
-    if (page.limit() - offset > 0) {
-      LOG.debug("init from page at offset {} for length {}", offset, (page.limit() - offset));
+    this.in = stream.remainingStream();
+    if (in.available() > 0) {
+      LOG.debug("init from page at offset {} for length {}",
+          stream.position(), stream.available());
       int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
       LOG.debug("bit width {}", bitWidth);
       decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
index 0fa6cc6..0b8beb2 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/PlainValuesDictionary.java
@@ -26,6 +26,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 
 import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.Dictionary;
 import org.apache.parquet.column.page.DictionaryPage;
 import org.apache.parquet.column.values.plain.PlainValuesReader.DoublePlainValuesReader;
@@ -150,10 +151,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainLongDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+      ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
       longDictionaryContent = new long[dictionaryPage.getDictionarySize()];
       LongPlainValuesReader longReader = new LongPlainValuesReader();
-      longReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
+      longReader.initFromPage(dictionaryPage.getDictionarySize(), in);
       for (int i = 0; i < longDictionaryContent.length; i++) {
         longDictionaryContent[i] = longReader.readLong();
       }
@@ -193,10 +194,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainDoubleDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+      ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
       doubleDictionaryContent = new double[dictionaryPage.getDictionarySize()];
       DoublePlainValuesReader doubleReader = new DoublePlainValuesReader();
-      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0);
+      doubleReader.initFromPage(dictionaryPage.getDictionarySize(), in);
       for (int i = 0; i < doubleDictionaryContent.length; i++) {
         doubleDictionaryContent[i] = doubleReader.readDouble();
       }
@@ -236,10 +237,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainIntegerDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+      ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
       intDictionaryContent = new int[dictionaryPage.getDictionarySize()];
       IntegerPlainValuesReader intReader = new IntegerPlainValuesReader();
-      intReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, 0);
+      intReader.initFromPage(dictionaryPage.getDictionarySize(), in);
       for (int i = 0; i < intDictionaryContent.length; i++) {
         intDictionaryContent[i] = intReader.readInteger();
       }
@@ -279,10 +280,10 @@ public abstract class PlainValuesDictionary extends Dictionary {
      */
     public PlainFloatDictionary(DictionaryPage dictionaryPage) throws IOException {
       super(dictionaryPage);
-      final ByteBuffer dictionaryByteBuf = dictionaryPage.getBytes().toByteBuffer();
+      ByteBufferInputStream in = dictionaryPage.getBytes().toInputStream();
       floatDictionaryContent = new float[dictionaryPage.getDictionarySize()];
       FloatPlainValuesReader floatReader = new FloatPlainValuesReader();
-      floatReader.initFromPage(dictionaryPage.getDictionarySize(), dictionaryByteBuf, dictionaryByteBuf.position());
+      floatReader.initFromPage(dictionaryPage.getDictionarySize(), in);
       for (int i = 0; i < floatDictionaryContent.length; i++) {
         floatDictionaryContent[i] = floatReader.readFloat();
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
index 82e5551..6411325 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BinaryPlainValuesReader.java
@@ -20,8 +20,8 @@ package org.apache.parquet.column.values.plain;
 
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
@@ -31,40 +31,37 @@ import org.slf4j.LoggerFactory;
 
 public class BinaryPlainValuesReader extends ValuesReader {
   private static final Logger LOG = LoggerFactory.getLogger(BinaryPlainValuesReader.class);
-  private ByteBuffer in;
-  private int offset;
+  private ByteBufferInputStream in;
 
   @Override
   public Binary readBytes() {
     try {
-      int length = BytesUtils.readIntLittleEndian(in, offset);
-      int start = offset + 4;
-      offset = start + length;
-      return Binary.fromConstantByteBuffer(in, start, length);
+      int length = BytesUtils.readIntLittleEndian(in);
+      return Binary.fromConstantByteBuffer(in.slice(length));
     } catch (IOException e) {
-      throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+      throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
     } catch (RuntimeException e) {
-      throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+      throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
     }
   }
 
   @Override
   public void skip() {
     try {
-      int length = BytesUtils.readIntLittleEndian(in, offset);
-      offset += 4 + length;
+      int length = BytesUtils.readIntLittleEndian(in);
+      in.skipFully(length);
     } catch (IOException e) {
-      throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+      throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
     } catch (RuntimeException e) {
-      throw new ParquetDecodingException("could not skip bytes at offset " + offset, e);
+      throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
     }
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
-    LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
-    this.in = in;
-    this.offset = offset;
+    LOG.debug("init from page at offset {} for length {}",
+        stream.position(), stream.available());
+    this.in = stream.remainingStream();
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
index 1f8fc2c..3296daa 100755
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/BooleanPlainValuesReader.java
@@ -21,8 +21,8 @@ package org.apache.parquet.column.values.plain;
 import static org.apache.parquet.column.values.bitpacking.Packer.LITTLE_ENDIAN;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.bitpacking.ByteBitPackingValuesReader;
 import org.slf4j.Logger;
@@ -60,17 +60,11 @@ public class BooleanPlainValuesReader extends ValuesReader {
 
   /**
    * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int valueCount, ByteBuffer page, int offset)
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBufferInputStream)
    */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
-    LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
-    this.in.initFromPage(valueCount, in, offset);
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+    LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
+    this.in.initFromPage(valueCount, stream);
   }
-  
-  @Override
-  public int getNextOffset() {
-    return this.in.getNextOffset();
-  }
-
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
index 7a14f81..7738de7 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/FixedLenByteArrayPlainValuesReader.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
@@ -33,9 +34,9 @@ import org.slf4j.LoggerFactory;
  */
 public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
   private static final Logger LOG = LoggerFactory.getLogger(FixedLenByteArrayPlainValuesReader.class);
-  private ByteBuffer in;
-  private int offset;
-  private int length;
+
+  private final int length;
+  private ByteBufferInputStream in;
 
   public FixedLenByteArrayPlainValuesReader(int length) {
     this.length = length;
@@ -44,24 +45,26 @@ public class FixedLenByteArrayPlainValuesReader extends ValuesReader {
   @Override
   public Binary readBytes() {
     try {
-      int start = offset;
-      offset = start + length;
-      return Binary.fromConstantByteBuffer(in, start, length);
-    } catch (RuntimeException e) {
-      throw new ParquetDecodingException("could not read bytes at offset " + offset, e);
+      return Binary.fromConstantByteBuffer(in.slice(length));
+    } catch (IOException | RuntimeException e) {
+      throw new ParquetDecodingException("could not read bytes at offset " + in.position(), e);
     }
   }
 
   @Override
   public void skip() {
-    offset += length;
+    try {
+      in.skipFully(length);
+    } catch (IOException | RuntimeException e) {
+      throw new ParquetDecodingException("could not skip bytes at offset " + in.position(), e);
+    }
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset)
+  public void initFromPage(int valueCount, ByteBufferInputStream stream)
       throws IOException {
-    LOG.debug("init from page at offset {} for length {}", offset, (in.limit() - offset));
-    this.in = in;
-    this.offset = offset;
+    LOG.debug("init from page at offset {} for length {}",
+        stream.position(), stream.available());
+    this.in = stream.remainingStream();
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
index e79cbb2..726f611 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/plain/PlainValuesReader.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.column.values.plain;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.LittleEndianDataInputStream;
@@ -39,18 +38,10 @@ abstract public class PlainValuesReader extends ValuesReader {
 
   protected LittleEndianDataInputStream in;
 
-  /**
-   * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
-   */
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
-    LOG.debug("init from page at offset {} for length {}", offset , (in.limit() - offset));
-    this.in = new LittleEndianDataInputStream(toInputStream(in, offset));
-  }
-
-  private ByteBufferInputStream toInputStream(ByteBuffer in, int offset) {
-    return new ByteBufferInputStream(in.duplicate(), offset, in.limit() - offset);
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
+    LOG.debug("init from page at offset {} for length {}", stream.position(), stream.available());
+    this.in = new LittleEndianDataInputStream(stream.remainingStream());
   }
 
   public static class DoublePlainValuesReader extends PlainValuesReader {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
index 6daa349..d682a98 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridDecoder.java
@@ -22,9 +22,7 @@ package org.apache.parquet.column.values.rle;
 import java.io.DataInputStream;
 import java.io.IOException;
 import java.io.InputStream;
-import java.nio.ByteBuffer;
 
-import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.Preconditions;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.bitpacking.BytePacker;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
index 4ccf2b8..ebfa76d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridValuesReader.java
@@ -19,7 +19,6 @@
 package org.apache.parquet.column.values.rle;
 
 import java.io.IOException;
-import java.nio.ByteBuffer;
 
 import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
@@ -35,26 +34,16 @@ import org.apache.parquet.io.ParquetDecodingException;
 public class RunLengthBitPackingHybridValuesReader extends ValuesReader {
   private final int bitWidth;
   private RunLengthBitPackingHybridDecoder decoder;
-  private int nextOffset;
 
   public RunLengthBitPackingHybridValuesReader(int bitWidth) {
     this.bitWidth = bitWidth;
   }
 
   @Override
-  public void initFromPage(int valueCountL, ByteBuffer page, int offset) throws IOException {
-    ByteBufferInputStream in = new ByteBufferInputStream(page, offset, page.limit() - offset);
-    int length = BytesUtils.readIntLittleEndian(in);
-
-    decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
-
-    // 4 is for the length which is stored as 4 bytes little endian
-    this.nextOffset = offset + length + 4;
-  }
-  
-  @Override
-  public int getNextOffset() {
-    return this.nextOffset;
+  public void initFromPage(int valueCountL, ByteBufferInputStream stream) throws IOException {
+    int length = BytesUtils.readIntLittleEndian(stream);
+    this.decoder = new RunLengthBitPackingHybridDecoder(
+        bitWidth, stream.sliceStream(length));
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
index f8ff8d0..fe00de9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/rle/ZeroIntegerValuesReader.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.rle;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 
 /**
@@ -30,20 +31,12 @@ import org.apache.parquet.column.values.ValuesReader;
  */
 public class ZeroIntegerValuesReader extends ValuesReader {
   
-  private int nextOffset;
-
   public int readInteger() {
     return 0;
   }
 
   @Override
-  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
-    this.nextOffset = offset;
-  }
-  
-  @Override
-  public int getNextOffset() {
-    return nextOffset;
+  public void initFromPage(int valueCount, ByteBufferInputStream stream) throws IOException {
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
index 1f39d95..5bcbb88 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/impl/TestCorruptDeltaByteArrays.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.impl;
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.SemanticVersion;
 import org.apache.parquet.VersionParser.ParsedVersion;
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Encoding;
@@ -100,13 +101,13 @@ public class TestCorruptDeltaByteArrays {
     ByteBuffer corruptPageBytes = writer.getBytes().toByteBuffer();
 
     DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
-    firstPageReader.initFromPage(10, firstPageBytes, 0);
+    firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes));
     for (int i = 0; i < 10; i += 1) {
-      assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
+      assertEquals(str(i), firstPageReader.readBytes().toStringUsingUTF8());
     }
 
     DeltaByteArrayReader corruptPageReader = new DeltaByteArrayReader();
-    corruptPageReader.initFromPage(10, corruptPageBytes, 0);
+    corruptPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes));
     try {
       corruptPageReader.readBytes();
       fail("Corrupt page did not throw an exception when read");
@@ -115,7 +116,7 @@ public class TestCorruptDeltaByteArrays {
     }
 
     DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
-    secondPageReader.initFromPage(10, corruptPageBytes, 0);
+    secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(corruptPageBytes));
     secondPageReader.setPreviousReader(firstPageReader);
 
     for (int i = 10; i < 20; i += 1) {
@@ -140,13 +141,13 @@ public class TestCorruptDeltaByteArrays {
     ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
 
     DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
-    firstPageReader.initFromPage(10, firstPageBytes, 0);
+    firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes));
     for (int i = 0; i < 10; i += 1) {
       assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
     }
 
     DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
-    secondPageReader.initFromPage(10, secondPageBytes, 0);
+    secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes));
     secondPageReader.setPreviousReader(firstPageReader);
 
     for (int i = 10; i < 20; i += 1) {
@@ -171,13 +172,13 @@ public class TestCorruptDeltaByteArrays {
     ByteBuffer secondPageBytes = writer.getBytes().toByteBuffer();
 
     DeltaByteArrayReader firstPageReader = new DeltaByteArrayReader();
-    firstPageReader.initFromPage(10, firstPageBytes, 0);
+    firstPageReader.initFromPage(10, ByteBufferInputStream.wrap(firstPageBytes));
     for (int i = 0; i < 10; i += 1) {
       assertEquals(firstPageReader.readBytes().toStringUsingUTF8(), str(i));
     }
 
     DeltaByteArrayReader secondPageReader = new DeltaByteArrayReader();
-    secondPageReader.initFromPage(10, secondPageBytes, 0);
+    secondPageReader.initFromPage(10, ByteBufferInputStream.wrap(secondPageBytes));
 
     for (int i = 10; i < 20; i += 1) {
       assertEquals(secondPageReader.readBytes().toStringUsingUTF8(), str(i));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
index 8caad2b..248e039 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/Utils.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.io.api.Binary;
 
 /**
@@ -59,33 +60,23 @@ public class Utils {
     }
   }
 
-  public static Binary[] readData(ValuesReader reader, byte[] data, int offset, int length)
+  public static Binary[] readData(ValuesReader reader, ByteBufferInputStream stream, int length)
       throws IOException {
     Binary[] bins = new Binary[length];
-    reader.initFromPage(length, ByteBuffer.wrap(data), 0);
+    reader.initFromPage(length, stream);
     for(int i=0; i < length; i++) {
       bins[i] = reader.readBytes();
     }
     return bins;
   }
-  
-  public static Binary[] readData(ValuesReader reader, byte[] data, int length)
-      throws IOException {
-    return readData(reader, data, 0, length);
-  }
-  
-  public static int[] readInts(ValuesReader reader, byte[] data, int offset, int length)
+
+  public static int[] readInts(ValuesReader reader, ByteBufferInputStream stream, int length)
       throws IOException {
     int[] ints = new int[length];
-    reader.initFromPage(length, ByteBuffer.wrap(data), offset);
+    reader.initFromPage(length, stream);
     for(int i=0; i < length; i++) {
       ints[i] = reader.readInteger();
     }
     return ints;
   }
-  
-  public static int[] readInts(ValuesReader reader, byte[] data, int length)
-      throws IOException {
-    return readInts(reader, data, 0, length);
-  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
index 2733b72..656623c 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/BitPackingPerfTest.java
@@ -22,6 +22,7 @@ import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.bitpacking.BitPacking.BitPackingWriter;
 
@@ -88,7 +89,7 @@ public class BitPackingPerfTest {
     System.out.print(" no gc <");
     for (int k = 0; k < N; k++) {
       long t2 = System.nanoTime();
-      r.initFromPage(result.length, ByteBuffer.wrap(bytes), 0);
+      r.initFromPage(result.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)));
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();
       }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
index d83628a..867af28 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/bitpacking/TestBitPackingColumn.java
@@ -25,6 +25,7 @@ import static org.apache.parquet.column.values.bitpacking.Packer.BIG_ENDIAN;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Test;
 
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
@@ -175,7 +176,7 @@ public class TestBitPackingColumn {
       LOG.debug("bytes: {}", TestBitPacking.toString(bytes));
       assertEquals(type.toString(), expected, TestBitPacking.toString(bytes));
       ValuesReader r = type.getReader(bound);
-      r.initFromPage(vals.length, ByteBuffer.wrap(bytes), 0);
+      r.initFromPage(vals.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(bytes)));
       int[] result = new int[vals.length];
       for (int i = 0; i < result.length; i++) {
         result[i] = r.readInteger();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
index a3bec4a..ff4a308 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForIntegerTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -143,7 +144,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
   }
 
   @Test
-  public void shouldReturnCorrectOffsetAfterInitialization() throws IOException {
+  public void shouldConsumePageDataInInitialization() throws IOException {
     int[] data = new int[2 * blockSize + 3];
     for (int i = 0; i < data.length; i++) {
       data[i] = i * 32;
@@ -157,12 +158,14 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
     int contentOffsetInPage = 33;
     System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
 
-    //offset should be correct
-    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
-    int offset= reader.getNextOffset();
+    // offset should be correct
+    ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent));
+    stream.skipFully(contentOffsetInPage);
+    reader.initFromPage(100, stream);
+    long offset = stream.position();
     assertEquals(valueContent.length + contentOffsetInPage, offset);
 
-    //should be able to read data correclty
+    // should be able to read data correctly
     for (int i : data) {
       assertEquals(i, reader.readInteger());
     }
@@ -191,7 +194,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
     }
     writeData(data);
     reader = new DeltaBinaryPackingValuesReader();
-    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, writer.getBytes().toInputStream());
     for (int i = 0; i < data.length; i++) {
       if (i % 3 == 0) {
         reader.skip();
@@ -247,7 +250,7 @@ public class DeltaBinaryPackingValuesWriterForIntegerTest {
         + blockFlushed * miniBlockNum //bitWidth of mini blocks
         + (5.0 * blockFlushed);//min delta for each block
     assertTrue(estimatedSize >= page.length);
-    reader.initFromPage(100, ByteBuffer.wrap(page), 0);
+    reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page)));
 
     for (int i = 0; i < length; i++) {
       assertEquals(data[i], reader.readInteger());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
index 34e1800..795a591 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriterForLongTest.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Before;
 import org.junit.Test;
 
@@ -157,12 +158,14 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
     int contentOffsetInPage = 33;
     System.arraycopy(valueContent, 0, pageContent, contentOffsetInPage, valueContent.length);
 
-    //offset should be correct
-    reader.initFromPage(100, ByteBuffer.wrap(pageContent), contentOffsetInPage);
-    int offset = reader.getNextOffset();
+    // offset should be correct
+    ByteBufferInputStream stream = ByteBufferInputStream.wrap(ByteBuffer.wrap(pageContent));
+    stream.skipFully(contentOffsetInPage);
+    reader.initFromPage(100, stream);
+    long offset = stream.position();
     assertEquals(valueContent.length + contentOffsetInPage, offset);
 
-    //should be able to read data correclty
+    // should be able to read data correctly
     for (long i : data) {
       assertEquals(i, reader.readLong());
     }
@@ -190,7 +193,7 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
     }
     writeData(data);
     reader = new DeltaBinaryPackingValuesReader();
-    reader.initFromPage(100, writer.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, writer.getBytes().toInputStream());
     for (int i = 0; i < data.length; i++) {
       if (i % 3 == 0) {
         reader.skip();
@@ -244,7 +247,7 @@ public class DeltaBinaryPackingValuesWriterForLongTest {
         + blockFlushed * miniBlockNum //bitWidth of mini blocks
         + (10.0 * blockFlushed);//min delta for each block
     assertTrue(estimatedSize >= page.length);
-    reader.initFromPage(100, page, 0);
+    reader.initFromPage(100, ByteBufferInputStream.wrap(ByteBuffer.wrap(page)));
 
     for (int i = 0; i < length; i++) {
       assertEquals(data[i], reader.readLong());

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
index 488208c..ba5d771 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/delta/benchmark/BenchmarkReadingRandomIntegers.java
@@ -22,6 +22,7 @@ import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Random;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.ValuesWriter;
@@ -91,7 +92,7 @@ public class BenchmarkReadingRandomIntegers {
   }
 
   private void readData(ValuesReader reader, byte[] deltaBytes) throws IOException {
-    reader.initFromPage(data.length, ByteBuffer.wrap(deltaBytes), 0);
+    reader.initFromPage(data.length, ByteBufferInputStream.wrap(ByteBuffer.wrap(deltaBytes)));
     for (int i = 0; i < data.length; i++) {
       reader.readInteger();
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
index d7ebee5..d214a88 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/TestDeltaLengthByteArray.java
@@ -43,7 +43,7 @@ public class TestDeltaLengthByteArray {
     DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
     
     Utils.writeData(writer, values);
-    Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
+    Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length);
 
     for(int i =0; i< bin.length ; i++) {
       Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
@@ -57,7 +57,7 @@ public class TestDeltaLengthByteArray {
 
     String[] values = Utils.getRandomStringSamples(1000, 32);
     Utils.writeData(writer, values);
-    Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), values.length);
+    Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), values.length);
 
     for(int i =0; i< bin.length ; i++) {
       Assert.assertEquals(Binary.fromString(values[i]), bin[i]);
@@ -70,7 +70,7 @@ public class TestDeltaLengthByteArray {
     ValuesReader reader = new DeltaBinaryPackingValuesReader();
 
     Utils.writeData(writer, values);
-    int[] bin = Utils.readInts(reader, writer.getBytes().toByteArray(), values.length);
+    int[] bin = Utils.readInts(reader, writer.getBytes().toInputStream(), values.length);
 
     for(int i =0; i< bin.length ; i++) {
       Assert.assertEquals(values[i].length(), bin[i]);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
index 69c5e15..08d04e6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltalengthbytearray/benchmark/BenchmarkDeltaLengthByteArray.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.deltalengthbytearray.benchmark;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -52,9 +53,9 @@ public class BenchmarkDeltaLengthByteArray {
     BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
 
     Utils.writeData(writer, values);
-    byte [] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     Binary[] bin = Utils.readData(reader, data, values.length);
-    System.out.println("size " + data.length);
+    System.out.println("size " + data.position());
   }
 
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -64,9 +65,9 @@ public class BenchmarkDeltaLengthByteArray {
     DeltaLengthByteArrayValuesReader reader = new DeltaLengthByteArrayValuesReader();
 
     Utils.writeData(writer, values);
-    byte [] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     Binary[] bin = Utils.readData(reader, data, values.length);
-    System.out.println("size " + data.length);
+    System.out.println("size " + data.position());
   }
 
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
index 4f8f40c..c13a3a2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltastrings;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Test;
 import org.junit.Assert;
 
@@ -63,7 +64,7 @@ public class TestDeltaByteArray {
     ValuesReader reader = new DeltaBinaryPackingValuesReader();
 
     Utils.writeData(writer, values);
-    byte[] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     int[] bin = Utils.readInts(reader, data, values.length);
 
     // test prefix lengths
@@ -71,9 +72,8 @@ public class TestDeltaByteArray {
     Assert.assertEquals(7, bin[1]);
     Assert.assertEquals(7, bin[2]);
 
-    int offset = reader.getNextOffset();
     reader = new DeltaBinaryPackingValuesReader();
-    bin = Utils.readInts(reader, writer.getBytes().toByteArray(), offset, values.length);
+    bin = Utils.readInts(reader, data, values.length);
     // test suffix lengths
     Assert.assertEquals(10, bin[0]);
     Assert.assertEquals(0, bin[1]);
@@ -82,7 +82,7 @@ public class TestDeltaByteArray {
 
   private void assertReadWrite(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception {
     Utils.writeData(writer, vals);
-    Binary[] bin = Utils.readData(reader, writer.getBytes().toByteArray(), vals.length);
+    Binary[] bin = Utils.readData(reader, writer.getBytes().toInputStream(), vals.length);
 
     for(int i = 0; i< bin.length ; i++) {
       Assert.assertEquals(Binary.fromString(vals[i]), bin[i]);
@@ -92,7 +92,7 @@ public class TestDeltaByteArray {
   private void assertReadWriteWithSkip(DeltaByteArrayWriter writer, DeltaByteArrayReader reader, String[] vals) throws Exception {
     Utils.writeData(writer, vals);
 
-    reader.initFromPage(vals.length, writer.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(vals.length, writer.getBytes().toInputStream());
     for (int i = 0; i < vals.length; i += 2) {
       Assert.assertEquals(Binary.fromString(vals[i]), reader.readBytes());
       reader.skip();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
index eac4bd2..53578f0 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltastrings.benchmark;
 import java.io.IOException;
 import java.util.Arrays;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Rule;
 import org.junit.Test;
 
@@ -59,9 +60,9 @@ public class BenchmarkDeltaByteArray {
     BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
 
     Utils.writeData(writer, values);
-    byte [] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     Binary[] bin = Utils.readData(reader, data, values.length);
-    System.out.println("size " + data.length);
+    System.out.println("size " + data.position());
   }
 
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -71,9 +72,9 @@ public class BenchmarkDeltaByteArray {
     DeltaByteArrayReader reader = new DeltaByteArrayReader();
 
     Utils.writeData(writer, values);
-    byte [] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     Binary[] bin = Utils.readData(reader, data, values.length);
-    System.out.println("size " + data.length);
+    System.out.println("size " + data.position());
   }
 
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -83,9 +84,9 @@ public class BenchmarkDeltaByteArray {
     BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
 
     Utils.writeData(writer, sortedVals);
-    byte [] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     Binary[] bin = Utils.readData(reader, data, values.length);
-    System.out.println("size " + data.length);
+    System.out.println("size " + data.position());
   }
 
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
@@ -95,8 +96,8 @@ public class BenchmarkDeltaByteArray {
     DeltaByteArrayReader reader = new DeltaByteArrayReader();
 
     Utils.writeData(writer, sortedVals);
-    byte [] data = writer.getBytes().toByteArray();
+    ByteBufferInputStream data = writer.getBytes().toInputStream();
     Binary[] bin = Utils.readData(reader, data, values.length);
-    System.out.println("size " + data.length);
+    System.out.println("size " + data.position());
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index ada1c93..cf66982 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -30,6 +30,7 @@ import java.io.IOException;
 import java.io.UnsupportedEncodingException;
 import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.junit.Assert;
 import org.junit.Test;
 
@@ -118,7 +119,7 @@ public class TestDictionary {
 
     //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new BinaryPlainValuesReader();
-    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, cw.getBytes().toInputStream());
 
     for (long i = 0; i < 100; i++) {
       assertEquals(Binary.fromString("str" + i), reader.readBytes());
@@ -204,13 +205,13 @@ public class TestDictionary {
 
     DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64);
 
-    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+    cr.initFromPage(COUNT, bytes1.toInputStream());
     for (long i = 0; i < COUNT; i++) {
       long back = cr.readLong();
       assertEquals(i % 50, back);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+    cr.initFromPage(COUNT2, bytes2.toInputStream());
     for (long i = COUNT2; i > 0; i--) {
       long back = cr.readLong();
       assertEquals(i % 50, back);
@@ -228,7 +229,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, cw.getBytes().toInputStream());
 
     for (long i = 0; i < 100; i++) {
       assertEquals(i, reader.readLong());
@@ -274,13 +275,13 @@ public class TestDictionary {
 
     final DictionaryValuesReader cr = initDicReader(cw, DOUBLE);
 
-    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+    cr.initFromPage(COUNT, bytes1.toInputStream());
     for (double i = 0; i < COUNT; i++) {
       double back = cr.readDouble();
       assertEquals(i % 50, back, 0.0);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+    cr.initFromPage(COUNT2, bytes2.toInputStream());
     for (double i = COUNT2; i > 0; i--) {
       double back = cr.readDouble();
       assertEquals(i % 50, back, 0.0);
@@ -299,7 +300,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, cw.getBytes().toInputStream());
 
     for (double i = 0; i < 100; i++) {
       assertEquals(i, reader.readDouble(), 0.00001);
@@ -345,13 +346,13 @@ public class TestDictionary {
 
     DictionaryValuesReader cr = initDicReader(cw, INT32);
 
-    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+    cr.initFromPage(COUNT, bytes1.toInputStream());
     for (int i = 0; i < COUNT; i++) {
       int back = cr.readInteger();
       assertEquals(i % 50, back);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+    cr.initFromPage(COUNT2, bytes2.toInputStream());
     for (int i = COUNT2; i > 0; i--) {
       int back = cr.readInteger();
       assertEquals(i % 50, back);
@@ -370,7 +371,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, cw.getBytes().toInputStream());
 
     for (int i = 0; i < 100; i++) {
       assertEquals(i, reader.readInteger());
@@ -416,13 +417,13 @@ public class TestDictionary {
 
     DictionaryValuesReader cr = initDicReader(cw, FLOAT);
 
-    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
+    cr.initFromPage(COUNT, bytes1.toInputStream());
     for (float i = 0; i < COUNT; i++) {
       float back = cr.readFloat();
       assertEquals(i % 50, back, 0.0f);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
+    cr.initFromPage(COUNT2, bytes2.toInputStream());
     for (float i = COUNT2; i > 0; i--) {
       float back = cr.readFloat();
       assertEquals(i % 50, back, 0.0f);
@@ -441,7 +442,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
+    reader.initFromPage(100, cw.getBytes().toInputStream());
 
     for (float i = 0; i < 100; i++) {
       assertEquals(i, reader.readFloat(), 0.00001);
@@ -476,8 +477,9 @@ public class TestDictionary {
 
     // pretend there are 100 nulls. what matters is offset = bytes.length.
     ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0x00, 0x01, 0x02, 0x03}); // data doesn't matter
-    int offset = bytes.remaining();
-    reader.initFromPage(100, bytes, offset);
+    ByteBufferInputStream stream = ByteBufferInputStream.wrap(bytes);
+    stream.skipFully(stream.available());
+    reader.initFromPage(100, stream);
   }
 
   private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type)
@@ -490,14 +492,14 @@ public class TestDictionary {
   }
 
   private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
-    cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
+    cr.initFromPage(COUNT, bytes.toInputStream());
     for (int i = 0; i < COUNT; i++) {
       Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8());
     }
   }
 
   private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
-    cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
+    cr.initFromPage(COUNT, bytes.toInputStream());
     for (int i = 0; i < COUNT; i++) {
       Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8());
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 712fb27..173d8fa 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -72,7 +72,7 @@ public class RunLengthBitPackingHybridIntegrationTest {
     numValues += 1000;
 
     ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
-    ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes);
+    ByteBufferInputStream in = ByteBufferInputStream.wrap(encodedBytes);
 
     RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 5696d7b..dd329c0 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -21,14 +21,12 @@ package org.apache.parquet.column.values.rle;
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayInputStream;
-import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.junit.Test;
 
-import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.bitpacking.BytePacker;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/8bbc6cb9/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
index 5b3b853..fc92b6b 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
@@ -16,67 +16,57 @@
  * specific language governing permissions and limitations
  * under the License.
  */
+
 package org.apache.parquet.bytes;
 
+import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.List;
 
-/**
- * This ByteBufferInputStream does not consume the ByteBuffer being passed in, 
- * but will create a slice of the current buffer.
- */
-public class ByteBufferInputStream extends InputStream {
-	
-  protected ByteBuffer byteBuf;
-  protected int initPos;
-  protected int count;
-  public ByteBufferInputStream(ByteBuffer buffer) {
-    this(buffer, buffer.position(), buffer.remaining());
-  }
-  
-  public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
-    ByteBuffer temp = buffer.duplicate();
-    temp.position(offset);
-    byteBuf = temp.slice();
-    byteBuf.limit(count);
-    this.initPos = offset;
-    this.count = count;
-  }
-  
-  public ByteBuffer toByteBuffer() {
-    return byteBuf.slice();
+public abstract class ByteBufferInputStream extends InputStream {
+
+  public static ByteBufferInputStream wrap(ByteBuffer... buffers) {
+    if (buffers.length == 1) {
+      return new SingleBufferInputStream(buffers[0]);
+    } else {
+      return new MultiBufferInputStream(Arrays.asList(buffers));
+    }
   }
-  
-  @Override
-  public int read() throws IOException {
-    if (!byteBuf.hasRemaining()) {
-    	return -1;
+
+  public static ByteBufferInputStream wrap(List<ByteBuffer> buffers) {
+    if (buffers.size() == 1) {
+      return new SingleBufferInputStream(buffers.get(0));
+    } else {
+      return new MultiBufferInputStream(buffers);
     }
-    //Workaround for unsigned byte
-    return byteBuf.get() & 0xFF;
   }
 
-  @Override
-  public int read(byte[] bytes, int offset, int length) throws IOException {
-    int count = Math.min(byteBuf.remaining(), length);
-    if (count == 0) return -1;
-    byteBuf.get(bytes, offset, count);
-    return count;
+  public abstract long position();
+
+  public void skipFully(long n) throws IOException {
+    long skipped = skip(n);
+    if (skipped < n) {
+      throw new EOFException(
+          "Not enough bytes to skip: " + skipped + " < " + n);
+    }
   }
-  
-  @Override
-  public long skip(long n) {
-	  if (n > byteBuf.remaining())
-	    n = byteBuf.remaining();
-	  int pos = byteBuf.position();
-	  byteBuf.position((int)(pos + n));
-	  return n;
+
+  public abstract int read(ByteBuffer out);
+
+  public abstract ByteBuffer slice(int length) throws EOFException;
+
+  public abstract List<ByteBuffer> sliceBuffers(long length) throws EOFException;
+
+  public ByteBufferInputStream sliceStream(long length) throws EOFException {
+    return ByteBufferInputStream.wrap(sliceBuffers(length));
   }
 
+  public abstract List<ByteBuffer> remainingBuffers();
 
-  @Override
-  public int available() {
-    return byteBuf.remaining();
+  public ByteBufferInputStream remainingStream() {
+    return ByteBufferInputStream.wrap(remainingBuffers());
   }
 }