Posted to commits@parquet.apache.org by ju...@apache.org on 2015/11/04 18:57:34 UTC

[2/4] parquet-mr git commit: PARQUET-77: ByteBuffer use in read and write paths

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
index b62ef84..770f4dc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/TestDeltaByteArray.java
@@ -23,6 +23,7 @@ import java.io.IOException;
 import org.junit.Test;
 import org.junit.Assert;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.Utils;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
@@ -35,7 +36,7 @@ public class TestDeltaByteArray {
 
   @Test
   public void testSerialization () throws Exception {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     DeltaByteArrayReader reader = new DeltaByteArrayReader();
 
     assertReadWrite(writer, reader, values);
@@ -43,14 +44,14 @@ public class TestDeltaByteArray {
 
   @Test
   public void testRandomStrings() throws Exception {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     DeltaByteArrayReader reader = new DeltaByteArrayReader();
     assertReadWrite(writer, reader, randvalues);
   }
 
   @Test
   public void testLengths() throws IOException {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     ValuesReader reader = new DeltaBinaryPackingValuesReader();
 
     Utils.writeData(writer, values);
@@ -82,7 +83,7 @@ public class TestDeltaByteArray {
 
   @Test
   public void testWriterReset() throws Exception {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
 
     assertReadWrite(writer, new DeltaByteArrayReader(), values);
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
index c61ef30..eac4bd2 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/deltastrings/benchmark/BenchmarkDeltaByteArray.java
@@ -24,6 +24,7 @@ import java.util.Arrays;
 import org.junit.Rule;
 import org.junit.Test;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.column.values.Utils;
 import org.apache.parquet.column.values.deltastrings.DeltaByteArrayReader;
 import org.apache.parquet.column.values.deltastrings.DeltaByteArrayWriter;
@@ -54,7 +55,7 @@ public class BenchmarkDeltaByteArray {
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
   @Test
   public void benchmarkRandomStringsWithPlainValuesWriter() throws IOException {
-    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
 
     Utils.writeData(writer, values);
@@ -66,7 +67,7 @@ public class BenchmarkDeltaByteArray {
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
   @Test
   public void benchmarkRandomStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     DeltaByteArrayReader reader = new DeltaByteArrayReader();
 
     Utils.writeData(writer, values);
@@ -78,7 +79,7 @@ public class BenchmarkDeltaByteArray {
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
   @Test
   public void benchmarkSortedStringsWithPlainValuesWriter() throws IOException {
-    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024);
+    PlainValuesWriter writer = new PlainValuesWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     BinaryPlainValuesReader reader = new BinaryPlainValuesReader();
 
     Utils.writeData(writer, sortedVals);
@@ -90,7 +91,7 @@ public class BenchmarkDeltaByteArray {
   @BenchmarkOptions(benchmarkRounds = 20, warmupRounds = 4)
   @Test
   public void benchmarkSortedStringsWithDeltaLengthByteArrayValuesWriter() throws IOException {
-    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024);
+    DeltaByteArrayWriter writer = new DeltaByteArrayWriter(64 * 1024, 64 * 1024, new DirectByteBufferAllocator());
     DeltaByteArrayReader reader = new DeltaByteArrayReader();
 
     Utils.writeData(writer, sortedVals);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
index 020868e..ada1c93 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/dictionary/TestDictionary.java
@@ -28,10 +28,12 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 
 import java.io.IOException;
 import java.io.UnsupportedEncodingException;
+import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;
 
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.Dictionary;
@@ -54,27 +56,27 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
 public class TestDictionary {
 
   private <I extends DictionaryValuesWriter> FallbackValuesWriter<I, PlainValuesWriter> plainFallBack(I dvw, int initialSize) {
-    return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5));
+    return FallbackValuesWriter.of(dvw, new PlainValuesWriter(initialSize, initialSize * 5, new DirectByteBufferAllocator()));
   }
 
   private FallbackValuesWriter<PlainBinaryDictionaryValuesWriter, PlainValuesWriter> newPlainBinaryDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-    return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+    return plainFallBack(new PlainBinaryDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
   }
 
   private FallbackValuesWriter<PlainLongDictionaryValuesWriter, PlainValuesWriter> newPlainLongDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-    return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+    return plainFallBack(new PlainLongDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
   }
 
   private FallbackValuesWriter<PlainIntegerDictionaryValuesWriter, PlainValuesWriter> newPlainIntegerDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-    return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+    return plainFallBack(new PlainIntegerDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
   }
 
   private FallbackValuesWriter<PlainDoubleDictionaryValuesWriter, PlainValuesWriter> newPlainDoubleDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-    return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+    return plainFallBack(new PlainDoubleDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
   }
 
   private FallbackValuesWriter<PlainFloatDictionaryValuesWriter, PlainValuesWriter> newPlainFloatDictionaryValuesWriter(int maxDictionaryByteSize, int initialSize) {
-    return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY), initialSize);
+    return plainFallBack(new PlainFloatDictionaryValuesWriter(maxDictionaryByteSize, PLAIN_DICTIONARY, PLAIN_DICTIONARY, new DirectByteBufferAllocator()), initialSize);
   }
 
   @Test
@@ -116,7 +118,7 @@ public class TestDictionary {
 
     //Fallbacked to Plain encoding, therefore use PlainValuesReader to read it back
     ValuesReader reader = new BinaryPlainValuesReader();
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (long i = 0; i < 100; i++) {
       assertEquals(Binary.fromString("str" + i), reader.readBytes());
@@ -202,13 +204,13 @@ public class TestDictionary {
 
     DictionaryValuesReader cr = initDicReader(cw, PrimitiveTypeName.INT64);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (long i = 0; i < COUNT; i++) {
       long back = cr.readLong();
       assertEquals(i % 50, back);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (long i = COUNT2; i > 0; i--) {
       long back = cr.readLong();
       assertEquals(i % 50, back);
@@ -226,7 +228,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (long i = 0; i < 100; i++) {
       assertEquals(i, reader.readLong());
@@ -272,13 +274,13 @@ public class TestDictionary {
 
     final DictionaryValuesReader cr = initDicReader(cw, DOUBLE);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (double i = 0; i < COUNT; i++) {
       double back = cr.readDouble();
       assertEquals(i % 50, back, 0.0);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (double i = COUNT2; i > 0; i--) {
       double back = cr.readDouble();
       assertEquals(i % 50, back, 0.0);
@@ -297,7 +299,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (double i = 0; i < 100; i++) {
       assertEquals(i, reader.readDouble(), 0.00001);
@@ -343,13 +345,13 @@ public class TestDictionary {
 
     DictionaryValuesReader cr = initDicReader(cw, INT32);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (int i = 0; i < COUNT; i++) {
       int back = cr.readInteger();
       assertEquals(i % 50, back);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (int i = COUNT2; i > 0; i--) {
       int back = cr.readInteger();
       assertEquals(i % 50, back);
@@ -368,7 +370,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (int i = 0; i < 100; i++) {
       assertEquals(i, reader.readInteger());
@@ -414,13 +416,13 @@ public class TestDictionary {
 
     DictionaryValuesReader cr = initDicReader(cw, FLOAT);
 
-    cr.initFromPage(COUNT, bytes1.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes1.toByteBuffer(), 0);
     for (float i = 0; i < COUNT; i++) {
       float back = cr.readFloat();
       assertEquals(i % 50, back, 0.0f);
     }
 
-    cr.initFromPage(COUNT2, bytes2.toByteArray(), 0);
+    cr.initFromPage(COUNT2, bytes2.toByteBuffer(), 0);
     for (float i = COUNT2; i > 0; i--) {
       float back = cr.readFloat();
       assertEquals(i % 50, back, 0.0f);
@@ -439,7 +441,7 @@ public class TestDictionary {
       }
     }
 
-    reader.initFromPage(100, cw.getBytes().toByteArray(), 0);
+    reader.initFromPage(100, cw.getBytes().toByteBuffer(), 0);
 
     for (float i = 0; i < 100; i++) {
       assertEquals(i, reader.readFloat(), 0.00001);
@@ -473,14 +475,14 @@ public class TestDictionary {
     DictionaryValuesReader reader = initDicReader(cw, INT32);
 
     // pretend there are 100 nulls. what matters is offset = bytes.length.
-    byte[] bytes = {0x00, 0x01, 0x02, 0x03}; // data doesn't matter
-    int offset = bytes.length;
+    ByteBuffer bytes = ByteBuffer.wrap(new byte[] {0x00, 0x01, 0x02, 0x03}); // data doesn't matter
+    int offset = bytes.remaining();
     reader.initFromPage(100, bytes, offset);
   }
 
   private DictionaryValuesReader initDicReader(ValuesWriter cw, PrimitiveTypeName type)
       throws IOException {
-    final DictionaryPage dictionaryPage = cw.createDictionaryPage().copy();
+    final DictionaryPage dictionaryPage = cw.toDictPageAndClose().copy();
     final ColumnDescriptor descriptor = new ColumnDescriptor(new String[] {"foo"}, type, 0, 0);
     final Dictionary dictionary = PLAIN.initDictionary(descriptor, dictionaryPage);
     final DictionaryValuesReader cr = new DictionaryValuesReader(dictionary);
@@ -488,14 +490,14 @@ public class TestDictionary {
   }
 
   private void checkDistinct(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
-    cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
     for (int i = 0; i < COUNT; i++) {
       Assert.assertEquals(prefix + i, cr.readBytes().toStringUsingUTF8());
     }
   }
 
   private void checkRepeated(int COUNT, BytesInput bytes, ValuesReader cr, String prefix) throws IOException {
-    cr.initFromPage(COUNT, bytes.toByteArray(), 0);
+    cr.initFromPage(COUNT, bytes.toByteBuffer(), 0);
     for (int i = 0; i < COUNT; i++) {
       Assert.assertEquals(prefix + i % 10, cr.readBytes().toStringUsingUTF8());
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
index 707a507..712fb27 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/RunLengthBitPackingHybridIntegrationTest.java
@@ -18,9 +18,11 @@
  */
 package org.apache.parquet.column.values.rle;
 
-import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.junit.Test;
 
 import static org.junit.Assert.assertEquals;
 
@@ -39,7 +41,7 @@ public class RunLengthBitPackingHybridIntegrationTest {
   private void doIntegrationTest(int bitWidth) throws Exception {
     long modValue = 1L << bitWidth;
 
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000);
+    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(bitWidth, 1000, 64000, new DirectByteBufferAllocator());
     int numValues = 0;
 
     for (int i = 0; i < 100; i++) {
@@ -69,8 +71,8 @@ public class RunLengthBitPackingHybridIntegrationTest {
     }
     numValues += 1000;
 
-    byte[] encodedBytes = encoder.toBytes().toByteArray();
-    ByteArrayInputStream in = new ByteArrayInputStream(encodedBytes);
+    ByteBuffer encodedBytes = encoder.toBytes().toByteBuffer();
+    ByteBufferInputStream in = new ByteBufferInputStream(encodedBytes);
 
     RunLengthBitPackingHybridDecoder decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
index 06664de..5696d7b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
+++ b/parquet-column/src/test/java/org/apache/parquet/column/values/rle/TestRunLengthBitPackingHybridEncoder.java
@@ -21,12 +21,15 @@ package org.apache.parquet.column.values.rle;
 import static org.junit.Assert.assertEquals;
 
 import java.io.ByteArrayInputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 
 import org.junit.Test;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
+import org.apache.parquet.bytes.DirectByteBufferAllocator;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.bitpacking.BytePacker;
 import org.apache.parquet.column.values.bitpacking.Packer;
@@ -36,9 +39,19 @@ import org.apache.parquet.column.values.bitpacking.Packer;
  */
 public class TestRunLengthBitPackingHybridEncoder {
 
+  private RunLengthBitPackingHybridEncoder getRunLengthBitPackingHybridEncoder() {
+    return getRunLengthBitPackingHybridEncoder(3, 5, 10);
+  }
+
+  private RunLengthBitPackingHybridEncoder getRunLengthBitPackingHybridEncoder(
+      int bitWidth, int initialCapacity, int pageSize) {
+    return new RunLengthBitPackingHybridEncoder(bitWidth, initialCapacity,
+        pageSize, new DirectByteBufferAllocator());
+  }
+
   @Test
   public void testRLEOnly() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
     for (int i = 0; i < 100; i++) {
       encoder.writeInt(4);
     }
@@ -68,7 +81,7 @@ public class TestRunLengthBitPackingHybridEncoder {
     // make sure that repeated 0s at the beginning
     // of the stream don't trip up the repeat count
 
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
     for (int i = 0; i < 10; i++) {
       encoder.writeInt(0);
     }
@@ -86,7 +99,7 @@ public class TestRunLengthBitPackingHybridEncoder {
 
   @Test
   public void testBitWidthZero() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(0, 5, 10);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(0, 5, 10);
     for (int i = 0; i < 10; i++) {
       encoder.writeInt(0);
     }
@@ -102,8 +115,7 @@ public class TestRunLengthBitPackingHybridEncoder {
 
   @Test
   public void testBitPackingOnly() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
-
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
     for (int i = 0; i < 100; i++) {
       encoder.writeInt(i % 3);
     }
@@ -125,7 +137,7 @@ public class TestRunLengthBitPackingHybridEncoder {
 
   @Test
   public void testBitPackingOverflow() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
 
     for (int i = 0; i < 1000; i++) {
       encoder.writeInt(i % 3);
@@ -157,7 +169,7 @@ public class TestRunLengthBitPackingHybridEncoder {
 
   @Test
   public void testTransitionFromBitPackingToRle() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(3, 5, 10);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder();
 
     // 5 obviously bit-packed values
     encoder.writeInt(0);
@@ -195,7 +207,7 @@ public class TestRunLengthBitPackingHybridEncoder {
 
   @Test
   public void testPaddingZerosOnUnfinishedBitPackedRuns() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(5, 5, 10);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(5, 5, 10);
     for (int i = 0; i < 9; i++) {
       encoder.writeInt(i+1);
     }
@@ -214,7 +226,7 @@ public class TestRunLengthBitPackingHybridEncoder {
 
   @Test
   public void testSwitchingModes() throws Exception {
-    RunLengthBitPackingHybridEncoder encoder = new RunLengthBitPackingHybridEncoder(9, 100, 1000);
+    RunLengthBitPackingHybridEncoder encoder = getRunLengthBitPackingHybridEncoder(9, 100, 1000);
 
     // rle first
     for (int i = 0; i < 25; i++) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
index 3abf804..aff3937 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/PerfTest.java
@@ -27,6 +27,7 @@ import static org.apache.parquet.example.Paper.schema3;
 import java.util.logging.Level;
 
 import org.apache.parquet.Log;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.column.ParquetProperties.WriterVersion;
 import org.apache.parquet.column.impl.ColumnWriteStoreV1;
 import org.apache.parquet.column.page.mem.MemPageStore;
@@ -77,7 +78,7 @@ public class PerfTest {
 
 
   private static void write(MemPageStore memPageStore) {
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 50*1024*1024, 50*1024*1024, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
     MessageColumnIO columnIO = newColumnFactory(schema);
 
     GroupWriter groupWriter = new GroupWriter(columnIO.getRecordWriter(columns), schema);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
index e7274cc..06f22b6 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestColumnIO.java
@@ -38,6 +38,7 @@ import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.junit.Assert;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -526,7 +527,7 @@ public class TestColumnIO {
   }
 
   private ColumnWriteStoreV1 newColumnWriteStore(MemPageStore memPageStore) {
-    return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0);
+    return new ColumnWriteStoreV1(memPageStore, 800, 800, useDictionary, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
   }
 
   @Test
@@ -599,6 +600,8 @@ public class TestColumnIO {
     groupWriter.write(r2);
     recordWriter.flush();
     columns.validate();
+    columns.flush();
+    columns.close();
   }
 }
 final class ValidatingColumnWriteStore implements ColumnWriteStore {
@@ -610,6 +613,11 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore {
   }
 
   @Override
+  public void close() {
+
+  }
+
+  @Override
   public ColumnWriter getColumnWriter(final ColumnDescriptor path) {
     return new ColumnWriter() {
       private void validate(Object value, int repetitionLevel,
@@ -630,6 +638,11 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore {
       }
 
       @Override
+      public void write(float value, int repetitionLevel, int definitionLevel) {
+        validate(value, repetitionLevel, definitionLevel);
+      }
+
+      @Override
       public void write(boolean value, int repetitionLevel, int definitionLevel) {
         validate(value, repetitionLevel, definitionLevel);
       }
@@ -645,8 +658,13 @@ final class ValidatingColumnWriteStore implements ColumnWriteStore {
       }
 
       @Override
-      public void write(float value, int repetitionLevel, int definitionLevel) {
-        validate(value, repetitionLevel, definitionLevel);
+      public void close() {
+
+      }
+
+      @Override
+      public long getBufferedSizeInMemory() {
+        throw new UnsupportedOperationException();
       }
 
       @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
index 9fde4b1..25b629b 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/TestFiltered.java
@@ -21,6 +21,7 @@ package org.apache.parquet.io;
 import java.util.ArrayList;
 import java.util.List;
 
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
 import org.apache.parquet.io.api.RecordConsumer;
 import org.junit.Test;
 
@@ -258,7 +259,7 @@ public class TestFiltered {
 
   private MemPageStore writeTestRecords(MessageColumnIO columnIO, int number) {
     MemPageStore memPageStore = new MemPageStore(number * 2);
-    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0);
+    ColumnWriteStoreV1 columns = new ColumnWriteStoreV1(memPageStore, 800, 800, false, WriterVersion.PARQUET_1_0, new HeapByteBufferAllocator());
 
     RecordConsumer recordWriter = columnIO.getRecordWriter(columns);
     GroupWriter groupWriter = new GroupWriter(recordWriter, schema);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
index bd8a69d..c8444dc 100644
--- a/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
+++ b/parquet-column/src/test/java/org/apache/parquet/io/api/TestBinary.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.io.api;
 
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
 
@@ -143,6 +145,29 @@ public class TestBinary {
   }
 
   @Test
+  public void testEqualityMethods() throws Exception {
+    Binary bin1 = Binary.fromConstantByteArray("alice".getBytes(), 1, 3);
+    Binary bin2 = Binary.fromConstantByteBuffer(ByteBuffer.wrap("alice".getBytes(), 1, 3));
+    assertEquals(bin1, bin2);
+  }
+
+  @Test
+  public void testWriteAllTo() throws Exception {
+    byte[] orig = {10, 9, 8, 7, 6, 5, 4, 3, 2, 1};
+    testWriteAllToHelper(Binary.fromConstantByteBuffer(ByteBuffer.wrap(orig)), orig);
+    ByteBuffer buf = ByteBuffer.allocateDirect(orig.length);
+    buf.put(orig);
+    buf.flip();
+    testWriteAllToHelper(Binary.fromConstantByteBuffer(buf), orig);
+  }
+
+  private void testWriteAllToHelper(Binary binary, byte[] orig) throws IOException {
+    ByteArrayOutputStream out = new ByteArrayOutputStream(orig.length);
+    binary.writeTo(out);
+    assertArrayEquals(orig, out.toByteArray());
+  }
+
+  @Test
   public void testFromStringBinary() throws Exception {
     testBinary(STRING_BF, false);
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java b/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java
new file mode 100644
index 0000000..2ac8a2b
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/IOExceptionUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet;
+
+import java.io.Closeable;
+import java.io.IOException;
+
+/**
+ * Utilities for managing I/O resources.
+ */
+public class IOExceptionUtils {
+
+  /**
+   * Call the close() method on a {@link Closeable}, wrapping any IOException
+   * in a runtime exception.
+   *
+   * @param closeable - resource to close
+   */
+  public static void closeQuietly(Closeable closeable) {
+    try {
+      closeable.close();
+    } catch (IOException e) {
+      throw new ParquetRuntimeException("Error closing I/O related resources.", e) {};
+    }
+  }
+
+}
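
For context, a minimal usage sketch of closeQuietly (the stream below is
illustrative, not part of this patch):

    java.io.ByteArrayOutputStream out = new java.io.ByteArrayOutputStream();
    out.write(42);
    // a failure in close() surfaces as an unchecked ParquetRuntimeException
    IOExceptionUtils.closeQuietly(out);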

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java b/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java
new file mode 100644
index 0000000..5271000
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/OutputStreamCloseException.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.parquet;
+
+/**
+ * Runtime exception indicating that a stream failed to be closed properly.
+ *
+ * Used to wrap the checked IOException usually thrown from IO operations.
+ * Such failures are generally not recoverable, so it does not make sense to
+ * pollute the codebase by declaring that they can be thrown whenever
+ * resources are being closed out.
+ */
+public class OutputStreamCloseException extends ParquetRuntimeException {
+
+  private static final long serialVersionUID = 1L;
+
+  public OutputStreamCloseException() {
+  }
+
+  public OutputStreamCloseException(String message, Throwable cause) {
+    super(message, cause);
+  }
+
+  public OutputStreamCloseException(String message) {
+    super(message);
+  }
+
+  public OutputStreamCloseException(Throwable cause) {
+    super(cause);
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java b/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
index f67b15a..d0f13a8 100644
--- a/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
+++ b/parquet-common/src/main/java/org/apache/parquet/ParquetRuntimeException.java
@@ -18,6 +18,9 @@
  */
 package org.apache.parquet;
 
+import java.io.Closeable;
+import java.io.IOException;
+
 /**
  * The parent class for all runtime exceptions
  *
@@ -42,5 +45,4 @@ abstract public class ParquetRuntimeException extends RuntimeException {
   public ParquetRuntimeException(Throwable cause) {
     super(cause);
   }
-
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java
new file mode 100644
index 0000000..ee36b74
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferAllocator.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.nio.ByteBuffer;
+
+public interface ByteBufferAllocator {
+  ByteBuffer allocate(int size);
+
+  /**
+   * For RefCounted implementations using direct memory, the release method
+   * needs to be called to free references to the allocated memory.
+   */
+  void release(ByteBuffer b);
+
+  /**
+   * Indicates if this allocator will produce ByteBuffers backed by direct memory.
+   *
+   * @return true if direct memory backed buffers will be created by this allocator, else false
+   */
+  boolean isDirect();
+}
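
A short sketch of the allocator contract above, using the HeapByteBufferAllocator
introduced later in this patch (the sizes are illustrative):

    ByteBufferAllocator allocator = new HeapByteBufferAllocator();
    ByteBuffer slab = allocator.allocate(64 * 1024);  // isDirect() == false
    slab.put((byte) 1);
    // release() is a no-op for heap buffers, but pooled or direct
    // implementations rely on it being called for every allocated buffer
    allocator.release(slab);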

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
new file mode 100644
index 0000000..5b3b853
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/ByteBufferInputStream.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * This ByteBufferInputStream does not consume the ByteBuffer passed in,
+ * but reads through a slice of it, leaving the original untouched.
+ */
+public class ByteBufferInputStream extends InputStream {
+	
+  protected ByteBuffer byteBuf;
+  protected int initPos;
+  protected int count;
+  public ByteBufferInputStream(ByteBuffer buffer) {
+    this(buffer, buffer.position(), buffer.remaining());
+  }
+  
+  public ByteBufferInputStream(ByteBuffer buffer, int offset, int count) {
+    ByteBuffer temp = buffer.duplicate();
+    temp.position(offset);
+    byteBuf = temp.slice();
+    byteBuf.limit(count);
+    this.initPos = offset;
+    this.count = count;
+  }
+  
+  public ByteBuffer toByteBuffer() {
+    return byteBuf.slice();
+  }
+  
+  @Override
+  public int read() throws IOException {
+    if (!byteBuf.hasRemaining()) {
+      return -1;
+    }
+    // mask the signed byte so it is returned as an unsigned value
+    return byteBuf.get() & 0xFF;
+  }
+
+  @Override
+  public int read(byte[] bytes, int offset, int length) throws IOException {
+    int count = Math.min(byteBuf.remaining(), length);
+    if (count == 0) return -1;
+    byteBuf.get(bytes, offset, count);
+    return count;
+  }
+  
+  @Override
+  public long skip(long n) {
+    if (n <= 0) return 0;
+    n = Math.min(n, byteBuf.remaining());
+    int pos = byteBuf.position();
+    byteBuf.position((int) (pos + n));
+    return n;
+  }
+
+
+  @Override
+  public int available() {
+    return byteBuf.remaining();
+  }
+}
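
A minimal sketch of the non-consuming behavior described in the class comment:
the stream reads through a slice, so the caller's buffer position is unchanged.

    ByteBuffer buf = ByteBuffer.wrap(new byte[] { 1, 2, 3, 4 });
    ByteBufferInputStream in = new ByteBufferInputStream(buf);
    int first = in.read();       // 1
    int second = in.read();      // 2
    assert buf.position() == 0;  // the original buffer was not consumed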

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
index d96a1e5..d40721a 100644
--- a/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/BytesUtils.java
@@ -22,6 +22,7 @@ import java.io.EOFException;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.nio.charset.Charset;
 
 import org.apache.parquet.Log;
@@ -53,6 +54,21 @@ public class BytesUtils {
    * @return
    * @throws IOException
    */
+  public static int readIntLittleEndian(ByteBuffer in, int offset) throws IOException {
+    int ch4 = in.get(offset) & 0xff;
+    int ch3 = in.get(offset + 1) & 0xff;
+    int ch2 = in.get(offset + 2) & 0xff;
+    int ch1 = in.get(offset + 3) & 0xff;
+    return ((ch1 << 24) + (ch2 << 16) + (ch3 << 8) + (ch4 << 0));
+  }
+  
+  /**
+   * reads an int in little endian at the given position
+   * @param in
+   * @param offset
+   * @return
+   * @throws IOException
+   */
   public static int readIntLittleEndian(byte[] in, int offset) throws IOException {
     int ch4 = in[offset] & 0xff;
     int ch3 = in[offset + 1] & 0xff;
@@ -205,6 +221,14 @@ public class BytesUtils {
     out.write(value & 0x7F);
   }
 
+  public static void writeUnsignedVarInt(int value, ByteBuffer dest) throws IOException {
+    while ((value & 0xFFFFFF80) != 0L) {
+      dest.put((byte) ((value & 0x7F) | 0x80));
+      value >>>= 7;
+    }
+    dest.put((byte) (value & 0x7F));
+  }
+
   public static void writeZigZagVarInt(int intValue, OutputStream out) throws IOException{
     writeUnsignedVarInt((intValue << 1) ^ (intValue >> 31), out);
   }
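
As a worked example of the ByteBuffer variant above: 300 is 0b10_0101100, so the
low 7 bits are written first with the continuation bit set, giving 0xAC 0x02
(the buffer size below is illustrative):

    ByteBuffer buf = ByteBuffer.allocate(5);
    BytesUtils.writeUnsignedVarInt(300, buf);
    buf.flip();
    // buf.get() == (byte) 0xAC  -- (300 & 0x7F) | 0x80
    // buf.get() == (byte) 0x02  -- 300 >>> 7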

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java
new file mode 100644
index 0000000..9fe4538
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/DirectByteBufferAllocator.java
@@ -0,0 +1,43 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.nio.ByteBuffer;
+
+public class DirectByteBufferAllocator implements ByteBufferAllocator {
+  public static final DirectByteBufferAllocator getInstance() { return new DirectByteBufferAllocator(); }
+  public DirectByteBufferAllocator() {
+    super();
+  }
+
+  public ByteBuffer allocate(final int size) {
+    return ByteBuffer.allocateDirect(size);
+  }
+
+  @Override
+  public void release(ByteBuffer b) {
+    // nothing to release: buffers from ByteBuffer.allocateDirect are reclaimed by the GC
+    return;
+  }
+
+  @Override
+  public boolean isDirect() {
+    return true;
+  }
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java
----------------------------------------------------------------------
diff --git a/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java b/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java
new file mode 100644
index 0000000..c5f475d
--- /dev/null
+++ b/parquet-common/src/main/java/org/apache/parquet/bytes/HeapByteBufferAllocator.java
@@ -0,0 +1,44 @@
+/* 
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import java.nio.ByteBuffer;
+
+public class HeapByteBufferAllocator implements ByteBufferAllocator {
+
+  public static final HeapByteBufferAllocator getInstance() { return new HeapByteBufferAllocator(); }
+
+  public HeapByteBufferAllocator() {
+    super();
+  }
+
+  public ByteBuffer allocate(final int size) {
+    return ByteBuffer.allocate(size);
+  }
+
+  public void release(ByteBuffer b) {
+    return;
+  }
+
+  @Override
+  public boolean isDirect() {
+    return false;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
index ac334ae..40190ee 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/BytesInput.java
@@ -25,6 +25,9 @@ import java.io.InputStream;
 import java.io.OutputStream;
 import java.util.Arrays;
 import java.util.List;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.WritableByteChannel;
 
 import org.apache.parquet.Log;
 
@@ -71,6 +74,15 @@ abstract public class BytesInput {
   public static BytesInput from(InputStream in, int bytes) {
     return new StreamBytesInput(in, bytes);
   }
+  
+  /**
+   * @param buffer the ByteBuffer to read from, starting at the given offset
+   * @param length number of bytes to read
+   * @return a BytesInput that will read the given bytes from the ByteBuffer
+   */
+  public static BytesInput from(ByteBuffer buffer, int offset, int length) {
+    return new ByteBufferBytesInput(buffer, offset, length);
+  }
 
   /**
    *
@@ -121,7 +133,7 @@ abstract public class BytesInput {
   }
 
   /**
-   * @param arrayOut
+   * @param baos - stream to wrap into a BytesInput
    * @return a BytesInput that will write the content of the buffer
    */
   public static BytesInput from(ByteArrayOutputStream baos) {
@@ -166,6 +178,24 @@ abstract public class BytesInput {
 
   /**
    *
+   * @return a new ByteBuffer materializing the contents of this input
+   * @throws IOException
+   */
+  public ByteBuffer toByteBuffer() throws IOException {
+    return ByteBuffer.wrap(toByteArray());
+  }
+
+  /**
+   *
+   * @return a new InputStream materializing the contents of this input
+   * @throws IOException
+   */
+  public InputStream toInputStream() throws IOException {
+    return new ByteBufferInputStream(toByteBuffer());
+  }
+
+  /**
+   *
    * @return the size in bytes that would be written
    */
   abstract public long size();
@@ -258,6 +288,10 @@ abstract public class BytesInput {
       BytesUtils.writeIntLittleEndian(out, intValue);
     }
 
+    public ByteBuffer toByteBuffer() throws IOException {
+      return ByteBuffer.allocate(4).order(java.nio.ByteOrder.LITTLE_ENDIAN).putInt(0, intValue);
+    }
+
     @Override
     public long size() {
       return 4;
@@ -278,6 +312,12 @@ abstract public class BytesInput {
       BytesUtils.writeUnsignedVarInt(intValue, out);
     }
 
+    public ByteBuffer toByteBuffer() throws IOException {
+      ByteBuffer ret = ByteBuffer.allocate((int) size());
+      BytesUtils.writeUnsignedVarInt(intValue, ret);
+      return (ByteBuffer) ret.flip();
+    }
+
     @Override
     public long size() {
       int s = 5 - ((Integer.numberOfLeadingZeros(intValue) + 3) / 7);
@@ -296,6 +336,10 @@ abstract public class BytesInput {
       return 0;
     }
 
+    public ByteBuffer toByteBuffer() throws IOException {
+      return ByteBuffer.allocate(0);
+    }
+
   }
 
   private static class CapacityBAOSBytesInput extends BytesInput {
@@ -355,11 +399,49 @@ abstract public class BytesInput {
       out.write(in, offset, length);
     }
 
+    public ByteBuffer toByteBuffer() throws IOException {
+      return ByteBuffer.wrap(in, offset, length);
+    }
+
     @Override
     public long size() {
       return length;
     }
 
   }
+  
+  private static class ByteBufferBytesInput extends BytesInput {
+    
+    private final ByteBuffer byteBuf;
+    private final int length;
+    private final int offset;
 
+    private ByteBufferBytesInput(ByteBuffer byteBuf, int offset, int length) {
+      this.byteBuf = byteBuf;
+      this.offset = offset;
+      this.length = length;
+    }
+
+    @Override
+    public void writeAllTo(OutputStream out) throws IOException {
+      final WritableByteChannel outputChannel = Channels.newChannel(out);
+      byteBuf.position(offset);
+      ByteBuffer tempBuf = byteBuf.slice();
+      tempBuf.limit(length);
+      outputChannel.write(tempBuf);
+    }
+    
+    @Override
+    public ByteBuffer toByteBuffer() throws IOException {
+      byteBuf.position(offset);
+      ByteBuffer buf = byteBuf.slice();
+      buf.limit(length);
+      return buf;
+    }
+
+    @Override
+    public long size() {
+      return length;
+    }
+  }
 }
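
A short sketch of the new ByteBuffer entry points on BytesInput (the values are
illustrative):

    ByteBuffer page = ByteBuffer.wrap(new byte[] { 0, 1, 2, 3, 4, 5 });
    BytesInput in = BytesInput.from(page, 2, 3);  // wraps bytes 2, 3 and 4
    ByteBuffer view = in.toByteBuffer();          // a slice, not a copy: remaining() == 3
    java.io.InputStream stream = in.toInputStream();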

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
index 1670f9c..6155565 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java
@@ -21,16 +21,17 @@ package org.apache.parquet.bytes;
 import static java.lang.Math.max;
 import static java.lang.Math.pow;
 import static java.lang.String.format;
-import static java.lang.System.arraycopy;
 import static org.apache.parquet.Preconditions.checkArgument;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
 import java.util.List;
 
 import org.apache.parquet.Log;
+import org.apache.parquet.OutputStreamCloseException;
 
 /**
  * Similar to a {@link ByteArrayOutputStream}, but uses a different strategy for growing that does not involve copying.
@@ -54,16 +55,17 @@ import org.apache.parquet.Log;
  */
 public class CapacityByteArrayOutputStream extends OutputStream {
   private static final Log LOG = Log.getLog(CapacityByteArrayOutputStream.class);
-  private static final byte[] EMPTY_SLAB = new byte[0];
+  private static final ByteBuffer EMPTY_SLAB = ByteBuffer.wrap(new byte[0]);
 
   private int initialSlabSize;
   private final int maxCapacityHint;
-  private final List<byte[]> slabs = new ArrayList<byte[]>();
+  private final List<ByteBuffer> slabs = new ArrayList<ByteBuffer>();
 
-  private byte[] currentSlab;
+  private ByteBuffer currentSlab;
   private int currentSlabIndex;
   private int bytesAllocated = 0;
   private int bytesUsed = 0;
+  private ByteBufferAllocator allocator;
 
   /**
    * Return an initial slab size such that a CapacityByteArrayOutputStream constructed with it
@@ -86,38 +88,64 @@ public class CapacityByteArrayOutputStream extends OutputStream {
     return max(minSlabSize, ((int) (targetCapacity / pow(2, targetNumSlabs))));
   }
 
+  public static CapacityByteArrayOutputStream withTargetNumSlabs(
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+    return withTargetNumSlabs(minSlabSize, maxCapacityHint, targetNumSlabs, new HeapByteBufferAllocator());
+  }
+
   /**
    * Construct a CapacityByteArrayOutputStream configured such that its initial slab size is
    * determined by {@link #initialSlabSizeHeuristic}, with targetCapacity == maxCapacityHint
    */
   public static CapacityByteArrayOutputStream withTargetNumSlabs(
-      int minSlabSize, int maxCapacityHint, int targetNumSlabs) {
+      int minSlabSize, int maxCapacityHint, int targetNumSlabs, ByteBufferAllocator allocator) {
 
     return new CapacityByteArrayOutputStream(
         initialSlabSizeHeuristic(minSlabSize, maxCapacityHint, targetNumSlabs),
-        maxCapacityHint);
+        maxCapacityHint, allocator);
   }
 
   /**
    * Defaults maxCapacityHint to 1MB
    * @param initialSlabSize
-   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int)}
+   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
    */
   @Deprecated
   public CapacityByteArrayOutputStream(int initialSlabSize) {
-    this(initialSlabSize, 1024 * 1024);
+    this(initialSlabSize, 1024 * 1024, new HeapByteBufferAllocator());
+  }
+
+  /**
+   * Defaults maxCapacityHint to 1MB
+   * @param initialSlabSize
+   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
+   */
+  @Deprecated
+  public CapacityByteArrayOutputStream(int initialSlabSize, ByteBufferAllocator allocator) {
+    this(initialSlabSize, 1024 * 1024, allocator);
   }
 
   /**
    * @param initialSlabSize the size to make the first slab
    * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+   * @deprecated use {@link CapacityByteArrayOutputStream#CapacityByteArrayOutputStream(int, int, ByteBufferAllocator)}
    */
+  @Deprecated
   public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint) {
+    this(initialSlabSize, maxCapacityHint, new HeapByteBufferAllocator());
+  }
+
+  /**
+   * @param initialSlabSize the size to make the first slab
+   * @param maxCapacityHint a hint (not guarantee) of the max amount of data written to this stream
+   */
+  public CapacityByteArrayOutputStream(int initialSlabSize, int maxCapacityHint, ByteBufferAllocator allocator) {
     checkArgument(initialSlabSize > 0, "initialSlabSize must be > 0");
     checkArgument(maxCapacityHint > 0, "maxCapacityHint must be > 0");
     checkArgument(maxCapacityHint >= initialSlabSize, String.format("maxCapacityHint can't be less than initialSlabSize %d %d", initialSlabSize, maxCapacityHint));
     this.initialSlabSize = initialSlabSize;
     this.maxCapacityHint = maxCapacityHint;
+    this.allocator = allocator;
     reset();
   }
 
@@ -145,7 +173,7 @@ public class CapacityByteArrayOutputStream extends OutputStream {
 
     if (Log.DEBUG) LOG.debug(format("used %d slabs, adding new slab of size %d", slabs.size(), nextSlabSize));
 
-    this.currentSlab = new byte[nextSlabSize];
+    this.currentSlab = allocator.allocate(nextSlabSize);
     this.slabs.add(currentSlab);
     this.bytesAllocated += nextSlabSize;
     this.currentSlabIndex = 0;
@@ -153,11 +181,12 @@ public class CapacityByteArrayOutputStream extends OutputStream {
 
   @Override
   public void write(int b) {
-    if (currentSlabIndex == currentSlab.length) {
+    if (!currentSlab.hasRemaining()) {
       addSlab(1);
     }
-    currentSlab[currentSlabIndex] = (byte) b;
+    currentSlab.put(currentSlabIndex, (byte) b);
     currentSlabIndex += 1;
+    currentSlab.position(currentSlabIndex);
     bytesUsed += 1;
   }
 
@@ -168,18 +197,34 @@ public class CapacityByteArrayOutputStream extends OutputStream {
       throw new IndexOutOfBoundsException(
           String.format("Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
     }
-    if (currentSlabIndex + len >= currentSlab.length) {
-      final int length1 = currentSlab.length - currentSlabIndex;
-      arraycopy(b, off, currentSlab, currentSlabIndex, length1);
+    if (len >= currentSlab.remaining()) {
+      final int length1 = currentSlab.remaining();
+      currentSlab.put(b, off, length1);
+      bytesUsed += length1;
+      currentSlabIndex += length1;
       final int length2 = len - length1;
       addSlab(length2);
-      arraycopy(b, off + length1, currentSlab, currentSlabIndex, length2);
+      currentSlab.put(b, off + length1, length2);
       currentSlabIndex = length2;
+      bytesUsed += length2;
     } else {
-      arraycopy(b, off, currentSlab, currentSlabIndex, len);
+      currentSlab.put(b, off, len);
       currentSlabIndex += len;
+      bytesUsed += len;
+    }
+  }
+
+  private void writeToOutput(OutputStream out, ByteBuffer buf, int len) throws IOException {
+    if (buf.hasArray()) {
+      out.write(buf.array(), buf.arrayOffset(), len);
+    } else {
+      // The OutputStream interface only accepts a byte[]; a ByteBuffer not backed by an
+      // array must be copied. Use a duplicate so the slab's position/limit stay intact.
+      ByteBuffer dup = (ByteBuffer) buf.duplicate().flip();
+      byte[] copy = new byte[len];
+      dup.get(copy);
+      out.write(copy);
     }
-    bytesUsed += len;
   }
 
   /**
@@ -191,10 +236,9 @@ public class CapacityByteArrayOutputStream extends OutputStream {
    */
   public void writeTo(OutputStream out) throws IOException {
     for (int i = 0; i < slabs.size() - 1; i++) {
-      final byte[] slab = slabs.get(i);
-      out.write(slab);
+      writeToOutput(out, slabs.get(i), slabs.get(i).position());
     }
-    out.write(currentSlab, 0, currentSlabIndex);
+    writeToOutput(out, currentSlab, currentSlabIndex);
   }
 
   /**
@@ -222,6 +266,9 @@ public class CapacityByteArrayOutputStream extends OutputStream {
     // 7 = 2^3 - 1 so that doubling the initial size 3 times will get to the same size
     this.initialSlabSize = max(bytesUsed / 7, initialSlabSize);
     if (Log.DEBUG) LOG.debug(String.format("initial slab of size %d", initialSlabSize));
+    for (ByteBuffer slab : slabs) {
+      allocator.release(slab);
+    }
     this.slabs.clear();
     this.bytesAllocated = 0;
     this.bytesUsed = 0;
@@ -249,13 +296,13 @@ public class CapacityByteArrayOutputStream extends OutputStream {
 
     long seen = 0;
     for (int i = 0; i < slabs.size(); i++) {
-      byte[] slab = slabs.get(i);
-      if (index < seen + slab.length) {
+      ByteBuffer slab = slabs.get(i);
+      if (index < seen + slab.limit()) {
         // ok found index
-        slab[(int)(index-seen)] = value;
+        slab.put((int)(index-seen), value);
         break;
       }
-      seen += slab.length;
+      seen += slab.limit();
     }
   }
 
@@ -273,4 +320,16 @@ public class CapacityByteArrayOutputStream extends OutputStream {
   int getSlabCount() {
     return slabs.size();
   }
+
+  @Override
+  public void close() {
+    for (ByteBuffer slab : slabs) {
+      allocator.release(slab);
+    }
+    try {
+      super.close();
+    } catch (IOException e) {
+      throw new OutputStreamCloseException(e);
+    }
+  }
 }
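
Note: the rework above replaces byte[] slabs with allocator-provided ByteBuffers, so the stream can be backed by heap or direct memory and must hand its slabs back on reset() and close(). A minimal sketch of driving the new three-argument constructor, using only classes visible in this patch (the class name SlabSketch is hypothetical):

    import java.io.ByteArrayOutputStream;
    import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
    import org.apache.parquet.bytes.HeapByteBufferAllocator;

    public class SlabSketch {
      public static void main(String[] args) throws Exception {
        CapacityByteArrayOutputStream cbaos =
            new CapacityByteArrayOutputStream(64, 1024, new HeapByteBufferAllocator());
        for (int i = 0; i < 300; i++) {
          cbaos.write(i & 0xFF);    // slabs are allocated on demand as each one fills
        }
        ByteArrayOutputStream sink = new ByteArrayOutputStream();
        cbaos.writeTo(sink);        // concatenates all slabs into the target stream
        cbaos.close();              // returns every slab to the allocator
      }
    }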

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
index da4e92f..9d4a8a9 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/bytes/LittleEndianDataOutputStream.java
@@ -18,6 +18,9 @@
  */
 package org.apache.parquet.bytes;
 
+import org.apache.parquet.IOExceptionUtils;
+import org.apache.parquet.ParquetRuntimeException;
+
 import java.io.IOException;
 import java.io.OutputStream;
 
@@ -210,4 +213,8 @@ public class LittleEndianDataOutputStream extends OutputStream {
     writeLong(Double.doubleToLongBits(v));
   }
 
+  public void close() {
+    IOExceptionUtils.closeQuietly(out);
+  }
+
 }
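
Note: the new close() funnels the wrapped stream through IOExceptionUtils.closeQuietly instead of letting IOException propagate; judging by the ParquetRuntimeException import added above, failures are rethrown unchecked. A hypothetical sketch of that helper's shape, not the actual parquet-common source:

    import java.io.Closeable;
    import java.io.IOException;

    public class CloseQuietlySketch {
      // Hypothetical shape only; the real helper presumably rethrows through a
      // ParquetRuntimeException subclass rather than a bare RuntimeException.
      public static void closeQuietly(Closeable resource) {
        try {
          if (resource != null) {
            resource.close();
          }
        } catch (IOException e) {
          throw new RuntimeException("Error closing I/O resource", e);
        }
      }
    }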

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
index b9a37ad..675576c 100644
--- a/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
+++ b/parquet-encoding/src/main/java/org/apache/parquet/column/values/bitpacking/BytePacker.java
@@ -18,6 +18,8 @@
  */
 package org.apache.parquet.column.values.bitpacking;
 
+import java.nio.ByteBuffer;
+
 /**
  * Packs and unpacks into bytes
  *
@@ -71,7 +73,15 @@ public abstract class BytePacker {
    * @param output the output values
    * @param outPos where to write to in output
    */
-  public abstract void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos);
+  public abstract void unpack8Values(final ByteBuffer input, final int inPos, final int[] output, final int outPos);
+
+  /**
+   * Compatibility API
+   */
+  @Deprecated
+  public void unpack8Values(final byte[] input, final int inPos, final int[] output, final int outPos) {
+    unpack8Values(ByteBuffer.wrap(input), inPos, output, outPos);
+  }
 
   /**
    * unpack bitWidth * 4 bytes from input at inPos into 32 values in output at outPos.
@@ -81,6 +91,13 @@ public abstract class BytePacker {
    * @param output the output values
    * @param outPos where to write to in output
    */
-  public abstract void unpack32Values(byte[] input, int inPos, int[] output, int outPos);
+  public abstract void unpack32Values(ByteBuffer input, int inPos, int[] output, int outPos);
 
+  /**
+   * Compatibility API
+   */
+  @Deprecated
+  public void unpack32Values(byte[] input, int inPos, int[] output, int outPos) {
+    unpack32Values(ByteBuffer.wrap(input), inPos, output, outPos);
+  }
 }
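
Note: with the abstract methods migrated to ByteBuffer, existing byte[] call sites keep working through the deprecated wrappers, which just wrap the array. An illustrative sketch of the two equivalent call styles, assuming a packer from the Packer factory used elsewhere in this module (the class name UnpackSketch is hypothetical):

    import java.nio.ByteBuffer;
    import org.apache.parquet.column.values.bitpacking.BytePacker;
    import org.apache.parquet.column.values.bitpacking.Packer;

    public class UnpackSketch {
      public static void main(String[] args) {
        int bitWidth = 3;
        BytePacker packer = Packer.LITTLE_ENDIAN.newBytePacker(bitWidth);
        byte[] packed = new byte[bitWidth * 4]; // 32 values at bitWidth bits each
        int[] out = new int[32];

        packer.unpack32Values(packed, 0, out, 0);                   // deprecated byte[] overload
        packer.unpack32Values(ByteBuffer.wrap(packed), 0, out, 0);  // new ByteBuffer overload
      }
    }

Since ByteBuffer.wrap produces a heap buffer over the same array, the compatibility path adds no copying, only the thin wrapper object.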

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
new file mode 100644
index 0000000..1cb0304
--- /dev/null
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestBytesInput.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.parquet.bytes;
+
+import org.junit.Test;
+
+import java.io.ByteArrayInputStream;
+
+import static org.junit.Assert.assertEquals;
+
+public class TestBytesInput {
+
+  @Test
+  public void testWriteInt() throws Throwable {
+    int[] testVals = {
+        Integer.MIN_VALUE,
+        Integer.MAX_VALUE,
+        0, 100, 1000, 0xdeadbeef};
+    for (int testVal : testVals) {
+      BytesInput varInt = BytesInput.fromUnsignedVarInt(testVal);
+      byte[] roundTripped = varInt.toByteArray();
+      int i = BytesUtils.readUnsignedVarInt(new ByteArrayInputStream(roundTripped));
+      assertEquals(testVal, i);
+    }
+  }
+}
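
Note: the test round-trips values through the unsigned varint (ULEB128) encoding: seven payload bits per byte, least-significant group first, with the high bit of each byte flagging continuation. A sketch of the writer side the test exercises, mirroring the contract of BytesUtils.writeUnsignedVarInt rather than quoting its source (the class name VarIntSketch is hypothetical):

    import java.io.ByteArrayOutputStream;

    public class VarIntSketch {
      public static void writeUnsignedVarInt(int value, ByteArrayOutputStream out) {
        while ((value & 0xFFFFFF80) != 0) { // more than 7 significant bits remain
          out.write((value & 0x7F) | 0x80); // low 7 bits with the continuation bit set
          value >>>= 7;                     // unsigned shift treats the int as unsigned
        }
        out.write(value & 0x7F);            // final byte, continuation bit clear
      }
    }

For example, 300 (binary 1 0010 1100) encodes to the two bytes 0xAC 0x02, and Integer.MIN_VALUE takes the full five bytes.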

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
index b80fe40..89db198 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/bytes/TestCapacityByteArrayOutputStream.java
@@ -63,7 +63,7 @@ public class TestCapacityByteArrayOutputStream {
   }
 
   protected CapacityByteArrayOutputStream newCapacityBAOS(int initialSize) {
-    return new CapacityByteArrayOutputStream(10, 1000000);
+    return new CapacityByteArrayOutputStream(initialSize, 1000000, new HeapByteBufferAllocator());
   }
 
   @Test
@@ -129,12 +129,12 @@ public class TestCapacityByteArrayOutputStream {
       assertEquals(i % (v * 3), byteArray[i]);
     }
     // verifying we have not created 500 * 23 / 10 slabs
-    assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(),capacityByteArrayOutputStream.getSlabCount() <= 20);
+    assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(), capacityByteArrayOutputStream.getSlabCount() <= 20);
     capacityByteArrayOutputStream.reset();
     writeArraysOf3(capacityByteArrayOutputStream, v);
     validate(capacityByteArrayOutputStream, v * 3);
     // verifying we use less slabs now
-    assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(),capacityByteArrayOutputStream.getSlabCount() <= 2);
+    assertTrue("slab count: " + capacityByteArrayOutputStream.getSlabCount(), capacityByteArrayOutputStream.getSlabCount() <= 2);
   }
 
   @Test

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
index a5ce37e..8df5f39 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestByteBitPacking.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.bitpacking;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -50,7 +51,7 @@ public class TestByteBitPacking {
     byte[] packed = new byte[packer.getBitWidth() * 4];
     packer.pack32Values(values, 0, packed, 0);
     LOG.debug("packed: " + TestBitPacking.toString(packed));
-    packer.unpack32Values(packed, 0, unpacked, 0);
+    packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
   }
 
   private int[] generateValues(int bitWidth) {
@@ -140,7 +141,7 @@ public class TestByteBitPacking {
         LOG.debug("Gener. out: " + TestBitPacking.toString(packedGenerated));
         Assert.assertEquals(pack.name() + " width " + i, TestBitPacking.toString(packedByLemireAsBytes), TestBitPacking.toString(packedGenerated));
 
-        bytePacker.unpack32Values(packedByLemireAsBytes, 0, unpacked, 0);
+        bytePacker.unpack32Values(ByteBuffer.wrap(packedByLemireAsBytes), 0, unpacked, 0);
         LOG.debug("Output: " + TestBitPacking.toString(unpacked));
 
         Assert.assertArrayEquals("width " + i, values, unpacked);

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
----------------------------------------------------------------------
diff --git a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
index e0c97e0..2c5fa58 100644
--- a/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
+++ b/parquet-encoding/src/test/java/org/apache/parquet/column/values/bitpacking/TestLemireBitPacking.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.bitpacking;
 import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.junit.Assert;
 import org.junit.Test;
@@ -64,7 +65,7 @@ public class TestLemireBitPacking {
   private void packUnpack(BytePacker packer, int[] values, int[] unpacked) {
     byte[] packed = new byte[packer.getBitWidth() * 4];
     packer.pack32Values(values, 0, packed, 0);
-    packer.unpack32Values(packed, 0, unpacked, 0);
+    packer.unpack32Values(ByteBuffer.wrap(packed), 0, unpacked, 0);
   }
 
   private int[] generateValues(int bitWidth) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
----------------------------------------------------------------------
diff --git a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
index 9a7c562..3d182e2 100644
--- a/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
+++ b/parquet-generator/src/main/java/org/apache/parquet/encoding/bitpacking/ByteBasedBitPackingGenerator.java
@@ -50,6 +50,7 @@ public class ByteBasedBitPackingGenerator {
     }
     FileWriter fw = new FileWriter(file);
     fw.append("package org.apache.parquet.column.values.bitpacking;\n");
+    fw.append("import java.nio.ByteBuffer;\n");
     fw.append("\n");
     fw.append("/**\n");
     if (msbFirst) {
@@ -97,8 +98,10 @@ public class ByteBasedBitPackingGenerator {
     generatePack(fw, bitWidth, 4, msbFirst);
 
     // Unpacking
-    generateUnpack(fw, bitWidth, 1, msbFirst);
-    generateUnpack(fw, bitWidth, 4, msbFirst);
+    generateUnpack(fw, bitWidth, 1, msbFirst, true);
+    generateUnpack(fw, bitWidth, 1, msbFirst, false);
+    generateUnpack(fw, bitWidth, 4, msbFirst, true);
+    generateUnpack(fw, bitWidth, 4, msbFirst, false);
 
     fw.append("  }\n");
   }
@@ -203,9 +206,15 @@ public class ByteBasedBitPackingGenerator {
     fw.append("    }\n");
   }
 
-  private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst)
+  private static void generateUnpack(FileWriter fw, int bitWidth, int batch, boolean msbFirst, boolean useByteArray)
       throws IOException {
-    fw.append("    public final void unpack" + (batch * 8) + "Values(final byte[] in, final int inPos, final int[] out, final int outPos) {\n");
+    final String bufferDataType;
+    if (useByteArray) {
+      bufferDataType = "byte[]";
+    } else {
+      bufferDataType = "ByteBuffer";
+    }
+    fw.append("    public final void unpack" + (batch * 8) + "Values(final " + bufferDataType + " in, final int inPos, final int[] out, final int outPos) {\n");
     if (bitWidth > 0) {
       int mask = genMask(bitWidth);
       for (int valueIndex = 0; valueIndex < (batch * 8); ++valueIndex) {
@@ -228,7 +237,14 @@ public class ByteBasedBitPackingGenerator {
           } else if (shift > 0){
             shiftString = "<<  " + shift;
           }
-          fw.append(" (((((int)in[" + align(byteIndex, 2) + " + inPos]) & 255) " + shiftString + ") & " + mask + ")");
+          final String byteAccess;
+          if (useByteArray) {
+            byteAccess = "in[" + align(byteIndex, 2) + " + inPos]";
+          } else {
+            // use ByteBuffer#get(index) method
+            byteAccess = "in.get(" + align(byteIndex, 2) + " + inPos)";
+          }
+          fw.append(" (((((int)" + byteAccess + ") & 255) " + shiftString + ") & " + mask + ")");
         }
         fw.append(";\n");
       }
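
Note: each bit width now gets both a byte[] and a ByteBuffer variant of every unpack method; the only difference in the emitted body is in[i] versus in.get(i). Roughly what the generator produces for the ByteBuffer flavor at bitWidth 1 (illustrative shape, not verbatim generator output):

    public final void unpack8Values(final ByteBuffer in, final int inPos, final int[] out, final int outPos) {
      out[0 + outPos] = (((((int)in.get(0 + inPos)) & 255) >>> 7) & 1);
      out[1 + outPos] = (((((int)in.get(0 + inPos)) & 255) >>> 6) & 1);
      // ... one line per remaining bit ...
      out[7 + outPos] = (((((int)in.get(0 + inPos)) & 255)) & 1);
    }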

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/pom.xml
----------------------------------------------------------------------
diff --git a/parquet-hadoop/pom.xml b/parquet-hadoop/pom.xml
index a7f9d2c..2f2e932 100644
--- a/parquet-hadoop/pom.xml
+++ b/parquet-hadoop/pom.xml
@@ -92,6 +92,11 @@
       <version>1.9.5</version>
       <scope>test</scope>
     </dependency>
+    <dependency>
+      <groupId>commons-pool</groupId>
+      <artifactId>commons-pool</artifactId>
+      <version>1.5.4</version>
+    </dependency>
   </dependencies>
 
   <build>

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
index fdeb2ba..6821bbf 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/format/converter/ParquetMetadataConverter.java
@@ -36,6 +36,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.parquet.CorruptStatistics;
 import org.apache.parquet.Log;
 import org.apache.parquet.hadoop.metadata.ColumnPath;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
index 6840950..8bf882f 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/CodecFactory.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -18,14 +18,14 @@
  */
 package org.apache.parquet.hadoop;
 
-import java.io.ByteArrayInputStream;
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.compress.CodecPool;
 import org.apache.hadoop.io.compress.CompressionCodec;
@@ -34,18 +34,64 @@ import org.apache.hadoop.io.compress.Compressor;
 import org.apache.hadoop.io.compress.Decompressor;
 import org.apache.hadoop.util.ReflectionUtils;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 
-class CodecFactory {
+public class CodecFactory {
+
+  protected static final Map<String, CompressionCodec> CODEC_BY_NAME = Collections
+      .synchronizedMap(new HashMap<String, CompressionCodec>());
+
+  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
+  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
 
-  public class BytesDecompressor {
+  protected final Configuration configuration;
+  protected final int pageSize;
+
+  /**
+   * Create a new codec factory.
+   *
+   * @param configuration used to pass compression codec configuration information
+   * @param pageSize the expected page size. This does not set a hard limit; it is
+   *                 currently only used to size the output stream created when
+   *                 compressing a buffer. If this factory is only used to construct
+   *                 decompressors, this parameter has no effect.
+   */
+  public CodecFactory(Configuration configuration, int pageSize) {
+    this.configuration = configuration;
+    this.pageSize = pageSize;
+  }
+
+  /**
+   * Create a codec factory that will provide compressors and decompressors
+   * that will work natively with ByteBuffers backed by direct memory.
+   *
+   * @param config configuration options for different compression codecs
+   * @param allocator an allocator for creating result buffers during compression
+   *                  and decompression. It must provide buffers backed by direct
+   *                  memory and return true from the isDirect() method of the
+   *                  ByteBufferAllocator interface.
+   * @param pageSize the default page size. This does not set a hard limit on the
+   *                 size of buffers that can be compressed, but performance may
+   *                 be improved by setting it close to the expected size of buffers
+   *                 (in the case of parquet, pages) that will be compressed. This
+   *                 setting is unused in the case of decompressing data, as parquet
+   *                 always records the uncompressed size of a buffer. If this
+   *                 CodecFactory is only going to be used for decompressors, this
+   *                 parameter will not impact the function of the factory.
+   */
+  public static CodecFactory createDirectCodecFactory(Configuration config, ByteBufferAllocator allocator, int pageSize) {
+    return new DirectCodecFactory(config, allocator, pageSize);
+  }
+
+  class HeapBytesDecompressor extends BytesDecompressor {
 
     private final CompressionCodec codec;
     private final Decompressor decompressor;
 
-    public BytesDecompressor(CompressionCodec codec) {
-      this.codec = codec;
+    HeapBytesDecompressor(CompressionCodecName codecName) {
+      this.codec = getCodec(codecName);
       if (codec != null) {
         decompressor = CodecPool.getDecompressor(codec);
       } else {
@@ -53,11 +99,12 @@ class CodecFactory {
       }
     }
 
+    @Override
     public BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException {
       final BytesInput decompressed;
       if (codec != null) {
         decompressor.reset();
-        InputStream is = codec.createInputStream(new ByteArrayInputStream(bytes.toByteArray()), decompressor);
+        InputStream is = codec.createInputStream(bytes.toInputStream(), decompressor);
         decompressed = BytesInput.from(is, uncompressedSize);
       } else {
         decompressed = bytes;
@@ -65,7 +112,13 @@ class CodecFactory {
       return decompressed;
     }
 
-    private void release() {
+    @Override
+    public void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException {
+      ByteBuffer decompressed = decompress(BytesInput.from(input, 0, input.remaining()), uncompressedSize).toByteBuffer();
+      output.put(decompressed);
+    }
+
+    protected void release() {
       if (decompressor != null) {
         CodecPool.returnDecompressor(decompressor);
       }
@@ -78,16 +131,16 @@ class CodecFactory {
    * @author Julien Le Dem
    *
    */
-  public static class BytesCompressor {
+  class HeapBytesCompressor extends BytesCompressor {
 
     private final CompressionCodec codec;
     private final Compressor compressor;
     private final ByteArrayOutputStream compressedOutBuffer;
     private final CompressionCodecName codecName;
 
-    public BytesCompressor(CompressionCodecName codecName, CompressionCodec codec, int pageSize) {
+    HeapBytesCompressor(CompressionCodecName codecName) {
       this.codecName = codecName;
-      this.codec = codec;
+      this.codec = getCodec(codecName);
       if (codec != null) {
         this.compressor = CodecPool.getCompressor(codec);
         this.compressedOutBuffer = new ByteArrayOutputStream(pageSize);
@@ -97,6 +150,7 @@ class CodecFactory {
       }
     }
 
+    @Override
     public BytesInput compress(BytesInput bytes) throws IOException {
       final BytesInput compressedBytes;
       if (codec == null) {
@@ -116,7 +170,8 @@ class CodecFactory {
       return compressedBytes;
     }
 
-    private void release() {
+    @Override
+    protected void release() {
       if (compressor != null) {
         CodecPool.returnCompressor(compressor);
       }
@@ -128,60 +183,58 @@ class CodecFactory {
 
   }
 
-  private final Map<CompressionCodecName, BytesCompressor> compressors = new HashMap<CompressionCodecName, BytesCompressor>();
-  private final Map<CompressionCodecName, BytesDecompressor> decompressors = new HashMap<CompressionCodecName, BytesDecompressor>();
-  private final Map<String, CompressionCodec> codecByName = new HashMap<String, CompressionCodec>();
-  private final Configuration configuration;
+  public BytesCompressor getCompressor(CompressionCodecName codecName) {
+    BytesCompressor comp = compressors.get(codecName);
+    if (comp == null) {
+      comp = createCompressor(codecName);
+      compressors.put(codecName, comp);
+    }
+    return comp;
+  }
 
-  public CodecFactory(Configuration configuration) {
-    this.configuration = configuration;
+  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
+    BytesDecompressor decomp = decompressors.get(codecName);
+    if (decomp == null) {
+      decomp = createDecompressor(codecName);
+      decompressors.put(codecName, decomp);
+    }
+    return decomp;
+  }
+
+  protected BytesCompressor createCompressor(CompressionCodecName codecName) {
+    return new HeapBytesCompressor(codecName);
+  }
+
+  protected BytesDecompressor createDecompressor(CompressionCodecName codecName) {
+    return new HeapBytesDecompressor(codecName);
   }
 
   /**
    *
-   * @param codecName the requested codec
+   * @param codecName
+   *          the requested codec
    * @return the corresponding hadoop codec. null if UNCOMPRESSED
    */
-  private CompressionCodec getCodec(CompressionCodecName codecName) {
+  protected CompressionCodec getCodec(CompressionCodecName codecName) {
     String codecClassName = codecName.getHadoopCompressionCodecClassName();
     if (codecClassName == null) {
       return null;
     }
-    CompressionCodec codec = codecByName.get(codecClassName);
+    CompressionCodec codec = CODEC_BY_NAME.get(codecClassName);
     if (codec != null) {
       return codec;
     }
 
     try {
       Class<?> codecClass = Class.forName(codecClassName);
-      codec = (CompressionCodec)ReflectionUtils.newInstance(codecClass, configuration);
-      codecByName.put(codecClassName, codec);
+      codec = (CompressionCodec) ReflectionUtils.newInstance(codecClass, configuration);
+      CODEC_BY_NAME.put(codecClassName, codec);
       return codec;
     } catch (ClassNotFoundException e) {
       throw new BadConfigurationException("Class " + codecClassName + " was not found", e);
     }
   }
 
-  public BytesCompressor getCompressor(CompressionCodecName codecName, int pageSize) {
-    BytesCompressor comp = compressors.get(codecName);
-    if (comp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      comp = new BytesCompressor(codecName, codec, pageSize);
-      compressors.put(codecName, comp);
-    }
-    return comp;
-  }
-
-  public BytesDecompressor getDecompressor(CompressionCodecName codecName) {
-    BytesDecompressor decomp = decompressors.get(codecName);
-    if (decomp == null) {
-      CompressionCodec codec = getCodec(codecName);
-      decomp = new BytesDecompressor(codec);
-      decompressors.put(codecName, decomp);
-    }
-    return decomp;
-  }
-
   public void release() {
     for (BytesCompressor compressor : compressors.values()) {
       compressor.release();
@@ -192,4 +245,16 @@ class CodecFactory {
     }
     decompressors.clear();
   }
+
+  public static abstract class BytesCompressor {
+    public abstract BytesInput compress(BytesInput bytes) throws IOException;
+    public abstract CompressionCodecName getCodecName();
+    protected abstract void release();
+  }
+
+  public static abstract class BytesDecompressor {
+    public abstract BytesInput decompress(BytesInput bytes, int uncompressedSize) throws IOException;
+    public abstract void decompress(ByteBuffer input, int compressedSize, ByteBuffer output, int uncompressedSize) throws IOException;
+    protected abstract void release();
+  }
 }
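
Note: CodecFactory is now public with pluggable compressor/decompressor creation, and the page size moves to the constructor instead of getCompressor. A minimal sketch of a heap-based round trip through the new surface, assuming the chosen codec (here SNAPPY) is available on the classpath (the class name CodecRoundTrip is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.parquet.bytes.BytesInput;
    import org.apache.parquet.hadoop.CodecFactory;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class CodecRoundTrip {
      public static void main(String[] args) throws Exception {
        CodecFactory factory = new CodecFactory(new Configuration(), 1024 * 1024);
        CodecFactory.BytesCompressor compressor =
            factory.getCompressor(CompressionCodecName.SNAPPY);
        CodecFactory.BytesDecompressor decompressor =
            factory.getDecompressor(CompressionCodecName.SNAPPY);

        byte[] data = "hello parquet".getBytes("UTF-8");
        BytesInput raw = BytesInput.from(data, 0, data.length);
        BytesInput compressed = compressor.compress(raw);
        BytesInput restored = decompressor.decompress(compressed, data.length);

        factory.release();  // return pooled (de)compressors to Hadoop's CodecPool
      }
    }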

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
----------------------------------------------------------------------
diff --git a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
index b6934c2..af06747 100644
--- a/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
+++ b/parquet-hadoop/src/main/java/org/apache/parquet/hadoop/ColumnChunkPageReadStore.java
@@ -1,4 +1,4 @@
-/* 
+/*
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements.  See the NOTICE file
  * distributed with this work for additional information
@@ -6,9 +6,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY