You are viewing a plain-text version of this content. The canonical link to it was a hyperlink that is not preserved in this text.
Posted to commits@parquet.apache.org by we...@apache.org on 2016/03/16 17:34:21 UTC

parquet-cpp git commit: PARQUET-542: Support custom memory allocators

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 5c4f64557 -> 225fba78b


PARQUET-542: Support custom memory allocators

Added `MemoryAllocator` interface and a default implementation, ensured `MemPool` can use a custom allocator, added `Vector<T>` to replace `std::vector<T>`.

Author: Aliaksei Sandryhaila <al...@hp.com>

Closes #72 from asandryh/PARQUET-542 and squashes the following commits:

a740edb [Aliaksei Sandryhaila] Incorporated PR feedback.
6422e0d [Aliaksei Sandryhaila] Added MemoryAllocator interface and default implementation, ensured MemPool can use a custom allocator, added Vector<T> to replace std::vector<T>.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/225fba78
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/225fba78
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/225fba78

Branch: refs/heads/master
Commit: 225fba78b7aa162f9cd3608df3b48a309b8557c4
Parents: 5c4f645
Author: Aliaksei Sandryhaila <al...@hp.com>
Authored: Wed Mar 16 09:34:35 2016 -0700
Committer: Wes McKinney <we...@apache.org>
Committed: Wed Mar 16 09:34:35 2016 -0700

----------------------------------------------------------------------
 src/parquet/api/io.h                            |  1 +
 src/parquet/column/reader.cc                    | 27 +++---
 src/parquet/column/reader.h                     | 12 ++-
 src/parquet/column/scanner.cc                   | 19 +++--
 src/parquet/column/scanner.h                    | 17 ++--
 src/parquet/column/test-util.h                  | 10 +--
 src/parquet/encodings/decoder.h                 |  4 +-
 src/parquet/encodings/delta-bit-pack-encoding.h | 13 +--
 .../encodings/delta-byte-array-encoding.h       |  7 +-
 .../delta-length-byte-array-encoding.h          |  8 +-
 src/parquet/encodings/dictionary-encoding.h     | 45 +++++-----
 src/parquet/encodings/encoder.h                 |  5 +-
 src/parquet/encodings/encoding-test.cc          |  5 +-
 src/parquet/encodings/plain-encoding.h          | 15 ++--
 src/parquet/file/file-deserialize-test.cc       |  5 +-
 src/parquet/file/reader-internal.cc             | 25 +++---
 src/parquet/file/reader-internal.h              | 17 ++--
 src/parquet/file/reader.cc                      | 15 ++--
 src/parquet/file/reader.h                       |  7 +-
 src/parquet/reader-test.cc                      |  7 +-
 src/parquet/util/CMakeLists.txt                 |  3 +
 src/parquet/util/buffer-test.cc                 |  2 -
 src/parquet/util/buffer.cc                      | 90 ++++++++++++++++++--
 src/parquet/util/buffer.h                       | 37 +++++++-
 src/parquet/util/input-output-test.cc           |  2 +-
 src/parquet/util/input.cc                       | 14 ++-
 src/parquet/util/input.h                        | 18 ++--
 src/parquet/util/mem-allocator-test.cc          | 67 +++++++++++++++
 src/parquet/util/mem-allocator.cc               | 61 +++++++++++++
 src/parquet/util/mem-allocator.h                | 60 +++++++++++++
 src/parquet/util/mem-pool.cc                    | 11 +--
 src/parquet/util/mem-pool.h                     |  9 +-
 src/parquet/util/output.cc                      | 13 +--
 src/parquet/util/output.h                       |  6 +-
 34 files changed, 496 insertions(+), 161 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/api/io.h
----------------------------------------------------------------------
diff --git a/src/parquet/api/io.h b/src/parquet/api/io.h
index a8e555d..3a9b148 100644
--- a/src/parquet/api/io.h
+++ b/src/parquet/api/io.h
@@ -21,6 +21,7 @@
 #include "parquet/exception.h"
 #include "parquet/util/buffer.h"
 #include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
 #include "parquet/util/output.h"
 
 #endif // PARQUET_API_IO_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index bf76d4c..902704b 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -29,11 +29,12 @@
 namespace parquet_cpp {
 
 ColumnReader::ColumnReader(const ColumnDescriptor* descr,
-    std::unique_ptr<PageReader> pager)
+    std::unique_ptr<PageReader> pager, MemoryAllocator* allocator)
   : descr_(descr),
     pager_(std::move(pager)),
     num_buffered_values_(0),
-    num_decoded_values_(0) {}
+    num_decoded_values_(0),
+    allocator_(allocator) {}
 
 template <int TYPE>
 void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
@@ -59,7 +60,7 @@ void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
     // TODO(wesm): investigate whether this all-or-nothing decoding of the
     // dictionary makes sense and whether performance can be improved
 
-    auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_);
+    auto decoder = std::make_shared<DictionaryDecoder<TYPE> >(descr_, allocator_);
     decoder->SetDict(&dictionary);
     decoders_[encoding] = decoder;
   } else {
@@ -196,24 +197,26 @@ int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels)
 
 std::shared_ptr<ColumnReader> ColumnReader::Make(
     const ColumnDescriptor* descr,
-    std::unique_ptr<PageReader> pager) {
+    std::unique_ptr<PageReader> pager,
+    MemoryAllocator* allocator) {
   switch (descr->physical_type()) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolReader>(descr, std::move(pager));
+      return std::make_shared<BoolReader>(descr, std::move(pager), allocator);
     case Type::INT32:
-      return std::make_shared<Int32Reader>(descr, std::move(pager));
+      return std::make_shared<Int32Reader>(descr, std::move(pager), allocator);
     case Type::INT64:
-      return std::make_shared<Int64Reader>(descr, std::move(pager));
+      return std::make_shared<Int64Reader>(descr, std::move(pager), allocator);
     case Type::INT96:
-      return std::make_shared<Int96Reader>(descr, std::move(pager));
+      return std::make_shared<Int96Reader>(descr, std::move(pager), allocator);
     case Type::FLOAT:
-      return std::make_shared<FloatReader>(descr, std::move(pager));
+      return std::make_shared<FloatReader>(descr, std::move(pager), allocator);
     case Type::DOUBLE:
-      return std::make_shared<DoubleReader>(descr, std::move(pager));
+      return std::make_shared<DoubleReader>(descr, std::move(pager), allocator);
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayReader>(descr, std::move(pager));
+      return std::make_shared<ByteArrayReader>(descr, std::move(pager), allocator);
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager));
+      return std::make_shared<FixedLenByteArrayReader>(descr,
+          std::move(pager), allocator);
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index f6bf100..7c880fb 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -30,15 +30,17 @@
 #include "parquet/exception.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet_cpp {
 
 class ColumnReader {
  public:
-  ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>);
+  ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
+      MemoryAllocator* allocator = default_allocator());
 
   static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor*,
-      std::unique_ptr<PageReader>);
+      std::unique_ptr<PageReader>, MemoryAllocator* allocator = default_allocator());
 
   // Returns true if there are still values in this column.
   bool HasNext() {
@@ -95,6 +97,8 @@ class ColumnReader {
   // The number of values from the current data page that have been decoded
   // into memory
   int num_decoded_values_;
+
+  MemoryAllocator* allocator_;
 };
 
 // API to read values from a single column. This is the main client facing API.
@@ -104,8 +108,8 @@ class TypedColumnReader : public ColumnReader {
   typedef typename type_traits<TYPE>::value_type T;
 
   TypedColumnReader(const ColumnDescriptor* schema,
-      std::unique_ptr<PageReader> pager) :
-      ColumnReader(schema, std::move(pager)),
+      std::unique_ptr<PageReader> pager, MemoryAllocator* allocator) :
+      ColumnReader(schema, std::move(pager), allocator),
       current_decoder_(NULL) {
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc
index 4a0b32f..f88b37c 100644
--- a/src/parquet/column/scanner.cc
+++ b/src/parquet/column/scanner.cc
@@ -25,24 +25,25 @@
 namespace parquet_cpp {
 
 std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
-    int64_t batch_size) {
+    int64_t batch_size, MemoryAllocator* allocator) {
   switch (col_reader->type()) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolScanner>(col_reader, batch_size);
+      return std::make_shared<BoolScanner>(col_reader, batch_size, allocator);
     case Type::INT32:
-      return std::make_shared<Int32Scanner>(col_reader, batch_size);
+      return std::make_shared<Int32Scanner>(col_reader, batch_size, allocator);
     case Type::INT64:
-      return std::make_shared<Int64Scanner>(col_reader, batch_size);
+      return std::make_shared<Int64Scanner>(col_reader, batch_size, allocator);
     case Type::INT96:
-      return std::make_shared<Int96Scanner>(col_reader, batch_size);
+      return std::make_shared<Int96Scanner>(col_reader, batch_size, allocator);
     case Type::FLOAT:
-      return std::make_shared<FloatScanner>(col_reader, batch_size);
+      return std::make_shared<FloatScanner>(col_reader, batch_size, allocator);
     case Type::DOUBLE:
-      return std::make_shared<DoubleScanner>(col_reader, batch_size);
+      return std::make_shared<DoubleScanner>(col_reader, batch_size, allocator);
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayScanner>(col_reader, batch_size);
+      return std::make_shared<ByteArrayScanner>(col_reader, batch_size, allocator);
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size);
+      return std::make_shared<FixedLenByteArrayScanner>(col_reader,
+          batch_size, allocator);
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index f83cd81..ce2c26f 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -29,6 +29,7 @@
 #include "parquet/exception.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet_cpp {
 
@@ -37,10 +38,12 @@ static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
 class Scanner {
  public:
   explicit Scanner(std::shared_ptr<ColumnReader> reader,
-      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      MemoryAllocator* allocator = default_allocator()) :
       batch_size_(batch_size),
       level_offset_(0),
       levels_buffered_(0),
+      value_buffer_(0, allocator),
       value_offset_(0),
       values_buffered_(0),
       reader_(reader) {
@@ -52,7 +55,8 @@ class Scanner {
   virtual ~Scanner() {}
 
   static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
-      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE);
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      MemoryAllocator* allocator = default_allocator());
 
   virtual void PrintNext(std::ostream& out, int width) = 0;
 
@@ -78,7 +82,7 @@ class Scanner {
   int level_offset_;
   int levels_buffered_;
 
-  std::vector<uint8_t> value_buffer_;
+  OwnedMutableBuffer value_buffer_;
   int value_offset_;
   int64_t values_buffered_;
 
@@ -93,11 +97,12 @@ class TypedScanner : public Scanner {
   typedef typename type_traits<TYPE>::value_type T;
 
   explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
-      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
-      Scanner(reader, batch_size) {
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      MemoryAllocator* allocator = default_allocator()) :
+      Scanner(reader, batch_size, allocator) {
     typed_reader_ = static_cast<TypedColumnReader<TYPE>*>(reader.get());
     int value_byte_size = type_traits<TYPE>::value_byte_size;
-    value_buffer_.resize(batch_size_ * value_byte_size);
+    value_buffer_.Resize(batch_size_ * value_byte_size);
     values_ = reinterpret_cast<T*>(&value_buffer_[0]);
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 4d10a42..0b9b599 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -252,7 +252,7 @@ class DictionaryPageBuilder {
         if (TN == Type::FIXED_LEN_BYTE_ARRAY) {
           type_length = d->type_length();
         }
-        encoder_.reset(new DictEncoder<TC>(&pool_, type_length));
+        encoder_.reset(new DictEncoder<TC>(&pool_, default_allocator(), type_length));
   }
 
   ~DictionaryPageBuilder() {
@@ -260,7 +260,6 @@ class DictionaryPageBuilder {
   }
 
   shared_ptr<Buffer> AppendValues(const vector<TC>& values) {
-    shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>();
     int num_values = values.size();
     // Dictionary encoding
     for (int i = 0; i < num_values; ++i) {
@@ -268,7 +267,8 @@ class DictionaryPageBuilder {
     }
     num_dict_values_ = encoder_->num_entries();
     have_values_ = true;
-    rle_indices->Resize(sizeof(int) * encoder_->EstimatedDataEncodedSize());
+    shared_ptr<OwnedMutableBuffer> rle_indices = std::make_shared<OwnedMutableBuffer>(
+        sizeof(int) * encoder_->EstimatedDataEncodedSize());
     int actual_bytes = encoder_->WriteIndices(rle_indices->mutable_data(),
         rle_indices->size());
     rle_indices->Resize(actual_bytes);
@@ -277,8 +277,8 @@ class DictionaryPageBuilder {
   }
 
   shared_ptr<Buffer> WriteDict() {
-    shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>();
-    dict_buffer->Resize(encoder_->dict_encoded_size());
+    shared_ptr<OwnedMutableBuffer> dict_buffer = std::make_shared<OwnedMutableBuffer>(
+        encoder_->dict_encoded_size());
     encoder_->WriteDict(dict_buffer->mutable_data());
     return dict_buffer;
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/decoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h
index 55b29e8..5bbec0b 100644
--- a/src/parquet/encodings/decoder.h
+++ b/src/parquet/encodings/decoder.h
@@ -22,6 +22,7 @@
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet_cpp {
 
@@ -54,8 +55,7 @@ class Decoder {
   const Encoding::type encoding() const { return encoding_; }
 
  protected:
-  explicit Decoder(const ColumnDescriptor* descr,
-      const Encoding::type& encoding)
+  explicit Decoder(const ColumnDescriptor* descr, const Encoding::type& encoding)
       : descr_(descr), encoding_(encoding), num_values_(0) {}
 
   // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 3e36af6..4bc0418 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -24,6 +24,7 @@
 
 #include "parquet/encodings/decoder.h"
 #include "parquet/util/bit-stream-utils.inline.h"
+#include "parquet/util/buffer.h"
 
 namespace parquet_cpp {
 
@@ -32,8 +33,10 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
  public:
   typedef typename type_traits<TYPE>::value_type T;
 
-  explicit DeltaBitPackDecoder(const ColumnDescriptor* descr)
-      : Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED) {
+  explicit DeltaBitPackDecoder(const ColumnDescriptor* descr,
+      MemoryAllocator* allocator = default_allocator())
+      : Decoder<TYPE>(descr, Encoding::DELTA_BINARY_PACKED),
+        delta_bit_widths_(0, allocator) {
     if (TYPE != Type::INT32 && TYPE != Type::INT64) {
       throw ParquetException("Delta bit pack encoding should only be for integer data.");
     }
@@ -61,7 +64,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
       ParquetException::EofException();
     }
     if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
-    delta_bit_widths_.resize(num_mini_blocks_);
+    delta_bit_widths_.Resize(num_mini_blocks_);
 
     if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
     for (int i = 0; i < num_mini_blocks_; ++i) {
@@ -81,7 +84,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
     for (int i = 0; i < max_values; ++i) {
       if (UNLIKELY(values_current_mini_block_ == 0)) {
         ++mini_block_idx_;
-        if (mini_block_idx_ < delta_bit_widths_.size()) {
+        if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_.size())) {
           delta_bit_width_ = delta_bit_widths_[mini_block_idx_];
           values_current_mini_block_ = values_per_mini_block_;
         } else {
@@ -111,7 +114,7 @@ class DeltaBitPackDecoder : public Decoder<TYPE> {
 
   int32_t min_delta_;
   size_t mini_block_idx_;
-  std::vector<uint8_t> delta_bit_widths_;
+  OwnedMutableBuffer delta_bit_widths_;
   int delta_bit_width_;
 
   int32_t last_value_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/delta-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h
index 01dceea..e42179a 100644
--- a/src/parquet/encodings/delta-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-byte-array-encoding.h
@@ -28,10 +28,11 @@ namespace parquet_cpp {
 
 class DeltaByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
  public:
-  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr)
+  explicit DeltaByteArrayDecoder(const ColumnDescriptor* descr,
+      MemoryAllocator* allocator = default_allocator())
       : Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_BYTE_ARRAY),
-      prefix_len_decoder_(nullptr),
-      suffix_decoder_(nullptr) {
+      prefix_len_decoder_(nullptr, allocator),
+      suffix_decoder_(nullptr, allocator) {
   }
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/delta-length-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h
index a1b4fd3..367528c 100644
--- a/src/parquet/encodings/delta-length-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-length-byte-array-encoding.h
@@ -29,10 +29,10 @@ namespace parquet_cpp {
 
 class DeltaLengthByteArrayDecoder : public Decoder<Type::BYTE_ARRAY> {
  public:
-  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr)
-      : Decoder<Type::BYTE_ARRAY>(descr,
-          Encoding::DELTA_LENGTH_BYTE_ARRAY),
-      len_decoder_(nullptr) {
+  explicit DeltaLengthByteArrayDecoder(const ColumnDescriptor* descr,
+      MemoryAllocator* allocator = default_allocator()) :
+      Decoder<Type::BYTE_ARRAY>(descr, Encoding::DELTA_LENGTH_BYTE_ARRAY),
+      len_decoder_(nullptr, allocator) {
   }
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 43669b7..07981cf 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -27,8 +27,10 @@
 #include "parquet/encodings/decoder.h"
 #include "parquet/encodings/encoder.h"
 #include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/buffer.h"
 #include "parquet/util/cpu-info.h"
 #include "parquet/util/hash-util.h"
+#include "parquet/util/mem-allocator.h"
 #include "parquet/util/mem-pool.h"
 #include "parquet/util/rle-encoding.h"
 
@@ -42,9 +44,10 @@ class DictionaryDecoder : public Decoder<TYPE> {
   // Initializes the dictionary with values from 'dictionary'. The data in
   // dictionary is not guaranteed to persist in memory after this call so the
   // dictionary decoder needs to copy the data out if necessary.
-  explicit DictionaryDecoder(const ColumnDescriptor* descr)
-      : Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY) {
-  }
+  explicit DictionaryDecoder(const ColumnDescriptor* descr,
+      MemoryAllocator* allocator = default_allocator()):
+      Decoder<TYPE>(descr, Encoding::RLE_DICTIONARY), dictionary_(0, allocator),
+      byte_array_data_(0, allocator) {}
 
   // Perform type-specific initiatialization
   void SetDict(Decoder<TYPE>* dictionary);
@@ -77,11 +80,11 @@ class DictionaryDecoder : public Decoder<TYPE> {
   }
 
   // Only one is set.
-  std::vector<T> dictionary_;
+  Vector<T> dictionary_;
 
   // Data that contains the byte array data (byte_array_dictionary_ just has the
   // pointers).
-  std::vector<uint8_t> byte_array_data_;
+  OwnedMutableBuffer byte_array_data_;
 
   RleDecoder idx_decoder_;
 };
@@ -89,7 +92,7 @@ class DictionaryDecoder : public Decoder<TYPE> {
 template <int TYPE>
 inline void DictionaryDecoder<TYPE>::SetDict(Decoder<TYPE>* dictionary) {
   int num_dictionary_values = dictionary->values_left();
-  dictionary_.resize(num_dictionary_values);
+  dictionary_.Resize(num_dictionary_values);
   dictionary->Decode(&dictionary_[0], num_dictionary_values);
 }
 
@@ -103,14 +106,14 @@ template <>
 inline void DictionaryDecoder<Type::BYTE_ARRAY>::SetDict(
     Decoder<Type::BYTE_ARRAY>* dictionary) {
   int num_dictionary_values = dictionary->values_left();
-  dictionary_.resize(num_dictionary_values);
+  dictionary_.Resize(num_dictionary_values);
   dictionary->Decode(&dictionary_[0], num_dictionary_values);
 
   int total_size = 0;
   for (int i = 0; i < num_dictionary_values; ++i) {
     total_size += dictionary_[i].len;
   }
-  byte_array_data_.resize(total_size);
+  byte_array_data_.Resize(total_size);
   int offset = 0;
   for (int i = 0; i < num_dictionary_values; ++i) {
     memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len);
@@ -123,13 +126,13 @@ template <>
 inline void DictionaryDecoder<Type::FIXED_LEN_BYTE_ARRAY>::SetDict(
     Decoder<Type::FIXED_LEN_BYTE_ARRAY>* dictionary) {
   int num_dictionary_values = dictionary->values_left();
-  dictionary_.resize(num_dictionary_values);
+  dictionary_.Resize(num_dictionary_values);
   dictionary->Decode(&dictionary_[0], num_dictionary_values);
 
   int fixed_len = descr_->type_length();
   int total_size = num_dictionary_values*fixed_len;
 
-  byte_array_data_.resize(total_size);
+  byte_array_data_.Resize(total_size);
   int offset = 0;
   for (int i = 0; i < num_dictionary_values; ++i) {
     memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);
@@ -198,12 +201,14 @@ class DictEncoderBase {
   int dict_encoded_size() { return dict_encoded_size_; }
 
  protected:
-  explicit DictEncoderBase(MemPool* pool) :
+  explicit DictEncoderBase(MemPool* pool, MemoryAllocator* allocator) :
       hash_table_size_(INITIAL_HASH_TABLE_SIZE),
       mod_bitmask_(hash_table_size_ - 1),
-      hash_slots_(hash_table_size_, HASH_SLOT_EMPTY),
+      hash_slots_(0, allocator),
+      allocator_(allocator),
       pool_(pool),
       dict_encoded_size_(0) {
+    hash_slots_.Assign(hash_table_size_, HASH_SLOT_EMPTY);
     if (!CpuInfo::initialized()) {
       CpuInfo::Init();
     }
@@ -219,7 +224,8 @@ class DictEncoderBase {
   // We use a fixed-size hash table with linear probing
   //
   // These values correspond to the uniques_ array
-  std::vector<hash_slot_t> hash_slots_;
+  Vector<hash_slot_t> hash_slots_;
+  MemoryAllocator* allocator_;
 
   // For ByteArray / FixedLenByteArray data. Not owned
   MemPool* pool_;
@@ -234,9 +240,9 @@ class DictEncoderBase {
 template <typename T>
 class DictEncoder : public DictEncoderBase {
  public:
-  explicit DictEncoder(MemPool* pool = nullptr, int type_length = -1) :
-      DictEncoderBase(pool),
-      type_length_(type_length) { }
+  explicit DictEncoder(MemPool* pool = nullptr,
+      MemoryAllocator* allocator = default_allocator(), int type_length = -1) :
+      DictEncoderBase(pool, allocator), type_length_(type_length) {}
 
   // TODO(wesm): think about how to address the construction semantics in
   // encodings/dictionary-encoding.h
@@ -331,7 +337,8 @@ inline void DictEncoder<T>::Put(const T& v) {
 template <typename T>
 inline void DictEncoder<T>::DoubleTableSize() {
   int new_size = hash_table_size_ * 2;
-  std::vector<hash_slot_t> new_hash_slots(new_size, HASH_SLOT_EMPTY);
+  Vector<hash_slot_t> new_hash_slots(0, allocator_);
+  new_hash_slots.Assign(new_size, HASH_SLOT_EMPTY);
   hash_slot_t index, slot;
   int j;
   for (int i = 0; i < hash_table_size_; ++i) {
@@ -360,7 +367,8 @@ inline void DictEncoder<T>::DoubleTableSize() {
 
   hash_table_size_ = new_size;
   mod_bitmask_ = new_size - 1;
-  new_hash_slots.swap(hash_slots_);
+
+  hash_slots_.Swap(new_hash_slots);
 }
 
 template<typename T>
@@ -376,7 +384,6 @@ inline void DictEncoder<ByteArray>::AddDictKey(const ByteArray& v) {
     throw ParquetException("out of memory");
   }
   memcpy(heap, v.ptr, v.len);
-
   uniques_.push_back(ByteArray(v.len, heap));
   dict_encoded_size_ += v.len + sizeof(uint32_t);
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index ce91a29..b7201d5 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -43,12 +43,13 @@ class Encoder {
 
  protected:
   explicit Encoder(const ColumnDescriptor* descr,
-      const Encoding::type& encoding)
-      : descr_(descr), encoding_(encoding) {}
+      const Encoding::type& encoding, MemoryAllocator* allocator)
+      : descr_(descr), encoding_(encoding), allocator_(allocator) {}
 
   // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
   const ColumnDescriptor* descr_;
   const Encoding::type encoding_;
+  MemoryAllocator* allocator_;
 };
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index 490047c..96d1b29 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -151,6 +151,7 @@ class TestEncodingBase : public ::testing::Test {
     if (descr_) {
       type_length_ = descr_->type_length();
     }
+    allocator_ = default_allocator();
   }
 
   void TearDown() {
@@ -182,6 +183,7 @@ class TestEncodingBase : public ::testing::Test {
 
  protected:
   MemPool pool_;
+  MemoryAllocator* allocator_;
 
   int num_values_;
   int type_length_;
@@ -199,6 +201,7 @@ class TestEncodingBase : public ::testing::Test {
 // out an alternative to this class layering at some point
 #define USING_BASE_MEMBERS()                    \
   using TestEncodingBase<Type>::pool_;          \
+  using TestEncodingBase<Type>::allocator_;     \
   using TestEncodingBase<Type>::descr_;         \
   using TestEncodingBase<Type>::num_values_;    \
   using TestEncodingBase<Type>::draws_;         \
@@ -252,7 +255,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
   static constexpr int TYPE = Type::type_num;
 
   void CheckRoundtrip() {
-    DictEncoder<T> encoder(&pool_, type_length_);
+    DictEncoder<T> encoder(&pool_, allocator_, type_length_);
 
     dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
     auto indices = std::make_shared<OwnedMutableBuffer>();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 95c353c..6d63023 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -25,6 +25,7 @@
 #include "parquet/encodings/encoder.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/util/bit-stream-utils.inline.h"
+#include "parquet/util/buffer.h"
 #include "parquet/util/output.h"
 
 namespace parquet_cpp {
@@ -172,8 +173,9 @@ class PlainEncoder : public Encoder<TYPE> {
  public:
   typedef typename type_traits<TYPE>::value_type T;
 
-  explicit PlainEncoder(const ColumnDescriptor* descr) :
-      Encoder<TYPE>(descr, Encoding::PLAIN) {}
+  explicit PlainEncoder(const ColumnDescriptor* descr,
+      MemoryAllocator* allocator = default_allocator()) :
+      Encoder<TYPE>(descr, Encoding::PLAIN, allocator) {}
 
   void Encode(const T* src, int num_values, OutputStream* dst);
 };
@@ -181,12 +183,13 @@ class PlainEncoder : public Encoder<TYPE> {
 template <>
 class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
  public:
-  explicit PlainEncoder(const ColumnDescriptor* descr) :
-      Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN) {}
+  explicit PlainEncoder(const ColumnDescriptor* descr,
+      MemoryAllocator* allocator = default_allocator()) :
+      Encoder<Type::BOOLEAN>(descr, Encoding::PLAIN, allocator) {}
 
   virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
     int bytes_required = BitUtil::Ceil(num_values, 8);
-    std::vector<uint8_t> tmp_buffer(bytes_required);
+    OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
 
     BitWriter bit_writer(&tmp_buffer[0], bytes_required);
     for (int i = 0; i < num_values; ++i) {
@@ -205,7 +208,7 @@ class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
     // Use a temporary buffer for now and copy, because the BitWriter is not
     // aware of OutputStream. Later we can add some kind of Request/Flush API
     // to OutputStream
-    std::vector<uint8_t> tmp_buffer(bytes_required);
+    OwnedMutableBuffer tmp_buffer(bytes_required, allocator_);
 
     BitWriter bit_writer(&tmp_buffer[0], bytes_required);
     for (int i = 0; i < num_values; ++i) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 2a7c347..6b7755c 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -87,7 +87,7 @@ class TestPageSerde : public ::testing::Test {
   }
 
   void ResetStream() {
-    out_stream_.reset(new InMemoryOutputStream());
+    out_stream_.reset(new InMemoryOutputStream);
   }
 
   void EndStream() {
@@ -244,7 +244,8 @@ class TestParquetFileReader : public ::testing::Test {
     std::unique_ptr<BufferReader> reader(new BufferReader(buffer));
     reader_.reset(new ParquetFileReader());
 
-    ASSERT_THROW(reader_->Open(SerializedFile::Open(std::move(reader))),
+    ASSERT_THROW(
+        reader_->Open(SerializedFile::Open(std::move(reader))),
         ParquetException);
   }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 89e8298..4c192f4 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -42,8 +42,9 @@ namespace parquet_cpp {
 // assembled in a serialized stream for storing in a Parquet files
 
 SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
-    Compression::type codec_type) :
-    stream_(std::move(stream)) {
+    Compression::type codec_type, MemoryAllocator* allocator) :
+    stream_(std::move(stream)),
+    decompression_buffer_(0, allocator) {
   max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
   decompressor_ = Codec::Create(codec_type);
 }
@@ -97,7 +98,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     if (decompressor_ != NULL) {
       // Grow the uncompressed buffer if we need to.
       if (uncompressed_len > static_cast<int>(decompression_buffer_.size())) {
-        decompression_buffer_.resize(uncompressed_len);
+        decompression_buffer_.Resize(uncompressed_len);
       }
       decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
           &decompression_buffer_[0]);
@@ -181,7 +182,7 @@ std::unique_ptr<PageReader> SerializedRowGroup::GetColumnPageReader(int i) {
 
   std::unique_ptr<InputStream> stream(new InMemoryInputStream(buffer));
   return std::unique_ptr<PageReader>(new SerializedPageReader(std::move(stream),
-          FromThrift(col.meta_data.codec)));
+          FromThrift(col.meta_data.codec), allocator_));
 }
 
 RowGroupStatistics SerializedRowGroup::GetColumnStats(int i) {
@@ -204,9 +205,9 @@ static constexpr uint32_t FOOTER_SIZE = 8;
 static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 
 std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
-    std::unique_ptr<RandomAccessSource> source) {
+    std::unique_ptr<RandomAccessSource> source, MemoryAllocator* allocator) {
   std::unique_ptr<ParquetFileReader::Contents> result(
-      new SerializedFile(std::move(source)));
+      new SerializedFile(std::move(source), allocator));
 
   // Access private methods here, but otherwise unavailable
   SerializedFile* file = static_cast<SerializedFile*>(result.get());
@@ -227,9 +228,9 @@ SerializedFile::~SerializedFile() {
 
 std::shared_ptr<RowGroupReader> SerializedFile::GetRowGroup(int i) {
   std::unique_ptr<SerializedRowGroup> contents(new SerializedRowGroup(source_.get(),
-          &metadata_.row_groups[i]));
+          &metadata_.row_groups[i], allocator_));
 
-  return std::make_shared<RowGroupReader>(&schema_, std::move(contents));
+  return std::make_shared<RowGroupReader>(&schema_, std::move(contents), allocator_);
 }
 
 int64_t SerializedFile::num_rows() const {
@@ -244,8 +245,10 @@ int SerializedFile::num_row_groups() const {
   return metadata_.row_groups.size();
 }
 
-SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source) :
-    source_(std::move(source)) {}
+// Takes ownership of `source`; the default allocator is applied by Open().
+SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source,
+    MemoryAllocator* allocator) :
+    source_(std::move(source)), allocator_(allocator) {}
 
 
 void SerializedFile::ParseMetaData() {
@@ -271,7 +274,7 @@ void SerializedFile::ParseMetaData() {
   }
   source_->Seek(metadata_start);
 
-  std::vector<uint8_t> metadata_buffer(metadata_len);
+  OwnedMutableBuffer metadata_buffer(metadata_len, allocator_);
   bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
   if (bytes_read != metadata_len) {
     throw ParquetException("Invalid parquet file. Could not read metadata bytes.");

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index b62f249..847c8a9 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -43,7 +43,7 @@ static constexpr uint32_t DEFAULT_PAGE_HEADER_SIZE = 16 * 1024;
 class SerializedPageReader : public PageReader {
  public:
   SerializedPageReader(std::unique_ptr<InputStream> stream,
-      Compression::type codec);
+      Compression::type codec, MemoryAllocator* allocator = default_allocator());
 
   virtual ~SerializedPageReader() {}
 
@@ -62,7 +62,7 @@ class SerializedPageReader : public PageReader {
 
   // Compression codec to use.
   std::unique_ptr<Codec> decompressor_;
-  std::vector<uint8_t> decompression_buffer_;
+  OwnedMutableBuffer decompression_buffer_;
   // Maximum allowed page size
   uint32_t max_page_header_size_;
 };
@@ -71,9 +71,10 @@ class SerializedPageReader : public PageReader {
 class SerializedRowGroup : public RowGroupReader::Contents {
  public:
   SerializedRowGroup(RandomAccessSource* source,
-      const parquet::RowGroup* metadata) :
+      const parquet::RowGroup* metadata, MemoryAllocator* allocator) :
       source_(source),
-      metadata_(metadata) {}
+      metadata_(metadata),
+      allocator_(allocator) {}
 
   virtual int num_columns() const;
   virtual int64_t num_rows() const;
@@ -83,6 +84,7 @@ class SerializedRowGroup : public RowGroupReader::Contents {
  private:
   RandomAccessSource* source_;
   const parquet::RowGroup* metadata_;
+  MemoryAllocator* allocator_;
 };
 
 // An implementation of ParquetFileReader::Contents that deals with the Parquet
@@ -95,7 +97,8 @@ class SerializedFile : public ParquetFileReader::Contents {
   // This class does _not_ take ownership of the data source. You must manage its
   // lifetime separately
   static std::unique_ptr<ParquetFileReader::Contents> Open(
-      std::unique_ptr<RandomAccessSource> source);
+      std::unique_ptr<RandomAccessSource> source,
+      MemoryAllocator* allocator = default_allocator());
   virtual void Close();
   virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i);
   virtual int64_t num_rows() const;
@@ -105,10 +108,12 @@ class SerializedFile : public ParquetFileReader::Contents {
 
  private:
   // This class takes ownership of the provided data source
-  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source);
+  explicit SerializedFile(std::unique_ptr<RandomAccessSource> source,
+      MemoryAllocator* allocator);
 
   std::unique_ptr<RandomAccessSource> source_;
   parquet::FileMetaData metadata_;
+  MemoryAllocator* allocator_;
 
   void ParseMetaData();
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 2937f9e..9020008 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -41,9 +41,10 @@ namespace parquet_cpp {
 // RowGroupReader public API
 
 RowGroupReader::RowGroupReader(const SchemaDescriptor* schema,
-    std::unique_ptr<Contents> contents) :
+    std::unique_ptr<Contents> contents, MemoryAllocator* allocator) :
     schema_(schema),
-    contents_(std::move(contents)) {}
+    contents_(std::move(contents)),
+    allocator_(allocator) {}
 
 int RowGroupReader::num_columns() const {
   return contents_->num_columns();
@@ -58,7 +59,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(int i) {
   const ColumnDescriptor* descr = schema_->Column(i);
 
   std::unique_ptr<PageReader> page_reader = contents_->GetColumnPageReader(i);
-  return ColumnReader::Make(descr, std::move(page_reader));
+  return ColumnReader::Make(descr, std::move(page_reader), allocator_);
 }
 
 RowGroupStatistics RowGroupReader::GetColumnStats(int i) const {
@@ -74,16 +75,16 @@ ParquetFileReader::~ParquetFileReader() {
 }
 
 std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(const std::string& path,
-    bool memory_map) {
+    bool memory_map, MemoryAllocator* allocator) {
   std::unique_ptr<LocalFileSource> file;
   if (memory_map) {
-    file.reset(new MemoryMapSource());
+    file.reset(new MemoryMapSource(allocator));
   } else {
-    file.reset(new LocalFileSource());
+    file.reset(new LocalFileSource(allocator));
   }
   file->Open(path);
 
-  auto contents = SerializedFile::Open(std::move(file));
+  auto contents = SerializedFile::Open(std::move(file), allocator);
 
   std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
   result->Open(std::move(contents));

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index 436d1e8..f4455ac 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -48,7 +48,8 @@ class RowGroupReader {
     virtual std::unique_ptr<PageReader> GetColumnPageReader(int i) = 0;
   };
 
-  RowGroupReader(const SchemaDescriptor* schema, std::unique_ptr<Contents> contents);
+  RowGroupReader(const SchemaDescriptor* schema,
+      std::unique_ptr<Contents> contents, MemoryAllocator* allocator);
 
   // Construct a ColumnReader for the indicated row group-relative
   // column. Ownership is shared with the RowGroupReader.
@@ -66,6 +67,8 @@ class RowGroupReader {
   // This is declared in the .cc file so that we can hide compiled Thrift
   // headers from the public API and also more easily create test fixtures.
   std::unique_ptr<Contents> contents_;
+
+  MemoryAllocator* allocator_;
 };
 
 
@@ -95,7 +98,7 @@ class ParquetFileReader {
 
   // API Convenience to open a serialized Parquet file on disk
   static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
-      bool memory_map = true);
+      bool memory_map = true, MemoryAllocator* allocator = default_allocator());
 
   void Open(std::unique_ptr<Contents> contents);
   void Close();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index c273487..10bcff7 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -28,6 +28,7 @@
 #include "parquet/column/reader.h"
 #include "parquet/column/scanner.h"
 #include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
 
 using std::string;
 
@@ -93,7 +94,8 @@ TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
   std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
 
   // column 0, id
-  std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+  std::shared_ptr<Int32Scanner> scanner(
+      new Int32Scanner(group->Column(0)));
   int32_t val;
   bool is_null;
   for (int i = 0; i < 8; ++i) {
@@ -110,7 +112,8 @@ TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
   std::shared_ptr<RowGroupReader> group = reader_->RowGroup(0);
 
   // column 0, id
-  std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+  std::shared_ptr<Int32Scanner> scanner(
+      new Int32Scanner(group->Column(0)));
 
   ASSERT_EQ(128, scanner->batch_size());
   scanner->SetBatchSize(1024);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 5cb7b2f..b4faaa1 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -28,6 +28,7 @@ install(FILES
   input.h
   logging.h
   macros.h
+  mem-allocator.h
   mem-pool.h
   output.h
   rle-encoding.h
@@ -39,6 +40,7 @@ add_library(parquet_util STATIC
   buffer.cc
   cpu-info.cc
   input.cc
+  mem-allocator.cc
   mem-pool.cc
   output.cc
 )
@@ -64,5 +66,6 @@ endif()
 ADD_PARQUET_TEST(bit-util-test)
 ADD_PARQUET_TEST(buffer-test)
 ADD_PARQUET_TEST(input-output-test)
+ADD_PARQUET_TEST(mem-allocator-test)
 ADD_PARQUET_TEST(mem-pool-test)
 ADD_PARQUET_TEST(rle-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/buffer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-test.cc b/src/parquet/util/buffer-test.cc
index f494326..af6d54c 100644
--- a/src/parquet/util/buffer-test.cc
+++ b/src/parquet/util/buffer-test.cc
@@ -48,8 +48,6 @@ TEST_F(TestBuffer, Resize) {
 }
 
 TEST_F(TestBuffer, ResizeOOM) {
-  // realloc fails, even though there may be no explicit limit
-
   // Tests that deliberately throw Exceptions foul up valgrind and report
   // red herring memory leaks
 #ifndef PARQUET_VALGRIND

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/buffer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.cc b/src/parquet/util/buffer.cc
index 6647c15..34db8df 100644
--- a/src/parquet/util/buffer.cc
+++ b/src/parquet/util/buffer.cc
@@ -17,9 +17,11 @@
 
 #include "parquet/util/buffer.h"
 
+#include <algorithm>
 #include <cstdint>
 
 #include "parquet/exception.h"
+#include "parquet/types.h"
 
 namespace parquet_cpp {
 
@@ -34,18 +36,110 @@ std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
   return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
 }
 
-OwnedMutableBuffer::OwnedMutableBuffer() :
-    ResizableBuffer(nullptr, 0) {}
+// Creates a buffer holding `size` bytes taken from `allocator`.
+OwnedMutableBuffer::OwnedMutableBuffer(int64_t size, MemoryAllocator* allocator) :
+    ResizableBuffer(nullptr, 0), allocator_(allocator) {
+  Resize(size);
+}
+
+// Gives the whole allocation (capacity_ bytes, not size_) back to its allocator.
+OwnedMutableBuffer::~OwnedMutableBuffer() {
+  if (mutable_data_) {
+    allocator_->Free(mutable_data_, capacity_);
+  }
+}
+
+// Ensures room for at least `new_capacity` bytes; never shrinks. On growth the
+// first size_ bytes are copied to a new block and the old block is freed. The
+// new block is obtained before any member changes, so a throwing Malloc leaves
+// the buffer untouched.
+void OwnedMutableBuffer::Reserve(int64_t new_capacity) {
+  if (!mutable_data_ || new_capacity > capacity_) {
+    if (mutable_data_) {
+      uint8_t* new_data = allocator_->Malloc(new_capacity);
+      // std::copy comes from <algorithm>, which this file includes; memcpy
+      // would need <cstring>, which it does not.
+      std::copy(mutable_data_, mutable_data_ + size_, new_data);
+      allocator_->Free(mutable_data_, capacity_);
+      mutable_data_ = new_data;
+    } else {
+      mutable_data_ = allocator_->Malloc(new_capacity);
+    }
+    data_ = mutable_data_;
+    capacity_ = new_capacity;
+  }
+}
 
+// Sets the logical size to `new_size` bytes, growing the storage if needed.
 void OwnedMutableBuffer::Resize(int64_t new_size) {
-  try {
-    buffer_owner_.resize(new_size);
-  } catch (const std::bad_alloc& e) {
-    throw ParquetException("OOM: resize failed");
+  Reserve(new_size);
+  size_ = new_size;
+}
+
+// Unchecked access to byte i of the buffer.
+uint8_t& OwnedMutableBuffer::operator[](int64_t i) {
+  return mutable_data_[i];
+}
+
+// Allocates room for `size` elements of T from `allocator`. The bytes are not
+// initialized and no constructors run; T is treated as plain bytes.
+template <class T>
+Vector<T>::Vector(int64_t size, MemoryAllocator* allocator) :
+    buffer_(new OwnedMutableBuffer(size * sizeof(T), allocator)),
+    size_(size), capacity_(size) {
+  if (size > 0) {
+    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+  } else {
+    data_ = nullptr;
   }
+}
+
+// Grows storage to exactly `new_capacity` elements (no over-allocation);
+// existing elements are carried over by OwnedMutableBuffer::Resize.
+template <class T>
+void Vector<T>::Reserve(int64_t new_capacity) {
+  if (new_capacity > capacity_) {
+    buffer_->Resize(new_capacity * sizeof(T));
+    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+    capacity_ = new_capacity;
+  }
+}
+
+// Sets the element count, growing storage if needed; added elements are
+// uninitialized.
+template <class T>
+void Vector<T>::Resize(int64_t new_size) {
+  Reserve(new_size);
   size_ = new_size;
-  data_ = buffer_owner_.data();
-  mutable_data_ = buffer_owner_.data();
 }
 
+// Resizes to `size` elements and sets each of them to `val`.
+template <class T>
+void Vector<T>::Assign(int64_t size, const T val) {
+  Resize(size);
+  for (int64_t i = 0; i < size_; i++) {
+    data_[i] = val;
+  }
+}
+
+// Exchanges storage and bookkeeping with `v` without copying any elements.
+template <class T>
+void Vector<T>::Swap(Vector<T>& v) {
+  buffer_.swap(v.buffer_);
+  std::swap(size_, v.size_);
+  std::swap(capacity_, v.capacity_);
+  std::swap(data_, v.data_);
+}
+
+// Explicit instantiations: the member definitions live in this file, so
+// Vector<T> is usable only for the types listed here.
+template class Vector<int32_t>;
+template class Vector<int64_t>;
+template class Vector<bool>;
+template class Vector<float>;
+template class Vector<double>;
+template class Vector<Int96>;
+template class Vector<ByteArray>;
+template class Vector<FixedLenByteArray>;
+
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/buffer.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.h b/src/parquet/util/buffer.h
index 8be2b17..7ea2122 100644
--- a/src/parquet/util/buffer.h
+++ b/src/parquet/util/buffer.h
@@ -25,6 +25,7 @@
 #include <vector>
 
 #include "parquet/util/macros.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet_cpp {
 
@@ -119,7 +120,8 @@ class ResizableBuffer : public MutableBuffer {
 
  protected:
   ResizableBuffer(uint8_t* data, int64_t size) :
-      MutableBuffer(data, size) {}
+      MutableBuffer(data, size), capacity_(size) {}
+  int64_t capacity_;
 };
 
 // A ResizableBuffer whose memory is owned by the class instance. For example,
@@ -127,12 +129,62 @@ class ResizableBuffer : public MutableBuffer {
 // garbage-collected
 class OwnedMutableBuffer : public ResizableBuffer {
  public:
-  OwnedMutableBuffer();
-  virtual void Resize(int64_t new_size);
+  // Allocates `size` bytes from `allocator` (the process default if omitted).
+  explicit OwnedMutableBuffer(int64_t size = 0,
+      MemoryAllocator* allocator = default_allocator());
+  // Returns the storage to the allocator it was obtained from.
+  virtual ~OwnedMutableBuffer();
+  // Sets the logical size, growing the storage if needed (never shrinks it).
+  void Resize(int64_t new_size) override;
+  // Ensures storage for at least `new_capacity` bytes without changing size().
+  void Reserve(int64_t new_capacity);
+  // Unchecked access to byte i.
+  uint8_t& operator[](int64_t i);
 
  private:
   // TODO: aligned allocations
-  std::vector<uint8_t> buffer_owner_;
+  // Not owned; must outlive this buffer, whose destructor frees through it.
+  MemoryAllocator* allocator_;
+
+  DISALLOW_COPY_AND_ASSIGN(OwnedMutableBuffer);
+};
+
+// Growable array of T whose storage comes from a MemoryAllocator (through an
+// OwnedMutableBuffer) rather than std::allocator.
+//
+// Elements are handled as raw bytes: no constructors or destructors run and
+// newly added elements are uninitialized, so T should be trivially copyable.
+// Members are defined, and explicitly instantiated for the Parquet value
+// types, in buffer.cc.
+template <class T>
+class Vector {
+ public:
+  // Creates `size` uninitialized elements using memory from `allocator`.
+  explicit Vector(int64_t size, MemoryAllocator* allocator);
+  // Changes the element count, growing storage as needed.
+  void Resize(int64_t new_size);
+  // Ensures storage for at least `new_capacity` elements.
+  void Reserve(int64_t new_capacity);
+  // Resizes to `size` elements, each set to `val`.
+  void Assign(int64_t size, const T val);
+  // Exchanges contents with `v` without copying elements.
+  void Swap(Vector<T>& v);
+
+  // Unchecked element access.
+  inline T& operator[](int64_t i) {
+    return data_[i];
+  }
+  inline const T& operator[](int64_t i) const {
+    return data_[i];
+  }
+
+ private:
+  std::unique_ptr<OwnedMutableBuffer> buffer_;
+  int64_t size_;
+  int64_t capacity_;
+  T* data_;
+
+  DISALLOW_COPY_AND_ASSIGN(Vector);
 };
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
index 424be3a..b5c412c 100644
--- a/src/parquet/util/input-output-test.cc
+++ b/src/parquet/util/input-output-test.cc
@@ -28,6 +28,7 @@
 #include "parquet/exception.h"
 #include "parquet/util/buffer.h"
 #include "parquet/util/input.h"
+#include "parquet/util/mem-allocator.h"
 #include "parquet/util/output.h"
 #include "parquet/util/test-common.h"
 
@@ -118,7 +119,6 @@ TYPED_TEST(TestFileReaders, FileDisappeared) {
 
 TYPED_TEST(TestFileReaders, BadSeek) {
   this->source.Open(this->test_path_);
-
   ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
index 897d81c..0c52c16 100644
--- a/src/parquet/util/input.cc
+++ b/src/parquet/util/input.cc
@@ -35,6 +35,10 @@ std::shared_ptr<Buffer> RandomAccessSource::ReadAt(int64_t pos, int64_t nbytes)
   return Read(nbytes);
 }
 
+int64_t RandomAccessSource::Size() const {
+  return size_;
+}
+
 // ----------------------------------------------------------------------
 // LocalFileSource
 
@@ -86,10 +90,6 @@ void LocalFileSource::Seek(int64_t pos) {
   SeekFile(pos);
 }
 
-int64_t LocalFileSource::Size() const {
-  return size_;
-}
-
 int64_t LocalFileSource::Tell() const {
   int64_t position = ftell(file_);
   if (position < 0) {
@@ -107,7 +107,7 @@ int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
 }
 
 std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
-  auto result = std::make_shared<OwnedMutableBuffer>();
+  auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
   result->Resize(nbytes);
 
   int64_t bytes_read = Read(nbytes, result->mutable_data());
@@ -198,10 +198,6 @@ void BufferReader::Seek(int64_t pos) {
   pos_ = pos;
 }
 
-int64_t BufferReader::Size() const {
-  return size_;
-}
-
 int64_t BufferReader::Read(int64_t nbytes, uint8_t* out) {
   ParquetException::NYI("not implemented");
   return 0;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
index 80fb730..cf015ee 100644
--- a/src/parquet/util/input.h
+++ b/src/parquet/util/input.h
@@ -18,6 +18,8 @@
 #ifndef PARQUET_UTIL_INPUT_H
 #define PARQUET_UTIL_INPUT_H
 
+#include <parquet/util/mem-allocator.h>
+
 #include <cstdint>
 #include <cstdio>
 #include <memory>
@@ -36,11 +38,10 @@ class RandomAccessSource {
  public:
   virtual ~RandomAccessSource() {}
 
-  virtual int64_t Size() const = 0;
-
   virtual void Close() = 0;
   virtual int64_t Tell() const = 0;
   virtual void Seek(int64_t pos) = 0;
+  int64_t Size() const;
 
   // Returns actual number of bytes read
   virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
@@ -55,13 +56,14 @@ class RandomAccessSource {
 
 class LocalFileSource : public RandomAccessSource {
  public:
-  LocalFileSource() : file_(nullptr), is_open_(false) {}
+  explicit LocalFileSource(MemoryAllocator* allocator = default_allocator()) :
+      file_(nullptr), is_open_(false), allocator_(allocator) {}
+
   virtual ~LocalFileSource();
 
   virtual void Open(const std::string& path);
 
   virtual void Close();
-  virtual int64_t Size() const;
   virtual int64_t Tell() const;
   virtual void Seek(int64_t pos);
 
@@ -83,14 +85,13 @@ class LocalFileSource : public RandomAccessSource {
   std::string path_;
   FILE* file_;
   bool is_open_;
+  MemoryAllocator* allocator_;
 };
 
 class MemoryMapSource : public LocalFileSource {
  public:
-  MemoryMapSource() :
-      LocalFileSource(),
-      data_(nullptr),
-      pos_(0) {}
+  explicit MemoryMapSource(MemoryAllocator* allocator = default_allocator()) :
+      LocalFileSource(allocator), data_(nullptr), pos_(0) {}
 
   virtual ~MemoryMapSource();
 
@@ -123,7 +124,6 @@ class BufferReader : public RandomAccessSource {
   virtual void Close() {}
   virtual int64_t Tell() const;
   virtual void Seek(int64_t pos);
-  virtual int64_t Size() const;
 
   virtual int64_t Read(int64_t nbytes, uint8_t* out);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-allocator-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator-test.cc b/src/parquet/util/mem-allocator-test.cc
new file mode 100644
index 0000000..2101176
--- /dev/null
+++ b/src/parquet/util/mem-allocator-test.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <limits>
+
+#include <gtest/gtest.h>
+
+#include "parquet/exception.h"
+#include "parquet/util/mem-allocator.h"
+
+namespace parquet_cpp {
+
+// Round-trips an allocation, then checks the zero-size, size-mismatch and
+// unsatisfiable-request cases.
+TEST(TestAllocator, AllocateFree) {
+  TrackingAllocator allocator;
+
+  uint8_t* data = allocator.Malloc(100);
+  ASSERT_TRUE(nullptr != data);
+  data[99] = 55;
+  allocator.Free(data, 100);
+
+  data = allocator.Malloc(0);
+  ASSERT_EQ(nullptr, data);
+  allocator.Free(data, 0);
+
+  data = allocator.Malloc(1);
+  ASSERT_THROW(allocator.Free(data, 2), ParquetException);
+  ASSERT_NO_THROW(allocator.Free(data, 1));
+
+  int64_t to_alloc = std::numeric_limits<int64_t>::max();
+  ASSERT_THROW(allocator.Malloc(to_alloc), ParquetException);
+}
+
+// TotalMemory() follows outstanding bytes; MaxMemory() keeps the peak.
+TEST(TestAllocator, TotalMax) {
+  TrackingAllocator allocator;
+  ASSERT_EQ(0, allocator.TotalMemory());
+  ASSERT_EQ(0, allocator.MaxMemory());
+
+  uint8_t* data = allocator.Malloc(100);
+  ASSERT_EQ(100, allocator.TotalMemory());
+  ASSERT_EQ(100, allocator.MaxMemory());
+
+  uint8_t* data2 = allocator.Malloc(10);
+  ASSERT_EQ(110, allocator.TotalMemory());
+  ASSERT_EQ(110, allocator.MaxMemory());
+
+  allocator.Free(data, 100);
+  ASSERT_EQ(10, allocator.TotalMemory());
+  ASSERT_EQ(110, allocator.MaxMemory());
+
+  allocator.Free(data2, 10);
+  ASSERT_EQ(0, allocator.TotalMemory());
+  ASSERT_EQ(110, allocator.MaxMemory());
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-allocator.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.cc b/src/parquet/util/mem-allocator.cc
new file mode 100644
index 0000000..2bffff9
--- /dev/null
+++ b/src/parquet/util/mem-allocator.cc
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/mem-allocator.h"
+
+#include <cstdint>
+#include <cstdlib>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+MemoryAllocator::~MemoryAllocator() {}
+
+// Allocates `size` bytes with std::malloc and adds them to the tracked totals.
+// Returns nullptr for a zero-byte request. Throws ParquetException for a
+// negative size or when the memory cannot be obtained.
+uint8_t* TrackingAllocator::Malloc(int64_t size) {
+  if (0 == size) {
+    return nullptr;
+  }
+  if (size < 0) {
+    throw ParquetException("Invalid memory allocation size");
+  }
+
+  // Where size_t is narrower than int64_t (32-bit targets) the conversion would
+  // truncate the request and return a block smaller than the caller asked for.
+  const std::size_t nbytes = static_cast<std::size_t>(size);
+  if (static_cast<int64_t>(nbytes) != size) {
+    throw ParquetException("OOM: memory allocation failed");
+  }
+
+  uint8_t* p = static_cast<uint8_t*>(std::malloc(nbytes));
+  if (!p) {
+    throw ParquetException("OOM: memory allocation failed");
+  }
+  total_memory_ += size;
+  if (total_memory_ > max_memory_) {
+    max_memory_ = total_memory_;
+  }
+  return p;
+}
+
+// Releases `p` with std::free and subtracts `size` from the tracked total.
+// Null pointers and non-positive sizes are ignored (nothing is freed). Throws
+// ParquetException, without freeing, if `size` exceeds the outstanding bytes.
+void TrackingAllocator::Free(uint8_t* p, int64_t size) {
+  if (nullptr != p && size > 0) {
+    if (total_memory_ < size) {
+      throw ParquetException("Attempting to free too much memory");
+    }
+    total_memory_ -= size;
+    std::free(p);
+  }
+}
+
+TrackingAllocator::~TrackingAllocator() {}
+
+// Shared allocator for callers that do not supply their own. A function-local
+// static, so it is constructed on first use.
+MemoryAllocator* default_allocator() {
+  static TrackingAllocator default_allocator;
+  return &default_allocator;
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-allocator.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.h b/src/parquet/util/mem-allocator.h
new file mode 100644
index 0000000..076a8e0
--- /dev/null
+++ b/src/parquet/util/mem-allocator.h
@@ -0,0 +1,75 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_MEM_ALLOCATOR_H
+#define PARQUET_UTIL_MEM_ALLOCATOR_H
+
+#include <cstdint>
+
+#include "parquet/util/logging.h"
+#include "parquet/util/bit-util.h"
+
+namespace parquet_cpp {
+
+// Abstract source of raw memory for buffers, memory pools and readers.
+// Blocks are obtained with Malloc() and returned with Free(), which must be
+// passed the same size that was given to Malloc() for that block.
+class MemoryAllocator {
+ public:
+  virtual ~MemoryAllocator();
+
+  // Returns nullptr if size is 0
+  virtual uint8_t* Malloc(int64_t size) = 0;
+  // Releases a block previously returned by Malloc(size).
+  virtual void Free(uint8_t* p, int64_t size) = 0;
+};
+
+// Returns the process-wide TrackingAllocator used wherever no allocator is
+// passed explicitly. The pointer is never null and must not be deleted.
+MemoryAllocator* default_allocator();
+
+// MemoryAllocator backed by std::malloc/std::free that counts outstanding
+// bytes (TotalMemory) and their high-water mark (MaxMemory). Malloc() throws
+// ParquetException when memory cannot be obtained; Free() throws it when asked
+// to release more bytes than are outstanding.
+// NOTE: the counters are plain int64_t with no synchronization.
+class TrackingAllocator: public MemoryAllocator {
+ public:
+  TrackingAllocator() : total_memory_(0), max_memory_(0) {}
+  virtual ~TrackingAllocator();
+
+  uint8_t* Malloc(int64_t size) override;
+  void Free(uint8_t* p, int64_t size) override;
+
+  // Bytes currently allocated and not yet freed.
+  int64_t TotalMemory() const {
+    return total_memory_;
+  }
+
+  // Largest value TotalMemory() has reached.
+  int64_t MaxMemory() const {
+    return max_memory_;
+  }
+
+ private:
+  int64_t total_memory_;
+  int64_t max_memory_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_MEM_ALLOCATOR_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-pool.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc
index f8626bc..2f78a18 100644
--- a/src/parquet/util/mem-pool.cc
+++ b/src/parquet/util/mem-pool.cc
@@ -34,12 +34,13 @@ namespace parquet_cpp {
 const int MemPool::INITIAL_CHUNK_SIZE;
 const int MemPool::MAX_CHUNK_SIZE;
 
-MemPool::MemPool()
+MemPool::MemPool(MemoryAllocator* allocator)
   : current_chunk_idx_(-1),
     next_chunk_size_(INITIAL_CHUNK_SIZE),
     total_allocated_bytes_(0),
     peak_allocated_bytes_(0),
-    total_reserved_bytes_(0) {}
+    total_reserved_bytes_(0),
+    allocator_(allocator) {}
 
 MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
   : data(buf),
@@ -51,7 +52,7 @@ MemPool::~MemPool() {
   int64_t total_bytes_released = 0;
   for (size_t i = 0; i < chunks_.size(); ++i) {
     total_bytes_released += chunks_[i].size;
-    free(chunks_[i].data);
+    allocator_->Free(chunks_[i].data, chunks_[i].size);
   }
 
   DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
@@ -70,7 +71,7 @@ void MemPool::FreeAll() {
   int64_t total_bytes_released = 0;
   for (size_t i = 0; i < chunks_.size(); ++i) {
     total_bytes_released += chunks_[i].size;
-    free(chunks_[i].data);
+    allocator_->Free(chunks_[i].data, chunks_[i].size);
   }
   chunks_.clear();
   next_chunk_size_ = INITIAL_CHUNK_SIZE;
@@ -108,7 +109,7 @@ bool MemPool::FindChunk(int64_t min_size) {
     chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
 
     // Allocate a new chunk. Return early if malloc fails.
-    uint8_t* buf = reinterpret_cast<uint8_t*>(malloc(chunk_size));
+    uint8_t* buf = allocator_->Malloc(chunk_size);
     if (UNLIKELY(buf == NULL)) {
       DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
       current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/mem-pool.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h
index 3f21aa7..c81712b 100644
--- a/src/parquet/util/mem-pool.h
+++ b/src/parquet/util/mem-pool.h
@@ -29,11 +29,12 @@
 
 #include "parquet/util/logging.h"
 #include "parquet/util/bit-util.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet_cpp {
 
-/// A MemPool maintains a list of memory chunks from which it allocates memory in
-/// response to Allocate() calls;
+/// A MemPool maintains a list of memory chunks from which it allocates memory
+/// in response to Allocate() calls.
 /// Chunks stay around for the lifetime of the mempool or until they are passed on to
 /// another mempool.
 //
@@ -75,7 +76,7 @@ namespace parquet_cpp {
 
 class MemPool {
  public:
-  MemPool();
+  explicit MemPool(MemoryAllocator* allocator = default_allocator());
 
   /// Frees all chunks of memory and subtracts the total allocated bytes
   /// from the registered limits.
@@ -165,6 +166,8 @@ class MemPool {
 
   std::vector<ChunkInfo> chunks_;
 
+  MemoryAllocator* allocator_;
+
-  /// Find or allocated a chunk with at least min_size spare capacity and update
+  /// Find or allocate a chunk with at least min_size spare capacity and update
   /// current_chunk_idx_. Also updates chunks_, chunk_sizes_ and allocated_bytes_
   /// if a new chunk needs to be created.

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
index b28421e..fa641fe 100644
--- a/src/parquet/util/output.cc
+++ b/src/parquet/util/output.cc
@@ -28,21 +28,17 @@ namespace parquet_cpp {
 // ----------------------------------------------------------------------
 // In-memory output stream

-static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024;
-
-InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) :
-    size_(0),
-    capacity_(initial_capacity) {
-  if (initial_capacity == 0) {
-    initial_capacity = IN_MEMORY_DEFAULT_CAPACITY;
-  }
-  buffer_.reset(new OwnedMutableBuffer());
-  buffer_->Resize(initial_capacity);
+// Creates an empty stream whose buffer comes from `allocator`. A zero
+// initial_capacity falls back to IN_MEMORY_DEFAULT_CAPACITY, and the fallback
+// is applied to capacity_ too so it always matches the buffer actually allocated.
+InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity,
+    MemoryAllocator* allocator)
+    : size_(0),
+      capacity_(initial_capacity == 0 ? IN_MEMORY_DEFAULT_CAPACITY
+                                      : initial_capacity) {
+  buffer_.reset(new OwnedMutableBuffer(capacity_, allocator));
 }

-InMemoryOutputStream::InMemoryOutputStream() :
-    InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {}
-
 uint8_t* InMemoryOutputStream::Head() {
   return buffer_->mutable_data() + size_;
 }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/225fba78/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
index b466e0e..bc320a4 100644
--- a/src/parquet/util/output.h
+++ b/src/parquet/util/output.h
@@ -22,6 +22,7 @@
 #include <memory>
 
 #include "parquet/util/macros.h"
+#include "parquet/util/mem-allocator.h"
 
 namespace parquet_cpp {
 
@@ -44,12 +45,13 @@ class OutputStream {
   virtual void Write(const uint8_t* data, int64_t length) = 0;
 };
 
+static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024;
 
-// An output stream that is an in-memory
+// An output stream that writes to an in-memory buffer
 class InMemoryOutputStream : public OutputStream {
  public:
-  InMemoryOutputStream();
-  explicit InMemoryOutputStream(int64_t initial_capacity);
+  explicit InMemoryOutputStream(int64_t initial_capacity = IN_MEMORY_DEFAULT_CAPACITY,
+      MemoryAllocator* allocator = default_allocator());
 
   // Close is currently a no-op with the in-memory stream
   virtual void Close() {}