You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2015/11/04 18:57:36 UTC

[4/4] parquet-mr git commit: PARQUET-77: ByteBuffer use in read and write paths

PARQUET-77: ByteBuffer use in read and write paths

This work is based on the GSOC project from the summer of 2014. We have expanded on it to fix bugs and change the write path to use ByteBuffers as well. This PR replaces the earlier PRs #6, #49 and #50

Author: Jason Altekruse <al...@gmail.com>
Author: sunyu <st...@gmail.com>
Author: adeneche <ad...@gmail.com>
Author: Jacques Nadeau <ja...@apache.org>
Author: Parth Chandra <pc...@maprtech.com>
Author: stormdsy@gmail.com <st...@gmail.com>
Author: Jason Altekruse <al...@open-math.com>
Author: dsy <st...@gmail.com>
Author: Steven Phillips <sp...@maprtech.com>
Author: Gera Shegalov <ge...@twitter.com>
Author: Ryan Blue <bl...@apache.org>

Closes #267 from jaltekruse/1.6.0rc3-drill-r0.3-merge and squashes the following commits:

56316d0 [Jason Altekruse] An exception out of the read method doesn't necessarily mean something is very wrong, so it shouldn't get wrapped in a ShouldNeverHappenException. This invocationTargetException will wrap any kind of exception coming out of the method, including an IOException.
58340d8 [Jason Altekruse] Fix CompatibilityUtil, primary issue was a small error in the package name for the class that was being used to detect if the Hadoop 2.x API was available.
96e19a8 [Jason Altekruse] Properly set the byte buffer position when reading out of a filesystem that does not implement the byte buffer based read method in the Hadoop 2.x API.
269daef [Jason Altekruse] Make CodecFactory public
bd7aa97 [Jason Altekruse] Remove unused imports, one of which has been moved to package private and is no longer accessible in this class.
a44fdba [Jason Altekruse] Fix logging and restrict access to classes inside of CodecFactory.
723701c [Jason Altekruse] Adding isDirect interface to ByteBufferAllocator to add a restriction on the allocators used by a DirectCodecFactory.
10b5ba3 [Jason Altekruse] Remove unneeded TODO
57491a2 [Jason Altekruse] Delete older version of test file, all of these tests look to be covered in the newer version.
d6501b1 [Jason Altekruse] Thought I had fixed this double deallocation earlier, guess the change got lost somewhere.
a8d2dc1 [Jason Altekruse] Address review comments.
40714a4 [Jason Altekruse] Move pageSize to the constructor of codecfactory rather than the method for getting a compressor.
df7fd9c [Jason Altekruse] Limit access to classes and methods used for reflection based access to Hadoop 2.0 compression APIs.
192c717 [Jason Altekruse] Fix error message
1a47767 [Jason Altekruse] Address review comments
5869156 [Jason Altekruse] Move fallback classes from HeapCodecFactory to the DirectCodecFactory
3945674 [Jason Altekruse] Switch to using the DirectCodecFactory everywhere, one test is failing form the command line that is passing in intellij.
e7f7f7f [Jason Altekruse] WIP - removing unneeded generics form CodecFactories
659230f [Jason Altekruse] Remove second version of the class ByteBufferBytesInput that was nested in DirectCodecFactory. Replace with the one that was declared in the BytesInput class.
c305984 [Jason Altekruse] Adding back code generation for method to take a byte array as well as the new implementation that takes a Bytebuffer.
b8f54c2 [Jason Altekruse] Add a unit test for ByteBufferBackedBinary.
ae58486 [Jason Altekruse] Changing argument lists that previously included both an allocator and a ParquetProperties object.
b4266fb [Jason Altekruse] Add license header to new class
f8e5988 [Jason Altekruse] Added javadocs, removed unused code in DirectCodecFactory
d332ca7 [Jason Altekruse] Add test for UnsignedVarIntBytesInput
b7a6457 [Jason Altekruse] fix license leader
8ff878a [Jason Altekruse] Addressing review comments
862eb13 [Jason Altekruse] Fix usage of old constructor in Thrift module that caused a compilation failure. I had been skipping this module entirely during my work as the tests will fail to compile without a binary version of thrift 0.7, which seems hard to come by or compile yourself on Mac OS X.
0496350 [Jason Altekruse] Add unit test for direct codec factory.
da1b52a [Jason Altekruse] Moving classes into parquet from Drill.
2f1a6c7 [Jason Altekruse] Consolidate a little more code
8f66e43 [Jason Altekruse] Create utility methods to transform checked exceptions to unchecked when using reflection.
f217e6a [Jason Altekruse] Restore old interfaces
d5536b6 [Jason Altekruse] Restore original name of CapacityByteArrayOutputStream to keep compatibility with 1.7
4c3195e [Jason Altekruse] Turn back on SemVer
2e95915 [Jason Altekruse] Addressing minor review comments, comments out code, star import, formatting
a793be8 [Jason Altekruse] Add closeQuietly method to convert checked  IOExceptions from classless into runtime exceptions. Remove a bunch of unused imports from when there were previously try catch blocks that did this wrapping themselves (many actually were refactored to remove any need for special exception handling in an earlier commit, only one is actually using the new method).
fdb689c [Jason Altekruse] Remove unnecessary copy writing a Binary to an OutputStream if it is backed by a byte array.
d4819b4 [Jason Altekruse] remove methods now unneccesary as same implementation has been moved to the base class.
ad58bbe [Jason Altekruse] Addressing small review comments, unused imports, doc cleanup, etc.
9fb65dd [Jason Altekruse] Rename method to get a dictionary page to clarify that the dictionary will be closed and not available for further insertion.
e79684e [Jason Altekruse] Review comments - fixing use of ParquetProperties and removing unused interfaces on PageWriter
b1040a8 [Jason Altekruse] Remove code used to debug a test that was failing after the initial merge.
9dccb94 [Jason Altekruse] Add new method to turn BytesInput into an InputStream.
f0e31ec [Jason Altekruse] revert small formatting and renaming changes, TODO make sure these result in a net diff of no changes (or only intended functional changes)
0098b1c [Jason Altekruse] Remove unused method
8c6e4a9 [Jason Altekruse] Addressing review comments, moving code out of generated class into abstract base class.
29cc747 [Jason Altekruse] Factor out common code
6959db7 [Jason Altekruse] addressing review comments, avoiding unnecessary copies when creating ByteBuffers
fec4242 [Jason Altekruse] Address review comments - factoring out code in tests
104a1d1 [Jason Altekruse] Remove test requiring a hard-coded binary file. This was actually a bad file being produced by Drill because we were not flushing the RecordConsumer.
86317b0 [Jason Altekruse] Address review comments, make field in immutable ParquetProperties object final, make an interface now expecting a ByteBuffer deprecated for the version that takes a byte[].
1971fc5 [Jason Altekruse] Fixes made while debugging drill unit tests
ebae775 [Jason Altekruse] Fix issue reading page data into an off-heap ByteBuffer
705b864 [Jason Altekruse] Rename CapacityByteArrayOutputStream to CapacityByteBufferOutputStream to reflect new implementation internals. Add close method to CapacityByteBufferOutputStream and a few other classes.
35d8386 [Jason Altekruse] Move call to getBytes() on dictionaryPages to remove the need to cache a list of dictionaryEncoders to be closed later.
d40706b [Jason Altekruse] Get rid of unnecessary calls to Bytebuffer.wrap(byte[]), as an interface that takes a byte array is still available.
fddd4af [Jason Altekruse] WIP - removing copies from the ByteBufferBasedBinary equals, compareTo, hashCode methods. Current tests are passing, but I should add some new ones.
829af6f [Jason Altekruse] WIP - getting rid of unnecessary copies in Binary.java
23ad48e [Jason Altekruse] WIP - addressing review comments
7e252f3 [Jason Altekruse] WIP - addressing review comments
1f4f504 [Jason Altekruse] WIP - addressing review comments
ab54c4e [Jason Altekruse] Moving classes out of the old packages.
45cadee [Jason Altekruse] Cleaning up code in Binary after merge.
864b011 [Jason Altekruse] Simplifying how buffer allocators are passed when creating ValuesWriters.
2b8328b [Jason Altekruse] I all of the tests are now passing after the merge.
1bfa3a0 [Jason Altekruse] Merge branch 'master' into 1.6.0rc3-drill-r0.3-merge
9bbc269 [Jacques Nadeau] Update to 1.6.0rc3-drill-r0.3
9f22bd7 [Jacques Nadeau] Make CodecFactory pluggable
4a9dd28 [Jacques Nadeau] update pom version
173aa25 [Jacques Nadeau] Set max preferred slab size to 16mb
c98ec2a [adeneche] bumped version to 1.6.0rc3-drill-r0.1
51cf2f1 [Ryan Blue] cherry pick pull#188
e1df3b9 [adeneche] disabled enforcer and changed version to -drill
6943536 [adeneche] fixing bug related to testDictionaryError_419
48cceef [Steven Phillips] Fix allocation in DictionaryValuesWriter
98b99ea [Parth Chandra] Revert readFooter to not use ZeroCopy path.
a6389db [Steven Phillips] Make constructor for PrimitiveType that takes decimalMetadata public.
e488924 [adeneche] after merge code cleanup
35b10af [Parth Chandra] Use ByteBuffers in the Write path. Allow callers to pass in an allocator to allocate the ByteBuffer.
2187697 [Jacques Nadeau] Update Binary to make a copy of data for initial statistics.
8143174 [adeneche] update pig.version to build with Hadoop 2 jars
2c2b183 [Parth Chandra] Remove Zero Copy read path while reading footers
7bc2a4d [Parth Chandra] Make a copy of Min and Max values for BinaryStatistics so that direct memory can be released before stats are written.
5bc8774 [Parth Chandra] Update Snappy Codec to implement DirectDecompressionCodec interface Add compatibility function to read directly into a byte buffer
0d22908 [adeneche] merging with master
8be638a [sunyu] Address tsdeng's comments
861e541 [dsy] enable enforcer check.
912cbaf [sunyu] fix a bug in equals in ByteBuffer Binary with offset and length
016e89c [sunyu] remove some unncessary codes. add compatible method initFromPage in ValueReaders. add toByteBuffer method in ByteBufferInputStream. add V21FileAPI class to encapsulate v21 APIs and make it a singlton. add ByteBuffer based equal and compareto method in Binary.
26dc879 [dsy] disable enforcer to pass build.
a7bcfbb [sunyu] Make BytePacker consume ByteBuffer directly.
01c2ae5 [sunyu] Implement FSDISTransport in Compatible layer. Fix bugs in Binary.
47b177d [sunyu] Move CompatibilityUtil to parquet.hadoop.util. Use reflect to call new API to keep compatible.
970fc8b [stormdsy@gmail.com] Add a Hadoop compatible layer to abstract away the zero copy API and old API.
4f399aa [stormdsy@gmail.com] Add original readIntLittleEndian function to keep compatible with previous verision.
7ac1df5 [stormdsy@gmail.com] Using Writable Channel to replace write to OutputStream one by one.
36aba13 [sunyu] Read from ByteBuffer instead of ByteArray to avoid unnecessary array copy through read path.
53500d4 [sunyu] Add ByteBufferInputStream and modify Chunk to consume ByteBuffer instead of byte array.
df1ad93 [stormdsy@gmail.com] Reading chunk using zero-copy API
2d32f49 [Gera Shegalov] Reading file metadata using zero-copy API
686d598 [Gera Shegalov] Use ByteBuf-based api to read magic.


Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/6b605a4e
Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/6b605a4e
Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/6b605a4e

Branch: refs/heads/master
Commit: 6b605a4ea05b66e1a6bf843353abcb4834a4ced8
Parents: 5a45ae3
Author: Jason Altekruse <al...@gmail.com>
Authored: Wed Nov 4 09:13:09 2015 -0800
Committer: Julien Le Dem <ju...@ledem.net>
Committed: Wed Nov 4 09:13:09 2015 -0800

----------------------------------------------------------------------
 .../apache/parquet/column/ColumnWriteStore.java |   6 +
 .../org/apache/parquet/column/ColumnWriter.java |  11 +
 .../parquet/column/ParquetProperties.java       |  53 +-
 .../parquet/column/impl/ColumnReaderImpl.java   |  11 +-
 .../parquet/column/impl/ColumnWriteStoreV1.java |  14 +-
 .../parquet/column/impl/ColumnWriteStoreV2.java |  11 +
 .../parquet/column/impl/ColumnWriterV1.java     |  22 +-
 .../parquet/column/impl/ColumnWriterV2.java     |  27 +-
 .../apache/parquet/column/page/PageWriter.java  |   2 +-
 .../parquet/column/values/ValuesReader.java     |  19 +-
 .../parquet/column/values/ValuesWriter.java     |  13 +-
 .../bitpacking/BitPackingValuesReader.java      |  13 +-
 .../bitpacking/BitPackingValuesWriter.java      |  10 +-
 .../bitpacking/ByteBitPackingValuesReader.java  |  14 +-
 .../bitpacking/ByteBitPackingValuesWriter.java  |   1 +
 .../column/values/boundedint/BitReader.java     |   7 +-
 .../column/values/boundedint/BitWriter.java     |  12 +-
 .../boundedint/BoundedIntValuesFactory.java     |   5 +-
 .../boundedint/BoundedIntValuesReader.java      |   5 +-
 .../boundedint/BoundedIntValuesWriter.java      |  10 +-
 .../boundedint/ZeroIntegerValuesReader.java     |   3 +-
 .../delta/DeltaBinaryPackingValuesReader.java   |  15 +-
 .../delta/DeltaBinaryPackingValuesWriter.java   |  17 +-
 .../DeltaLengthByteArrayValuesReader.java       |  11 +-
 .../DeltaLengthByteArrayValuesWriter.java       |  14 +-
 .../deltastrings/DeltaByteArrayReader.java      |   5 +-
 .../deltastrings/DeltaByteArrayWriter.java      |  14 +-
 .../dictionary/DictionaryValuesReader.java      |  13 +-
 .../dictionary/DictionaryValuesWriter.java      |  88 ++--
 .../dictionary/PlainValuesDictionary.java       |  26 +-
 .../values/fallback/FallbackValuesWriter.java   |  12 +-
 .../values/plain/BinaryPlainValuesReader.java   |  10 +-
 .../values/plain/BooleanPlainValuesReader.java  |   5 +-
 .../values/plain/BooleanPlainValuesWriter.java  |   6 +
 .../FixedLenByteArrayPlainValuesReader.java     |   9 +-
 .../FixedLenByteArrayPlainValuesWriter.java     |  13 +-
 .../column/values/plain/PlainValuesReader.java  |  15 +-
 .../column/values/plain/PlainValuesWriter.java  |  11 +-
 .../rle/RunLengthBitPackingHybridDecoder.java   |   8 +-
 .../rle/RunLengthBitPackingHybridEncoder.java   |  10 +-
 .../RunLengthBitPackingHybridValuesReader.java  |   7 +-
 .../RunLengthBitPackingHybridValuesWriter.java  |  10 +-
 .../java/org/apache/parquet/io/api/Binary.java  | 204 ++++++--
 .../apache/parquet/schema/PrimitiveType.java    |   7 +-
 .../column/impl/TestColumnReaderImpl.java       |   1 +
 .../column/impl/TestCorruptDeltaByteArrays.java |  31 +-
 .../parquet/column/mem/TestMemColumn.java       |   3 +-
 .../parquet/column/page/mem/MemPageWriter.java  |   1 -
 .../org/apache/parquet/column/values/Utils.java |   5 +-
 .../values/bitpacking/BitPackingPerfTest.java   |   3 +-
 .../values/bitpacking/TestBitPackingColumn.java |   6 +-
 .../values/boundedint/TestBoundedColumns.java   |  11 +-
 .../DeltaBinaryPackingValuesWriterTest.java     |  12 +-
 .../benchmark/BenchmarkIntegerOutputSize.java   |   5 +-
 .../BenchmarkReadingRandomIntegers.java         |   8 +-
 .../benchmark/RandomWritingBenchmarkTest.java   |   7 +-
 .../SmallRangeWritingBenchmarkTest.java         |   3 +-
 .../TestDeltaLengthByteArray.java               |  13 +-
 .../BenchmarkDeltaLengthByteArray.java          |   5 +-
 .../values/deltastrings/TestDeltaByteArray.java |   9 +-
 .../benchmark/BenchmarkDeltaByteArray.java      |   9 +-
 .../values/dictionary/TestDictionary.java       |  50 +-
 ...unLengthBitPackingHybridIntegrationTest.java |  10 +-
 .../TestRunLengthBitPackingHybridEncoder.java   |  30 +-
 .../java/org/apache/parquet/io/PerfTest.java    |   3 +-
 .../org/apache/parquet/io/TestColumnIO.java     |  24 +-
 .../org/apache/parquet/io/TestFiltered.java     |   3 +-
 .../org/apache/parquet/io/api/TestBinary.java   |  25 +
 .../org/apache/parquet/IOExceptionUtils.java    |  43 ++
 .../parquet/OutputStreamCloseException.java     |  46 ++
 .../apache/parquet/ParquetRuntimeException.java |   4 +-
 .../parquet/bytes/ByteBufferAllocator.java      |  38 ++
 .../parquet/bytes/ByteBufferInputStream.java    |  82 +++
 .../org/apache/parquet/bytes/BytesUtils.java    |  24 +
 .../bytes/DirectByteBufferAllocator.java        |  43 ++
 .../parquet/bytes/HeapByteBufferAllocator.java  |  44 ++
 .../org/apache/parquet/bytes/BytesInput.java    |  84 ++-
 .../bytes/CapacityByteArrayOutputStream.java    | 107 +++-
 .../bytes/LittleEndianDataOutputStream.java     |   7 +
 .../column/values/bitpacking/BytePacker.java    |  21 +-
 .../apache/parquet/bytes/TestBytesInput.java    |  42 ++
 .../TestCapacityByteArrayOutputStream.java      |   6 +-
 .../values/bitpacking/TestByteBitPacking.java   |   5 +-
 .../values/bitpacking/TestLemireBitPacking.java |   3 +-
 .../ByteBasedBitPackingGenerator.java           |  26 +-
 parquet-hadoop/pom.xml                          |   5 +
 .../converter/ParquetMetadataConverter.java     |   1 +
 .../org/apache/parquet/hadoop/CodecFactory.java | 157 ++++--
 .../hadoop/ColumnChunkPageReadStore.java        |   6 +-
 .../hadoop/ColumnChunkPageWriteStore.java       |  34 +-
 .../parquet/hadoop/DirectCodecFactory.java      | 522 +++++++++++++++++++
 .../hadoop/InternalParquetRecordWriter.java     |  21 +-
 .../parquet/hadoop/ParquetFileReader.java       |  81 +--
 .../parquet/hadoop/ParquetFileWriter.java       |   3 +-
 .../parquet/hadoop/ParquetOutputFormat.java     |   5 +-
 .../parquet/hadoop/ParquetRecordWriter.java     |  14 +-
 .../apache/parquet/hadoop/ParquetWriter.java    |   8 +-
 .../hadoop/codec/SnappyDecompressor.java        |   3 +-
 .../parquet/hadoop/util/CompatibilityUtil.java  | 114 ++++
 .../hadoop/TestColumnChunkPageWriteStore.java   |   9 +-
 .../parquet/hadoop/TestDirectCodecFactory.java  | 165 ++++++
 .../parquet/pig/TupleConsumerPerfTest.java      |   3 +-
 .../parquet/thrift/TestParquetReadProtocol.java |   4 +-
 pom.xml                                         |   3 +-
 104 files changed, 2396 insertions(+), 463 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
index 739c00f..bb9dfea 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriteStore.java
@@ -58,4 +58,10 @@ public interface ColumnWriteStore {
    * @return a formated string representing memory usage per column
    */
   abstract public String memUsageString();
+
+  /**
+   * Close the related output stream and release any resources
+   */
+  abstract public void close();
+
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
index 7605c50..c824504 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ColumnWriter.java
@@ -83,5 +83,16 @@ public interface ColumnWriter {
    */
   void writeNull(int repetitionLevel, int definitionLevel);
 
+ /**
+  * Close the underlying store. This should be called when there are no
+  * more data to be written.
+  */
+  void close();
+
+  /**
+   * used to decide when to write a page or row group
+   * @return the number of bytes of memory used to buffer the current data
+   */
+  long getBufferedSizeInMemory();
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
index df44c4b..f8567a8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java
@@ -18,6 +18,10 @@
  */
 package org.apache.parquet.column;
 
+import org.apache.parquet.Preconditions;
+import org.apache.parquet.bytes.ByteBufferAllocator;
+import org.apache.parquet.bytes.HeapByteBufferAllocator;
+
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 import static org.apache.parquet.column.Encoding.PLAIN;
 import static org.apache.parquet.column.Encoding.PLAIN_DICTIONARY;
@@ -74,19 +78,27 @@ public class ParquetProperties {
   private final int dictionaryPageSizeThreshold;
   private final WriterVersion writerVersion;
   private final boolean enableDictionary;
+  private final ByteBufferAllocator allocator;
 
   public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict) {
+    this(dictPageSize, writerVersion, enableDict, new HeapByteBufferAllocator());
+  }
+
+  public ParquetProperties(int dictPageSize, WriterVersion writerVersion, boolean enableDict, ByteBufferAllocator allocator) {
     this.dictionaryPageSizeThreshold = dictPageSize;
     this.writerVersion = writerVersion;
     this.enableDictionary = enableDict;
+    Preconditions.checkNotNull(allocator, "ByteBufferAllocator");
+    this.allocator = allocator;
   }
 
-  public static ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
+  public ValuesWriter getColumnDescriptorValuesWriter(int maxLevel, int initialSizePerCol, int pageSize) {
     if (maxLevel == 0) {
       return new DevNullValuesWriter();
     } else {
       return new RunLengthBitPackingHybridValuesWriter(
-          getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize);
+          getWidthFromMaxInt(maxLevel), initialSizePerCol, pageSize, this.allocator
+      );
     }
   }
 
@@ -95,15 +107,15 @@ public class ParquetProperties {
     case BOOLEAN:
       return new BooleanPlainValuesWriter();
     case INT96:
-      return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize);
+      return new FixedLenByteArrayPlainValuesWriter(12, initialSizePerCol, pageSize, this.allocator);
     case FIXED_LEN_BYTE_ARRAY:
-      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize);
+      return new FixedLenByteArrayPlainValuesWriter(path.getTypeLength(), initialSizePerCol, pageSize, this.allocator);
     case BINARY:
     case INT32:
     case INT64:
     case DOUBLE:
     case FLOAT:
-      return new PlainValuesWriter(initialSizePerCol, pageSize);
+      return new PlainValuesWriter(initialSizePerCol, pageSize, this.allocator);
     default:
       throw new IllegalArgumentException("Unknown type " + path.getType());
     }
@@ -128,19 +140,19 @@ public class ParquetProperties {
     case BOOLEAN:
       throw new IllegalArgumentException("no dictionary encoding for BOOLEAN");
     case BINARY:
-      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+      return new PlainBinaryDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
     case INT32:
-      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+      return new PlainIntegerDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
     case INT64:
-      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+      return new PlainLongDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
     case INT96:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage);
+      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, 12, encodingForDataPage, encodingForDictionaryPage, this.allocator);
     case DOUBLE:
-      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+      return new PlainDoubleDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
     case FLOAT:
-      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage);
+      return new PlainFloatDictionaryValuesWriter(dictionaryPageSizeThreshold, encodingForDataPage, encodingForDictionaryPage, this.allocator);
     case FIXED_LEN_BYTE_ARRAY:
-      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage);
+      return new PlainFixedLenArrayDictionaryValuesWriter(dictionaryPageSizeThreshold, path.getTypeLength(), encodingForDataPage, encodingForDictionaryPage, this.allocator);
     default:
       throw new IllegalArgumentException("Unknown type " + path.getType());
     }
@@ -153,12 +165,12 @@ public class ParquetProperties {
     case PARQUET_2_0:
       switch (path.getType()) {
       case BOOLEAN:
-        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize);
+        return new RunLengthBitPackingHybridValuesWriter(1, initialSizePerCol, pageSize, this.allocator);
       case BINARY:
       case FIXED_LEN_BYTE_ARRAY:
-        return new DeltaByteArrayWriter(initialSizePerCol, pageSize);
+        return new DeltaByteArrayWriter(initialSizePerCol, pageSize,this.allocator);
       case INT32:
-        return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize);
+        return new DeltaBinaryPackingValuesWriter(initialSizePerCol, pageSize, this.allocator);
       case INT96:
       case INT64:
       case DOUBLE:
@@ -218,23 +230,28 @@ public class ParquetProperties {
     return enableDictionary;
   }
 
+  public ByteBufferAllocator getAllocator() {
+    return allocator;
+  }
+
   public ColumnWriteStore newColumnWriteStore(
       MessageType schema,
       PageWriteStore pageStore,
-      int pageSize) {
+      int pageSize,
+      ByteBufferAllocator allocator) {
     switch (writerVersion) {
     case PARQUET_1_0:
       return new ColumnWriteStoreV1(
           pageStore,
           pageSize,
           dictionaryPageSizeThreshold,
-          enableDictionary, writerVersion);
+          enableDictionary, writerVersion, allocator);
     case PARQUET_2_0:
       return new ColumnWriteStoreV2(
           schema,
           pageStore,
           pageSize,
-          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary));
+          new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary, allocator));
     default:
       throw new IllegalArgumentException("unknown version " + writerVersion);
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
index c53977f..8c2a4bf 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnReaderImpl.java
@@ -27,6 +27,7 @@ import static org.apache.parquet.column.ValuesType.VALUES;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.CorruptDeltaByteArrays;
 import org.apache.parquet.Log;
@@ -548,7 +549,7 @@ public class ColumnReaderImpl implements ColumnReader {
     });
   }
 
-  private void initDataReader(Encoding dataEncoding, byte[] bytes, int offset, int valueCount) {
+  private void initDataReader(Encoding dataEncoding, ByteBuffer bytes, int offset, int valueCount) {
     ValuesReader previousReader = this.dataColumn;
 
     this.currentEncoding = dataEncoding;
@@ -588,8 +589,8 @@ public class ColumnReaderImpl implements ColumnReader {
     this.repetitionLevelColumn = new ValuesReaderIntIterator(rlReader);
     this.definitionLevelColumn = new ValuesReaderIntIterator(dlReader);
     try {
-      byte[] bytes = page.getBytes().toByteArray();
-      if (DEBUG) LOG.debug("page size " + bytes.length + " bytes and " + pageValueCount + " records");
+      ByteBuffer bytes = page.getBytes().toByteBuffer();
+      if (DEBUG) LOG.debug("page size " + bytes.remaining() + " bytes and " + pageValueCount + " records");
       if (DEBUG) LOG.debug("reading repetition levels at 0");
       rlReader.initFromPage(pageValueCount, bytes, 0);
       int next = rlReader.getNextOffset();
@@ -608,7 +609,7 @@ public class ColumnReaderImpl implements ColumnReader {
     this.definitionLevelColumn = newRLEIterator(path.getMaxDefinitionLevel(), page.getDefinitionLevels());
     try {
       if (DEBUG) LOG.debug("page data size " + page.getData().size() + " bytes and " + pageValueCount + " records");
-      initDataReader(page.getDataEncoding(), page.getData().toByteArray(), 0, page.getValueCount());
+      initDataReader(page.getDataEncoding(), page.getData().toByteBuffer(), 0, page.getValueCount());
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read page " + page + " in col " + path, e);
     }
@@ -622,7 +623,7 @@ public class ColumnReaderImpl implements ColumnReader {
       return new RLEIntIterator(
           new RunLengthBitPackingHybridDecoder(
               BytesUtils.getWidthFromMaxInt(maxLevel),
-              new ByteArrayInputStream(bytes.toByteArray())));
+              bytes.toInputStream()));
     } catch (IOException e) {
       throw new ParquetDecodingException("could not read levels in page for col " + path, e);
     }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
index a72b6f7..277c468 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV1.java
@@ -25,6 +25,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
@@ -40,14 +41,16 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
   private final int dictionaryPageSizeThreshold;
   private final boolean enableDictionary;
   private final WriterVersion writerVersion;
+  private final ByteBufferAllocator allocator;
 
-  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion) {
+  public ColumnWriteStoreV1(PageWriteStore pageWriteStore, int pageSizeThreshold, int dictionaryPageSizeThreshold, boolean enableDictionary, WriterVersion writerVersion, ByteBufferAllocator allocator) {
     super();
     this.pageWriteStore = pageWriteStore;
     this.pageSizeThreshold = pageSizeThreshold;
     this.dictionaryPageSizeThreshold = dictionaryPageSizeThreshold;
     this.enableDictionary = enableDictionary;
     this.writerVersion = writerVersion;
+    this.allocator = allocator;
   }
 
   public ColumnWriter getColumnWriter(ColumnDescriptor path) {
@@ -65,7 +68,7 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
 
   private ColumnWriterV1 newMemColumn(ColumnDescriptor path) {
     PageWriter pageWriter = pageWriteStore.getPageWriter(path);
-    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion);
+    return new ColumnWriterV1(path, pageWriter, pageSizeThreshold, dictionaryPageSizeThreshold, enableDictionary, writerVersion, allocator);
   }
 
   @Override
@@ -132,4 +135,11 @@ public class ColumnWriteStoreV1 implements ColumnWriteStore {
     // V1 does not take record boundaries into account
   }
 
+  public void close() {
+    Collection<ColumnWriterV1> values = columns.values();
+    for (ColumnWriterV1 memColumn : values) {
+      memColumn.close();
+    }
+  }
+
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
index fc17a22..4126004 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriteStoreV2.java
@@ -29,6 +29,7 @@ import java.util.Map.Entry;
 import java.util.Set;
 import java.util.TreeMap;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriteStore;
 import org.apache.parquet.column.ColumnWriter;
@@ -50,6 +51,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
   private long rowCount;
   private long rowCountForNextSizeCheck = MINIMUM_RECORD_COUNT_FOR_CHECK;
   private final long thresholdTolerance;
+  private final ByteBufferAllocator allocator;
 
   private int pageSizeThreshold;
 
@@ -61,6 +63,7 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
     super();
     this.pageSizeThreshold = pageSizeThreshold;
     this.thresholdTolerance = (long)(pageSizeThreshold * THRESHOLD_TOLERANCE_RATIO);
+    this.allocator = parquetProps.getAllocator();
     Map<ColumnDescriptor, ColumnWriterV2> mcolumns = new TreeMap<ColumnDescriptor, ColumnWriterV2>();
     for (ColumnDescriptor path : schema.getColumns()) {
       PageWriter pageWriter = pageWriteStore.getPageWriter(path);
@@ -128,6 +131,14 @@ public class ColumnWriteStoreV2 implements ColumnWriteStore {
   }
 
   @Override
+  public void close() {
+    flush(); // calling flush() here to keep it consistent with the behavior before merging with master
+    for (ColumnWriterV2 memColumn : columns.values()) {
+      memColumn.close();
+    }
+  }
+
+  @Override
   public void endRecord() {
     ++ rowCount;
     if (rowCount >= rowCountForNextSizeCheck) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
index f4079c7..f010df8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV1.java
@@ -23,6 +23,7 @@ import static org.apache.parquet.bytes.BytesInput.concat;
 import java.io.IOException;
 
 import org.apache.parquet.Log;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.column.ColumnWriter;
@@ -66,7 +67,8 @@ final class ColumnWriterV1 implements ColumnWriter {
       int pageSizeThreshold,
       int dictionaryPageSizeThreshold,
       boolean enableDictionary,
-      WriterVersion writerVersion) {
+      WriterVersion writerVersion,
+      ByteBufferAllocator allocator) {
     this.path = path;
     this.pageWriter = pageWriter;
     this.pageSizeThreshold = pageSizeThreshold;
@@ -74,10 +76,10 @@ final class ColumnWriterV1 implements ColumnWriter {
     this.valueCountForNextSizeCheck = INITIAL_COUNT_FOR_SIZE_CHECK;
     resetStatistics();
 
-    ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary);
+    ParquetProperties parquetProps = new ParquetProperties(dictionaryPageSizeThreshold, writerVersion, enableDictionary, allocator);
 
-    this.repetitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
-    this.definitionLevelColumn = ParquetProperties.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+    this.repetitionLevelColumn = parquetProps.getColumnDescriptorValuesWriter(path.getMaxRepetitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
+    this.definitionLevelColumn = parquetProps.getColumnDescriptorValuesWriter(path.getMaxDefinitionLevel(), MIN_SLAB_SIZE, pageSizeThreshold);
 
     int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSizeThreshold, 10);
     this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSizeThreshold);
@@ -239,7 +241,7 @@ final class ColumnWriterV1 implements ColumnWriter {
     if (valueCount > 0) {
       writePage();
     }
-    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
     if (dictionaryPage != null) {
       if (DEBUG) LOG.debug("write dictionary");
       try {
@@ -251,6 +253,16 @@ final class ColumnWriterV1 implements ColumnWriter {
     }
   }
 
+  @Override
+  public void close() {
+    flush();
+    // Close the Values writers.
+    repetitionLevelColumn.close();
+    definitionLevelColumn.close();
+    dataColumn.close();
+  }
+
+  @Override
   public long getBufferedSizeInMemory() {
     return repetitionLevelColumn.getBufferedSize()
         + definitionLevelColumn.getBufferedSize()

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
index 5e936a2..8249b72 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/impl/ColumnWriterV2.java
@@ -25,6 +25,7 @@ import java.io.IOException;
 
 import org.apache.parquet.Ints;
 import org.apache.parquet.Log;
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -69,8 +70,10 @@ final class ColumnWriterV2 implements ColumnWriter {
     this.pageWriter = pageWriter;
     resetStatistics();
 
-    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize);
-    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize);
+    this.repetitionLevelColumn = new RunLengthBitPackingHybridEncoder(
+        getWidthFromMaxInt(path.getMaxRepetitionLevel()), MIN_SLAB_SIZE, pageSize, parquetProps.getAllocator());
+    this.definitionLevelColumn = new RunLengthBitPackingHybridEncoder(
+        getWidthFromMaxInt(path.getMaxDefinitionLevel()), MIN_SLAB_SIZE, pageSize, parquetProps.getAllocator());
 
     int initialSlabSize = CapacityByteArrayOutputStream.initialSlabSizeHeuristic(MIN_SLAB_SIZE, pageSize, 10);
     this.dataColumn = parquetProps.getValuesWriter(path, initialSlabSize, pageSize);
@@ -113,6 +116,22 @@ final class ColumnWriterV2 implements ColumnWriter {
     ++ valueCount;
   }
 
+  @Override
+  public void close() {
+    // Close the Values writers.
+    repetitionLevelColumn.close();
+    definitionLevelColumn.close();
+    dataColumn.close();
+  }
+
+  @Override
+  public long getBufferedSizeInMemory() {
+    return repetitionLevelColumn.getBufferedSize()
+      + definitionLevelColumn.getBufferedSize()
+      + dataColumn.getBufferedSize()
+      + pageWriter.getMemSize();
+  }
+
   /**
    * writes the current value
    * @param value
@@ -208,7 +227,7 @@ final class ColumnWriterV2 implements ColumnWriter {
    * Is called right after writePage
    */
   public void finalizeColumnChunk() {
-    final DictionaryPage dictionaryPage = dataColumn.createDictionaryPage();
+    final DictionaryPage dictionaryPage = dataColumn.toDictPageAndClose();
     if (dictionaryPage != null) {
       if (DEBUG) LOG.debug("write dictionary");
       try {
@@ -252,7 +271,7 @@ final class ColumnWriterV2 implements ColumnWriter {
   }
 
   /**
-   * @param prefix a prefix to format lines
+   * @param indent a prefix to format lines
    * @return a formatted string showing how memory is used
    */
   public String memUsageString(String indent) {

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
index 4ad7d9f..a1d8647 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/page/PageWriter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.page;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.statistics.Statistics;
@@ -54,7 +55,6 @@ public interface PageWriter {
    * @param dataEncoding the encoding for the data
    * @param data the data encoded with dataEncoding
    * @param statistics optional stats for this page
-   * @param metadata optional free form key values
    * @throws IOException
    */
   void writePageV2(

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
index a3d8920..03aa2f8 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesReader.java
@@ -20,13 +20,14 @@ package org.apache.parquet.column.values;
 
 import java.io.IOException;
 
+import java.nio.ByteBuffer;
 import org.apache.parquet.io.ParquetDecodingException;
 import org.apache.parquet.io.api.Binary;
 
 /**
  * Base class to implement an encoding for a given column type.
  *
- * A ValuesReader is provided with a page (byte-array) and is responsible
+ * A ValuesReader is provided with a page (byte-buffer) and is responsible
  * for deserializing the primitive values stored in that page.
  *
  * Given that pages are homogeneous (store only a single type), typical subclasses
@@ -58,8 +59,20 @@ public abstract class ValuesReader {
    *
    * @throws IOException
    */
-  public abstract void initFromPage(int valueCount, byte[] page, int offset) throws IOException;
-  
+  public abstract void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException;
+
+  /**
+   * Same functionality as method of the same name that takes a ByteBuffer instead of a byte[].
+   *
+   * This method is only provided for backward compatibility and will be removed in a future release.
+   * Please update any code using it as soon as possible.
+   * @see #initFromPage(int, ByteBuffer, int)
+   */
+  @Deprecated
+  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
+    this.initFromPage(valueCount, ByteBuffer.wrap(page), offset);
+  }
+
   /**
    * Called to return offset of the next section
    * @return offset of the next section

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
index c8f31b9..e5cf8e9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/ValuesWriter.java
@@ -56,9 +56,20 @@ public abstract class ValuesWriter {
   public abstract void reset();
 
   /**
+   * Called to close the values writer. Any output stream is closed and can no longer be used.
+   * All resources are released.
+   */
+  public void close() {
+  }
+
+  /**
+   * Returns the dictionary generated by this writer if one was created.
+   * As part of this operation the dictionary is closed and will not have
+   * any new values written into it.
+   *
    * @return the dictionary page or null if not dictionary based
    */
-  public DictionaryPage createDictionaryPage() {
+  public DictionaryPage toDictPageAndClose() {
     return null;
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
index f713263..f540c39 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesReader.java
@@ -21,9 +21,10 @@ package org.apache.parquet.column.values.bitpacking;
 import static org.apache.parquet.bytes.BytesUtils.getWidthFromMaxInt;
 import static org.apache.parquet.column.values.bitpacking.BitPacking.createBitPackingReader;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
@@ -39,7 +40,7 @@ import org.apache.parquet.io.ParquetDecodingException;
 public class BitPackingValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(BitPackingValuesReader.class);
 
-  private ByteArrayInputStream in;
+  private ByteBufferInputStream in;
   private BitPackingReader bitPackingReader;
   private final int bitsPerValue;
   private int nextOffset;
@@ -66,18 +67,18 @@ public class BitPackingValuesReader extends ValuesReader {
 
   /**
    * {@inheritDoc}
-   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(long, byte[], int)
+   * @see org.apache.parquet.column.values.ValuesReader#initFromPage(int, ByteBuffer, int)
    */
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
     int effectiveBitLength = valueCount * bitsPerValue;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength);
     if (Log.DEBUG) LOG.debug("reading " + length + " bytes for " + valueCount + " values of size " + bitsPerValue + " bits." );
-    this.in = new ByteArrayInputStream(in, offset, length);
+    this.in = new ByteBufferInputStream(in, offset, length);
     this.bitPackingReader = createBitPackingReader(bitsPerValue, this.in, valueCount);
     this.nextOffset = offset + length;
   }
-  
+
   @Override
   public int getNextOffset() {
     return nextOffset;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
index 24436ef..08751a0 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/BitPackingValuesWriter.java
@@ -24,6 +24,7 @@ import static org.apache.parquet.column.values.bitpacking.BitPacking.getBitPacki
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
 import org.apache.parquet.column.Encoding;
@@ -47,9 +48,9 @@ public class BitPackingValuesWriter extends ValuesWriter {
    * @param bound the maximum value stored by this column
    * @param pageSize
    */
-  public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize) {
+  public BitPackingValuesWriter(int bound, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
     this.bitsPerValue = getWidthFromMaxInt(bound);
-    this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+    this.out = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
     init();
   }
 
@@ -103,6 +104,11 @@ public class BitPackingValuesWriter extends ValuesWriter {
     init();
   }
 
+  @Override
+  public void close() {
+    out.close();
+  }
+
   /**
    * {@inheritDoc}
    * @see org.apache.parquet.column.values.ValuesWriter#getAllocatedSize()

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
index c0ab7e0..f4c8c8e 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesReader.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.bitpacking;
 
 import java.io.IOException;
 import java.util.Arrays;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesUtils;
@@ -34,7 +35,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   private final BytePacker packer;
   private final int[] decoded = new int[VALUES_AT_A_TIME];
   private int decodedPosition = VALUES_AT_A_TIME - 1;
-  private byte[] encoded;
+  private ByteBuffer encoded;
   private int encodedPos;
   private int nextOffset;
 
@@ -47,8 +48,13 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   public int readInteger() {
     ++ decodedPosition;
     if (decodedPosition == decoded.length) {
-      if (encodedPos + bitWidth > encoded.length) {
-        packer.unpack8Values(Arrays.copyOfRange(encoded, encodedPos, encodedPos + bitWidth), 0, decoded, 0);
+      encoded.position(encodedPos);
+      if (encodedPos + bitWidth > encoded.limit()) {
+        // unpack8Values needs at least bitWidth bytes to read from,
+        // We have to fill in 0 byte at the end of encoded bytes.
+        byte[] tempEncode = new byte[bitWidth];
+        encoded.get(tempEncode, 0, encoded.limit() - encodedPos);
+        packer.unpack8Values(tempEncode, 0, decoded, 0);
       } else {
         packer.unpack8Values(encoded, encodedPos, decoded, 0);
       }
@@ -59,7 +65,7 @@ public class ByteBitPackingValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset)
+  public void initFromPage(int valueCount, ByteBuffer page, int offset)
       throws IOException {
     int effectiveBitLength = valueCount * bitWidth;
     int length = BytesUtils.paddedByteCountFromBits(effectiveBitLength); // ceil

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
index d0240bb..a33e22c 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/bitpacking/ByteBitPackingValuesWriter.java
@@ -22,6 +22,7 @@ import static org.apache.parquet.column.Encoding.BIT_PACKED;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.Encoding;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
index 865eea2..caea5b5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitReader.java
@@ -19,13 +19,14 @@
 package org.apache.parquet.column.values.boundedint;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.io.ParquetDecodingException;
 
 class BitReader {
   private int currentByte = 0;
   private int currentPosition = 8;
-  private byte[] buf;
+  private ByteBuffer buf;
   private int currentBufferPosition = 0;
   private static final int[] byteGetValueMask = new int[8];
   private static final int[] readMask = new int[32];
@@ -50,7 +51,7 @@ class BitReader {
    * The array is not copied, so must not be mutated during the course of
    * reading.
    */
-  public void prepare(byte[] buf, int offset, int length) {
+  public void prepare(ByteBuffer buf, int offset, int length) {
     this.buf = buf;
     this.endBufferPosistion = offset + length;
     currentByte = 0;
@@ -87,7 +88,7 @@ class BitReader {
 
   private int getNextByte() {
     if (currentBufferPosition < endBufferPosistion) {
-      return buf[currentBufferPosition++] & 0xFF;
+      return buf.get(currentBufferPosition++) & 0xFF;
     }
     return 0;
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
index 1d1d9d1..9489714 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BitWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.boundedint;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -41,8 +42,8 @@ class BitWriter {
     }
   }
 
-  public BitWriter(int initialCapacity, int pageSize) {
-    this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize);
+  public BitWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+    this.baos = new CapacityByteArrayOutputStream(initialCapacity, pageSize, allocator);
   }
 
   public void writeBit(boolean bit) {
@@ -156,4 +157,11 @@ class BitWriter {
   public String memUsageString(String prefix) {
     return baos.memUsageString(prefix);
   }
+
+  public void close() {
+    currentByte = 0;
+    currentBytePosition = 0;
+    finished = false;
+    baos.close();
+  }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
index 3ace6e2..bbbf8da 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesFactory.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.boundedint;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.ValuesWriter;
 
@@ -26,7 +27,7 @@ public abstract class BoundedIntValuesFactory {
     return bound == 0 ? new ZeroIntegerValuesReader() : new BoundedIntValuesReader(bound);
   }
 
-  public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize) {
-    return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize);
+  public static ValuesWriter getBoundedWriter(int bound, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+    return bound == 0 ? new DevNullValuesWriter() : new BoundedIntValuesWriter(bound, initialCapacity, pageSize, allocator);
   }
 }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
index dda4431..c322125 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesReader.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.boundedint;
 import static org.apache.parquet.Log.DEBUG;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesUtils;
@@ -70,8 +71,8 @@ class BoundedIntValuesReader extends ValuesReader {
   // bytes would have to be serialized). This is the flip-side
   // to BoundedIntColumnWriter.writeData(BytesOutput)
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
-    if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in[offset] + " " + in[offset + 1] + " " + in[offset + 2] + " " + in[offset + 3] + " ");
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
+    if (DEBUG) LOG.debug("reading size at "+ offset + ": " + in.get(offset) + " " + in.get(offset + 1) + " " + in.get(offset + 2) + " " + in.get(offset + 3) + " ");
     int totalBytes = BytesUtils.readIntLittleEndian(in, offset);
     if (DEBUG) LOG.debug("will read "+ totalBytes + " bytes");
     currentValueCt = 0;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
index 0acaaf7..a90a6e5 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/BoundedIntValuesWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.boundedint;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import static org.apache.parquet.bytes.BytesInput.concat;
 import static org.apache.parquet.column.Encoding.RLE;
 import org.apache.parquet.Log;
@@ -59,11 +60,11 @@ class BoundedIntValuesWriter extends ValuesWriter {
     }
   }
 
-  public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize) {
+  public BoundedIntValuesWriter(int bound, int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
     if (bound == 0) {
       throw new ParquetEncodingException("Value bound cannot be 0. Use DevNullColumnWriter instead.");
     }
-    this.bitWriter = new BitWriter(initialCapacity, pageSize);
+    this.bitWriter = new BitWriter(initialCapacity, pageSize, allocator);
     bitsPerValue = (int)Math.ceil(Math.log(bound + 1)/Math.log(2));
     shouldRepeatThreshold = (bitsPerValue + 9)/(1 + bitsPerValue);
     if (Log.DEBUG) LOG.debug("init column with bit width of " + bitsPerValue + " and repeat threshold of " + shouldRepeatThreshold);
@@ -101,6 +102,11 @@ class BoundedIntValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    bitWriter.close();
+  }
+
+  @Override
   public void writeInteger(int val) {
     if (currentValue == val) {
       currentValueCt++;

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
index 9201596..8c78c38 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/boundedint/ZeroIntegerValuesReader.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.boundedint;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.column.values.ValuesReader;
 
@@ -36,7 +37,7 @@ public class ZeroIntegerValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset) throws IOException {
+  public void initFromPage(int valueCount, ByteBuffer in, int offset) throws IOException {
     this.nextOffset = offset;
   }
   

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
index c1678ae..3f92deb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesReader.java
@@ -19,14 +19,15 @@
 package org.apache.parquet.column.values.delta;
 
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.values.ValuesReader;
 import org.apache.parquet.column.values.bitpacking.BytePacker;
 import org.apache.parquet.column.values.bitpacking.Packer;
 import org.apache.parquet.io.ParquetDecodingException;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 /**
  * Read values written by {@link DeltaBinaryPackingValuesWriter}
@@ -40,7 +41,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    */
   private int valuesRead;
   private int minDeltaInCurrentBlock;
-  private byte[] page;
+  private ByteBuffer page;
   /**
    * stores the decoded values including the first value which is written to the header
    */
@@ -50,7 +51,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    * when data is not aligned to mini block, which means padding 0s are in the buffer
    */
   private int valuesBuffered;
-  private ByteArrayInputStream in;
+  private ByteBufferInputStream in;
   private int nextOffset;
   private DeltaBinaryPackingConfig config;
   private int[] bitWidths;
@@ -64,8 +65,8 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
    * @throws IOException
    */
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset) throws IOException {
-    in = new ByteArrayInputStream(page, offset, page.length - offset);
+  public void initFromPage(int valueCount, ByteBuffer page, int offset) throws IOException {
+    in = new ByteBufferInputStream(page, offset, page.limit() - offset);
     this.config = DeltaBinaryPackingConfig.readConfig(in);
     this.page = page;
     this.totalValueCount = BytesUtils.readUnsignedVarInt(in);
@@ -78,7 +79,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
     while (valuesBuffered < totalValueCount) { //values Buffered could be more than totalValueCount, since we flush on a mini block basis
       loadNewBlockToBuffer();
     }
-    this.nextOffset = page.length - in.available();
+    this.nextOffset = page.limit() - in.available();
   }
   
   @Override
@@ -151,7 +152,7 @@ public class DeltaBinaryPackingValuesReader extends ValuesReader {
 
   private void unpack8Values(BytePacker packer) {
     //calculate the pos because the packer api uses array not stream
-    int pos = page.length - in.available();
+    int pos = page.limit() - in.available();
     packer.unpack8Values(page, pos, valuesBuffer, valuesBuffered);
     this.valuesBuffered += 8;
     //sync the pos in stream

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
index 1b3692f..421182f 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/delta/DeltaBinaryPackingValuesWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.delta;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -111,16 +112,16 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
    */
   private int minDeltaInCurrentBlock = Integer.MAX_VALUE;
 
-  public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize) {
-    this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize);
+  public DeltaBinaryPackingValuesWriter(int slabSize, int pageSize, ByteBufferAllocator allocator) {
+    this(DEFAULT_NUM_BLOCK_VALUES, DEFAULT_NUM_MINIBLOCKS, slabSize, pageSize, allocator);
   }
 
-  public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize) {
+  public DeltaBinaryPackingValuesWriter(int blockSizeInValues, int miniBlockNum, int slabSize, int pageSize, ByteBufferAllocator allocator) {
     this.config = new DeltaBinaryPackingConfig(blockSizeInValues, miniBlockNum);
     bitWidths = new int[config.miniBlockNumInABlock];
     deltaBlockBuffer = new int[blockSizeInValues];
     miniBlockByteBuffer = new byte[config.miniBlockSizeInValues * MAX_BITWIDTH];
-    baos = new CapacityByteArrayOutputStream(slabSize, pageSize);
+    baos = new CapacityByteArrayOutputStream(slabSize, pageSize, allocator);
   }
 
   @Override
@@ -258,6 +259,14 @@ public class DeltaBinaryPackingValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    this.totalValueCount = 0;
+    this.baos.close();
+    this.deltaValuesToFlush = 0;
+    this.minDeltaInCurrentBlock = Integer.MAX_VALUE;
+  }
+
+  @Override
   public long getAllocatedSize() {
     return baos.getCapacity();
   }

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
index fb9bdc5..41f221d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesReader.java
@@ -21,6 +21,7 @@ package org.apache.parquet.column.values.deltalengthbytearray;
 import static org.apache.parquet.Log.DEBUG;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.Log;
 import org.apache.parquet.column.values.ValuesReader;
@@ -28,7 +29,7 @@ import org.apache.parquet.column.values.delta.DeltaBinaryPackingValuesReader;
 import org.apache.parquet.io.api.Binary;
 
 /**
- * Reads binary data written by {@link DeltaLengthByteArrayValueWriter}
+ * Reads binary data written by {@link DeltaLengthByteArrayValuesWriter}
  *
  * @author Aniket Mokashi
  *
@@ -37,7 +38,7 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
 
   private static final Log LOG = Log.getLog(DeltaLengthByteArrayValuesReader.class);
   private ValuesReader lengthReader;
-  private byte[] in;
+  private ByteBuffer in;
   private int offset;
 
   public DeltaLengthByteArrayValuesReader() {
@@ -45,9 +46,9 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] in, int offset)
+  public void initFromPage(int valueCount, ByteBuffer in, int offset)
       throws IOException {
-    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.length - offset));
+    if (DEBUG) LOG.debug("init from page at offset "+ offset + " for length " + (in.limit() - offset));
     lengthReader.initFromPage(valueCount, in, offset);
     offset = lengthReader.getNextOffset();
     this.in = in;
@@ -59,7 +60,7 @@ public class DeltaLengthByteArrayValuesReader extends ValuesReader {
     int length = lengthReader.readInteger();
     int start = offset;
     offset = start + length;
-    return Binary.fromConstantByteArray(in, start, length);
+    return Binary.fromConstantByteBuffer(in, start, length);
   }
 
   @Override

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
index 3f686cc..2d6b213 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltalengthbytearray/DeltaLengthByteArrayValuesWriter.java
@@ -20,6 +20,7 @@ package org.apache.parquet.column.values.deltalengthbytearray;
 
 import java.io.IOException;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.bytes.CapacityByteArrayOutputStream;
@@ -48,13 +49,13 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
   private CapacityByteArrayOutputStream arrayOut;
   private LittleEndianDataOutputStream out;
 
-  public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize) {
-    arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize);
+  public DeltaLengthByteArrayValuesWriter(int initialSize, int pageSize, ByteBufferAllocator allocator) {
+    arrayOut = new CapacityByteArrayOutputStream(initialSize, pageSize, allocator);
     out = new LittleEndianDataOutputStream(arrayOut);
     lengthWriter = new DeltaBinaryPackingValuesWriter(
         DeltaBinaryPackingValuesWriter.DEFAULT_NUM_BLOCK_VALUES,
         DeltaBinaryPackingValuesWriter.DEFAULT_NUM_MINIBLOCKS,
-        initialSize, pageSize);
+        initialSize, pageSize, allocator);
   }
 
   @Override
@@ -95,6 +96,12 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
   }
 
   @Override
+  public void close() {
+    lengthWriter.close();
+    arrayOut.close();
+  }
+
+  @Override
   public long getAllocatedSize() {
     return lengthWriter.getAllocatedSize() + arrayOut.getCapacity();
   }
@@ -104,3 +111,4 @@ public class DeltaLengthByteArrayValuesWriter extends ValuesWriter {
     return arrayOut.memUsageString(lengthWriter.memUsageString(prefix) + " DELTA_LENGTH_BYTE_ARRAY");
   }
 }
+

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
index 87ec08e..c2cfd6d 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayReader.java
@@ -19,6 +19,7 @@
 package org.apache.parquet.column.values.deltastrings;
 
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
 import org.apache.parquet.column.values.RequiresPreviousReader;
 import org.apache.parquet.column.values.ValuesReader;
@@ -45,13 +46,13 @@ public class DeltaByteArrayReader extends ValuesReader implements RequiresPrevio
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset)
+  public void initFromPage(int valueCount, ByteBuffer page, int offset)
       throws IOException {
     prefixLengthReader.initFromPage(valueCount, page, offset);
     int next = prefixLengthReader.getNextOffset();
     suffixReader.initFromPage(valueCount, page, next);	
   }
-
+  
   @Override
   public void skip() {
     prefixLengthReader.skip();

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
index 54234db..1604ddb 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/deltastrings/DeltaByteArrayWriter.java
@@ -18,6 +18,7 @@
  */
 package org.apache.parquet.column.values.deltastrings;
 
+import org.apache.parquet.bytes.ByteBufferAllocator;
 import org.apache.parquet.bytes.BytesInput;
 import org.apache.parquet.column.Encoding;
 import org.apache.parquet.column.values.ValuesWriter;
@@ -41,9 +42,9 @@ public class DeltaByteArrayWriter extends ValuesWriter{
   private ValuesWriter suffixWriter;
   private byte[] previous;
 
-  public DeltaByteArrayWriter(int initialCapacity, int pageSize) {
-    this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize);
-    this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize);
+  public DeltaByteArrayWriter(int initialCapacity, int pageSize, ByteBufferAllocator allocator) {
+    this.prefixLengthWriter = new DeltaBinaryPackingValuesWriter(128, 4, initialCapacity, pageSize, allocator);
+    this.suffixWriter = new DeltaLengthByteArrayValuesWriter(initialCapacity, pageSize, allocator);
     this.previous = new byte[0];
   }
 
@@ -70,6 +71,12 @@ public class DeltaByteArrayWriter extends ValuesWriter{
   }
 
   @Override
+  public void close() {
+    prefixLengthWriter.close();
+    suffixWriter.close();
+  }
+
+  @Override
   public long getAllocatedSize() {
     return prefixLengthWriter.getAllocatedSize() + suffixWriter.getAllocatedSize();
   }
@@ -85,6 +92,7 @@ public class DeltaByteArrayWriter extends ValuesWriter{
     int i = 0;
     byte[] vb = v.getBytes();
     int length = previous.length < vb.length ? previous.length : vb.length;
+    // find the number of matching prefix bytes between this value and the previous one
     for(i = 0; (i < length) && (previous[i] == vb[i]); i++);
     prefixLengthWriter.writeInteger(i);
     suffixWriter.writeBytes(v.slice(i, vb.length - i));

http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/6b605a4e/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
----------------------------------------------------------------------
diff --git a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
index 972c87e..e421da9 100644
--- a/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
+++ b/parquet-column/src/main/java/org/apache/parquet/column/values/dictionary/DictionaryValuesReader.java
@@ -20,9 +20,10 @@ package org.apache.parquet.column.values.dictionary;
 
 import static org.apache.parquet.Log.DEBUG;
 
-import java.io.ByteArrayInputStream;
 import java.io.IOException;
+import java.nio.ByteBuffer;
 
+import org.apache.parquet.bytes.ByteBufferInputStream;
 import org.apache.parquet.Log;
 import org.apache.parquet.bytes.BytesUtils;
 import org.apache.parquet.column.Dictionary;
@@ -40,7 +41,7 @@ import org.apache.parquet.io.api.Binary;
 public class DictionaryValuesReader extends ValuesReader {
   private static final Log LOG = Log.getLog(DictionaryValuesReader.class);
 
-  private ByteArrayInputStream in;
+  private ByteBufferInputStream in;
 
   private Dictionary dictionary;
 
@@ -51,12 +52,12 @@ public class DictionaryValuesReader extends ValuesReader {
   }
 
   @Override
-  public void initFromPage(int valueCount, byte[] page, int offset)
+  public void initFromPage(int valueCount, ByteBuffer page, int offset)
       throws IOException {
-    this.in = new ByteArrayInputStream(page, offset, page.length - offset);
-    if (page.length - offset > 0) {
+    this.in = new ByteBufferInputStream(page, offset, page.limit() - offset);
+    if (page.limit() - offset > 0) {
       if (DEBUG)
-        LOG.debug("init from page at offset " + offset + " for length " + (page.length - offset));
+        LOG.debug("init from page at offset " + offset + " for length " + (page.limit() - offset));
       int bitWidth = BytesUtils.readIntLittleEndianOnOneByte(in);
       if (DEBUG) LOG.debug("bit width " + bitWidth);
       decoder = new RunLengthBitPackingHybridDecoder(bitWidth, in);