Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/30 16:36:37 UTC

[3/4] parquet-cpp git commit: PARQUET-818: Refactoring to utilize common IO, buffer, memory management abstractions and implementations

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
index 184c74d..13fb01b 100644
--- a/src/parquet/column/scanner.h
+++ b/src/parquet/column/scanner.h
@@ -29,7 +29,7 @@
 #include "parquet/exception.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
 namespace parquet {
@@ -44,7 +44,7 @@ class PARQUET_EXPORT Scanner {
       : batch_size_(batch_size),
         level_offset_(0),
         levels_buffered_(0),
-        value_buffer_(0, allocator),
+        value_buffer_(std::make_shared<PoolBuffer>(allocator)),
         value_offset_(0),
         values_buffered_(0),
         reader_(reader) {
@@ -76,7 +76,7 @@ class PARQUET_EXPORT Scanner {
   int level_offset_;
   int levels_buffered_;
 
-  OwnedMutableBuffer value_buffer_;
+  std::shared_ptr<PoolBuffer> value_buffer_;
   int value_offset_;
   int64_t values_buffered_;
 
@@ -95,8 +95,8 @@ class PARQUET_EXPORT TypedScanner : public Scanner {
       : Scanner(reader, batch_size, allocator) {
     typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
     int value_byte_size = type_traits<DType::type_num>::value_byte_size;
-    value_buffer_.Resize(batch_size_ * value_byte_size);
-    values_ = reinterpret_cast<T*>(&value_buffer_[0]);
+    PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
+    values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
   }
 
   virtual ~TypedScanner() {}
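
The scanner hunk above captures the core pattern of this refactor: an OwnedMutableBuffer member becomes a std::shared_ptr<PoolBuffer>, Resize() now reports failure through a Status that PARQUET_THROW_NOT_OK converts into a ParquetException, and raw access goes through data()/mutable_data() instead of operator[]. A minimal sketch of that pattern, using only names that appear in this patch (the free function and its arguments are illustrative):

    #include <cstring>
    #include <memory>

    #include "parquet/exception.h"
    #include "parquet/util/memory.h"

    namespace parquet {

    void PrepareValueBuffer(int64_t batch_size, int value_byte_size) {
      MemoryAllocator* allocator = default_allocator();
      auto value_buffer = std::make_shared<PoolBuffer>(allocator);
      // Resize() is Status-returning; failures surface as ParquetException.
      PARQUET_THROW_NOT_OK(value_buffer->Resize(batch_size * value_byte_size));
      // Mutation goes through mutable_data() rather than operator[].
      std::memset(value_buffer->mutable_data(), 0, batch_size * value_byte_size);
    }

    }  // namespace parquet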

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics-test.cc b/src/parquet/column/statistics-test.cc
index 364d9d4..c8641a1 100644
--- a/src/parquet/column/statistics-test.cc
+++ b/src/parquet/column/statistics-test.cc
@@ -33,9 +33,7 @@
 #include "parquet/file/writer.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -150,8 +148,8 @@ class TestRowGroupStatistics : public PrimitiveTypedTest<TestType> {
     file_writer->Close();
 
     auto buffer = sink->GetBuffer();
-    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
-    auto file_reader = ParquetFileReader::Open(std::move(source));
+    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+    auto file_reader = ParquetFileReader::Open(source);
     auto rg_reader = file_reader->RowGroup(0);
     auto column_chunk = rg_reader->metadata()->ColumnChunk(0);
     std::shared_ptr<RowGroupStatistics> stats = column_chunk->statistics();
@@ -191,7 +189,8 @@ std::vector<FLBA> TestRowGroupStatistics<FLBAType>::GetDeepCopy(
   std::vector<FLBA> copy;
   MemoryAllocator* allocator = default_allocator();
   for (const FLBA& flba : values) {
-    uint8_t* ptr = allocator->Malloc(FLBA_LENGTH);
+    uint8_t* ptr;
+    PARQUET_THROW_NOT_OK(allocator->Allocate(FLBA_LENGTH, &ptr));
     memcpy(ptr, flba.ptr, FLBA_LENGTH);
     copy.emplace_back(ptr);
   }
@@ -204,7 +203,8 @@ std::vector<ByteArray> TestRowGroupStatistics<ByteArrayType>::GetDeepCopy(
   std::vector<ByteArray> copy;
   MemoryAllocator* allocator = default_allocator();
   for (const ByteArray& ba : values) {
-    uint8_t* ptr = allocator->Malloc(ba.len);
+    uint8_t* ptr;
+    PARQUET_THROW_NOT_OK(allocator->Allocate(ba.len, &ptr));
     memcpy(ptr, ba.ptr, ba.len);
     copy.emplace_back(ba.len, ptr);
   }
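
The GetDeepCopy hunks show the new allocator contract: Malloc(size) returning a raw pointer is replaced by Allocate(size, &ptr) returning a Status, again checked with PARQUET_THROW_NOT_OK. A hedged sketch of a helper written against that contract (deallocation is left to the caller, as in the test fixture above):

    #include <cstdint>
    #include <cstring>

    #include "parquet/exception.h"
    #include "parquet/util/memory.h"

    namespace parquet {

    uint8_t* CopyBytes(MemoryAllocator* allocator, const uint8_t* src, int64_t len) {
      uint8_t* ptr;
      PARQUET_THROW_NOT_OK(allocator->Allocate(len, &ptr));
      std::memcpy(ptr, src, len);
      return ptr;
    }

    }  // namespace parquet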

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.cc b/src/parquet/column/statistics.cc
index 0330ac1..9b76fab 100644
--- a/src/parquet/column/statistics.cc
+++ b/src/parquet/column/statistics.cc
@@ -21,16 +21,17 @@
 #include "parquet/column/statistics.h"
 #include "parquet/encodings/plain-encoding.h"
 #include "parquet/exception.h"
-#include "parquet/util/buffer.h"
 #include "parquet/util/comparison.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
 template <typename DType>
 TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(
     const ColumnDescriptor* schema, MemoryAllocator* allocator)
-    : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) {
+    : allocator_(allocator),
+      min_buffer_(AllocateBuffer(allocator_, 0)),
+      max_buffer_(AllocateBuffer(allocator_, 0)) {
   SetDescr(schema);
   Reset();
 }
@@ -40,14 +41,14 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const typename DType::c_
     const typename DType::c_type& max, int64_t num_values, int64_t null_count,
     int64_t distinct_count)
     : allocator_(default_allocator()),
-      min_buffer_(0, allocator_),
-      max_buffer_(0, allocator_) {
+      min_buffer_(AllocateBuffer(allocator_, 0)),
+      max_buffer_(AllocateBuffer(allocator_, 0)) {
   IncrementNumValues(num_values);
   IncrementNullCount(null_count);
   IncrementDistinctCount(distinct_count);
 
-  Copy(min, &min_, min_buffer_);
-  Copy(max, &max_, max_buffer_);
+  Copy(min, &min_, min_buffer_.get());
+  Copy(max, &max_, max_buffer_.get());
   has_min_max_ = true;
 }
 
@@ -56,7 +57,9 @@ TypedRowGroupStatistics<DType>::TypedRowGroupStatistics(const ColumnDescriptor*
     const std::string& encoded_min, const std::string& encoded_max, int64_t num_values,
     int64_t null_count, int64_t distinct_count, bool has_min_max,
     MemoryAllocator* allocator)
-    : allocator_(allocator), min_buffer_(0, allocator_), max_buffer_(0, allocator_) {
+    : allocator_(allocator),
+      min_buffer_(AllocateBuffer(allocator_, 0)),
+      max_buffer_(AllocateBuffer(allocator_, 0)) {
   IncrementNumValues(num_values);
   IncrementNullCount(null_count);
   IncrementDistinctCount(distinct_count);
@@ -94,11 +97,11 @@ void TypedRowGroupStatistics<DType>::Update(
   auto batch_minmax = std::minmax_element(values, values + num_not_null, compare);
   if (!has_min_max_) {
     has_min_max_ = true;
-    Copy(*batch_minmax.first, &min_, min_buffer_);
-    Copy(*batch_minmax.second, &max_, max_buffer_);
+    Copy(*batch_minmax.first, &min_, min_buffer_.get());
+    Copy(*batch_minmax.second, &max_, max_buffer_.get());
   } else {
-    Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_);
-    Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_);
+    Copy(std::min(min_, *batch_minmax.first, compare), &min_, min_buffer_.get());
+    Copy(std::max(max_, *batch_minmax.second, compare), &max_, max_buffer_.get());
   }
 }
 
@@ -119,15 +122,15 @@ void TypedRowGroupStatistics<DType>::Merge(const TypedRowGroupStatistics<DType>&
   if (!other.HasMinMax()) return;
 
   if (!has_min_max_) {
-    Copy(other.min_, &this->min_, min_buffer_);
-    Copy(other.max_, &this->max_, max_buffer_);
+    Copy(other.min_, &this->min_, min_buffer_.get());
+    Copy(other.max_, &this->max_, max_buffer_.get());
     has_min_max_ = true;
     return;
   }
 
   Compare<T> compare(descr_);
-  Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_);
-  Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_);
+  Copy(std::min(this->min_, other.min_, compare), &this->min_, min_buffer_.get());
+  Copy(std::max(this->max_, other.max_, compare), &this->max_, max_buffer_.get());
 }
 
 template <typename DType>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/statistics.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/statistics.h b/src/parquet/column/statistics.h
index a21a0fa..cf41dc0 100644
--- a/src/parquet/column/statistics.h
+++ b/src/parquet/column/statistics.h
@@ -24,8 +24,7 @@
 
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
 namespace parquet {
@@ -166,34 +165,33 @@ class TypedRowGroupStatistics : public RowGroupStatistics {
 
   void PlainEncode(const T& src, std::string* dst);
   void PlainDecode(const std::string& src, T* dst);
-  void Copy(const T& src, T* dst, OwnedMutableBuffer& buffer);
+  void Copy(const T& src, T* dst, PoolBuffer* buffer);
 
-  OwnedMutableBuffer min_buffer_, max_buffer_;
+  std::shared_ptr<PoolBuffer> min_buffer_, max_buffer_;
 };
 
 template <typename DType>
-inline void TypedRowGroupStatistics<DType>::Copy(
-    const T& src, T* dst, OwnedMutableBuffer&) {
+inline void TypedRowGroupStatistics<DType>::Copy(const T& src, T* dst, PoolBuffer*) {
   *dst = src;
 }
 
 template <>
 inline void TypedRowGroupStatistics<FLBAType>::Copy(
-    const FLBA& src, FLBA* dst, OwnedMutableBuffer& buffer) {
+    const FLBA& src, FLBA* dst, PoolBuffer* buffer) {
   if (dst->ptr == src.ptr) return;
   uint32_t len = descr_->type_length();
-  buffer.Resize(len);
-  std::memcpy(&buffer[0], src.ptr, len);
-  *dst = FLBA(buffer.data());
+  PARQUET_THROW_NOT_OK(buffer->Resize(len));
+  std::memcpy(buffer->mutable_data(), src.ptr, len);
+  *dst = FLBA(buffer->data());
 }
 
 template <>
 inline void TypedRowGroupStatistics<ByteArrayType>::Copy(
-    const ByteArray& src, ByteArray* dst, OwnedMutableBuffer& buffer) {
+    const ByteArray& src, ByteArray* dst, PoolBuffer* buffer) {
   if (dst->ptr == src.ptr) return;
-  buffer.Resize(src.len);
-  std::memcpy(&buffer[0], src.ptr, src.len);
-  *dst = ByteArray(src.len, buffer.data());
+  PARQUET_THROW_NOT_OK(buffer->Resize(src.len));
+  std::memcpy(buffer->mutable_data(), src.ptr, src.len);
+  *dst = ByteArray(src.len, buffer->data());
 }
 
 template <>

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 10632d2..9efa623 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -28,13 +28,15 @@
 #include <string>
 #include <vector>
 
+#include <gtest/gtest.h>
+
 #include "parquet/column/levels.h"
 #include "parquet/column/page.h"
 
 // Depended on by SerializedPageReader test utilities for now
 #include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/encodings/plain-encoding.h"
-#include "parquet/util/input.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/test-common.h"
 
 using std::vector;
@@ -253,8 +255,8 @@ class DictionaryPageBuilder {
   }
 
   shared_ptr<Buffer> WriteDict() {
-    shared_ptr<OwnedMutableBuffer> dict_buffer =
-        std::make_shared<OwnedMutableBuffer>(encoder_->dict_encoded_size());
+    std::shared_ptr<PoolBuffer> dict_buffer =
+        AllocateBuffer(default_allocator(), encoder_->dict_encoded_size());
     encoder_->WriteDict(dict_buffer->mutable_data());
     return dict_buffer;
   }
@@ -262,7 +264,7 @@ class DictionaryPageBuilder {
   int32_t num_values() const { return num_dict_values_; }
 
  private:
-  MemPool pool_;
+  ChunkedAllocator pool_;
   shared_ptr<DictEncoder<TYPE>> encoder_;
   int32_t num_dict_values_;
   bool have_values_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 6112efe..7319d46 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -21,6 +21,7 @@
 #include "parquet/column/statistics.h"
 #include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/encodings/plain-encoding.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -55,8 +56,8 @@ ColumnWriter::ColumnWriter(ColumnChunkMetaDataBuilder* metadata,
 }
 
 void ColumnWriter::InitSinks() {
-  definition_levels_sink_.reset(new InMemoryOutputStream());
-  repetition_levels_sink_.reset(new InMemoryOutputStream());
+  definition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator()));
+  repetition_levels_sink_.reset(new InMemoryOutputStream(properties_->allocator()));
 }
 
 void ColumnWriter::WriteDefinitionLevels(int64_t num_levels, const int16_t* levels) {
@@ -77,7 +78,8 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
   int64_t rle_size =
       LevelEncoder::MaxBufferSize(Encoding::RLE, max_level, num_buffered_values_) +
       sizeof(uint32_t);
-  auto buffer_rle = std::make_shared<OwnedMutableBuffer>(rle_size, allocator_);
+  std::shared_ptr<PoolBuffer> buffer_rle =
+      AllocateBuffer(properties_->allocator(), rle_size);
   level_encoder_.Init(Encoding::RLE, max_level, num_buffered_values_,
       buffer_rle->mutable_data() + sizeof(uint32_t),
       buffer_rle->size() - sizeof(uint32_t));
@@ -87,7 +89,7 @@ std::shared_ptr<Buffer> ColumnWriter::RleEncodeLevels(
   reinterpret_cast<uint32_t*>(buffer_rle->mutable_data())[0] = level_encoder_.len();
   int64_t encoded_size = level_encoder_.len() + sizeof(uint32_t);
   DCHECK(rle_size >= encoded_size);
-  buffer_rle->Resize(encoded_size);
+  PARQUET_THROW_NOT_OK(buffer_rle->Resize(encoded_size));
   return std::static_pointer_cast<Buffer>(buffer_rle);
 }
 
@@ -110,8 +112,8 @@ void ColumnWriter::AddDataPage() {
       definition_levels->size() + repetition_levels->size() + values->size();
 
   // Concatenate data into a single buffer
-  std::shared_ptr<OwnedMutableBuffer> uncompressed_data =
-      std::make_shared<OwnedMutableBuffer>(uncompressed_size, allocator_);
+  std::shared_ptr<PoolBuffer> uncompressed_data =
+      AllocateBuffer(allocator_, uncompressed_size);
   uint8_t* uncompressed_ptr = uncompressed_data->mutable_data();
   memcpy(uncompressed_ptr, repetition_levels->data(), repetition_levels->size());
   uncompressed_ptr += repetition_levels->size();
@@ -223,7 +225,8 @@ void TypedColumnWriter<Type>::CheckDictionarySizeLimit() {
 template <typename Type>
 void TypedColumnWriter<Type>::WriteDictionaryPage() {
   auto dict_encoder = static_cast<DictEncoder<Type>*>(current_encoder_.get());
-  auto buffer = std::make_shared<OwnedMutableBuffer>(dict_encoder->dict_encoded_size());
+  std::shared_ptr<PoolBuffer> buffer =
+      AllocateBuffer(properties_->allocator(), dict_encoder->dict_encoded_size());
   dict_encoder->WriteDict(buffer->mutable_data());
   // TODO Get rid of this deep call
   dict_encoder->mem_pool()->FreeAll();
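
ColumnWriter now obtains scratch space through the AllocateBuffer helper from parquet/util/memory.h rather than constructing OwnedMutableBuffer directly, then shrinks the buffer with a checked Resize() once the real size is known. A condensed sketch of the allocate-write-shrink sequence used by RleEncodeLevels and WriteDictionaryPage above (the sizes are placeholders):

    #include <memory>

    #include "parquet/exception.h"
    #include "parquet/util/memory.h"

    namespace parquet {

    std::shared_ptr<Buffer> EncodeToBuffer(
        MemoryAllocator* allocator, int64_t max_size, int64_t encoded_size) {
      // AllocateBuffer returns a std::shared_ptr<PoolBuffer> of the requested size.
      std::shared_ptr<PoolBuffer> buffer = AllocateBuffer(allocator, max_size);
      // ... write at most max_size bytes into buffer->mutable_data() ...
      PARQUET_THROW_NOT_OK(buffer->Resize(encoded_size));
      return std::static_pointer_cast<Buffer>(buffer);
    }

    }  // namespace parquet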

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
index 67a29bc..39d5934 100644
--- a/src/parquet/column/writer.h
+++ b/src/parquet/column/writer.h
@@ -28,9 +28,7 @@
 #include "parquet/file/metadata.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/mem-pool.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
 namespace parquet {
@@ -111,7 +109,7 @@ class PARQUET_EXPORT ColumnWriter {
   LevelEncoder level_encoder_;
 
   MemoryAllocator* allocator_;
-  MemPool pool_;
+  ChunkedAllocator pool_;
 
   // The total number of values stored in the data page. This is the maximum of
   // the number of encoded definition levels or encoded values. For

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/decoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/decoder.h b/src/parquet/encodings/decoder.h
index 4442507..1ac9f35 100644
--- a/src/parquet/encodings/decoder.h
+++ b/src/parquet/encodings/decoder.h
@@ -22,7 +22,7 @@
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 5353817..59774a4 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -24,7 +24,7 @@
 
 #include "parquet/encodings/decoder.h"
 #include "parquet/util/bit-stream-utils.inline.h"
-#include "parquet/util/buffer.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -36,7 +36,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
   explicit DeltaBitPackDecoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
       : Decoder<DType>(descr, Encoding::DELTA_BINARY_PACKED),
-        delta_bit_widths_(0, allocator) {
+        delta_bit_widths_(new PoolBuffer(allocator)) {
     if (DType::type_num != Type::INT32 && DType::type_num != Type::INT64) {
       throw ParquetException("Delta bit pack encoding should only be for integer data.");
     }
@@ -62,28 +62,31 @@ class DeltaBitPackDecoder : public Decoder<DType> {
     if (!decoder_.GetVlqInt(&num_mini_blocks_)) ParquetException::EofException();
     if (!decoder_.GetVlqInt(&values_current_block_)) { ParquetException::EofException(); }
     if (!decoder_.GetZigZagVlqInt(&last_value_)) ParquetException::EofException();
-    delta_bit_widths_.Resize(num_mini_blocks_);
+    PARQUET_THROW_NOT_OK(delta_bit_widths_->Resize(num_mini_blocks_));
+
+    uint8_t* bit_width_data = delta_bit_widths_->mutable_data();
 
     if (!decoder_.GetZigZagVlqInt(&min_delta_)) ParquetException::EofException();
     for (int i = 0; i < num_mini_blocks_; ++i) {
-      if (!decoder_.GetAligned<uint8_t>(1, &delta_bit_widths_[i])) {
+      if (!decoder_.GetAligned<uint8_t>(1, bit_width_data + i)) {
         ParquetException::EofException();
       }
     }
     values_per_mini_block_ = block_size / num_mini_blocks_;
     mini_block_idx_ = 0;
-    delta_bit_width_ = delta_bit_widths_[0];
+    delta_bit_width_ = bit_width_data[0];
     values_current_mini_block_ = values_per_mini_block_;
   }
 
   template <typename T>
   int GetInternal(T* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
+    const uint8_t* bit_width_data = delta_bit_widths_->data();
     for (int i = 0; i < max_values; ++i) {
       if (UNLIKELY(values_current_mini_block_ == 0)) {
         ++mini_block_idx_;
-        if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_.size())) {
-          delta_bit_width_ = delta_bit_widths_[mini_block_idx_];
+        if (mini_block_idx_ < static_cast<size_t>(delta_bit_widths_->size())) {
+          delta_bit_width_ = bit_width_data[mini_block_idx_];
           values_current_mini_block_ = values_per_mini_block_;
         } else {
           InitBlock();
@@ -112,7 +115,7 @@ class DeltaBitPackDecoder : public Decoder<DType> {
 
   int32_t min_delta_;
   size_t mini_block_idx_;
-  OwnedMutableBuffer delta_bit_widths_;
+  std::unique_ptr<PoolBuffer> delta_bit_widths_;
   int delta_bit_width_;
 
   int32_t last_value_;
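
The decoder hunk also illustrates the read/write split on the new buffer type: a single mutable_data() pointer is taken after the checked Resize() while filling the mini-block widths, and the read path in GetInternal() uses the const data() accessor. A small sketch of that convention (the buffer and counts stand in for the decoder's members, and the memset is a stand-in for the actual decoding):

    #include <cstdint>
    #include <cstring>
    #include <memory>

    #include "parquet/exception.h"
    #include "parquet/util/memory.h"

    namespace parquet {

    uint8_t FirstBitWidth(
        const std::shared_ptr<PoolBuffer>& delta_bit_widths, int num_mini_blocks) {
      PARQUET_THROW_NOT_OK(delta_bit_widths->Resize(num_mini_blocks));
      uint8_t* bit_width_data = delta_bit_widths->mutable_data();  // write access
      std::memset(bit_width_data, 0, num_mini_blocks);  // stand-in for decoding the widths
      const uint8_t* widths = delta_bit_widths->data();  // read-only access
      return widths[0];
    }

    }  // namespace parquet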

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 7823307..b79744a 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -27,11 +27,9 @@
 #include "parquet/encodings/decoder.h"
 #include "parquet/encodings/encoder.h"
 #include "parquet/encodings/plain-encoding.h"
-#include "parquet/util/buffer.h"
 #include "parquet/util/cpu-info.h"
 #include "parquet/util/hash-util.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/mem-pool.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/rle-encoding.h"
 
 namespace parquet {
@@ -48,7 +46,7 @@ class DictionaryDecoder : public Decoder<Type> {
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
       : Decoder<Type>(descr, Encoding::RLE_DICTIONARY),
         dictionary_(0, allocator),
-        byte_array_data_(0, allocator) {}
+        byte_array_data_(AllocateBuffer(allocator, 0)) {}
 
   // Perform type-specific initialization
   void SetDict(Decoder<Type>* dictionary);
@@ -78,7 +76,7 @@ class DictionaryDecoder : public Decoder<Type> {
 
   // Data that contains the byte array data (byte_array_dictionary_ just has the
   // pointers).
-  OwnedMutableBuffer byte_array_data_;
+  std::shared_ptr<PoolBuffer> byte_array_data_;
 
   RleDecoder idx_decoder_;
 };
@@ -106,11 +104,13 @@ inline void DictionaryDecoder<ByteArrayType>::SetDict(
   for (int i = 0; i < num_dictionary_values; ++i) {
     total_size += dictionary_[i].len;
   }
-  byte_array_data_.Resize(total_size);
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
   int offset = 0;
+
+  uint8_t* bytes_data = byte_array_data_->mutable_data();
   for (int i = 0; i < num_dictionary_values; ++i) {
-    memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len);
-    dictionary_[i].ptr = &byte_array_data_[offset];
+    memcpy(bytes_data + offset, dictionary_[i].ptr, dictionary_[i].len);
+    dictionary_[i].ptr = bytes_data + offset;
     offset += dictionary_[i].len;
   }
 }
@@ -124,11 +124,12 @@ inline void DictionaryDecoder<FLBAType>::SetDict(Decoder<FLBAType>* dictionary)
   int fixed_len = descr_->type_length();
   int total_size = num_dictionary_values * fixed_len;
 
-  byte_array_data_.Resize(total_size);
+  PARQUET_THROW_NOT_OK(byte_array_data_->Resize(total_size));
+  uint8_t* bytes_data = byte_array_data_->mutable_data();
   int offset = 0;
   for (int i = 0; i < num_dictionary_values; ++i) {
-    memcpy(&byte_array_data_[offset], dictionary_[i].ptr, fixed_len);
-    dictionary_[i].ptr = &byte_array_data_[offset];
+    memcpy(bytes_data + offset, dictionary_[i].ptr, fixed_len);
+    dictionary_[i].ptr = bytes_data + offset;
     offset += fixed_len;
   }
 }
@@ -158,7 +159,7 @@ class DictEncoder : public Encoder<DType> {
  public:
   typedef typename DType::c_type T;
 
-  explicit DictEncoder(const ColumnDescriptor* desc, MemPool* pool = nullptr,
+  explicit DictEncoder(const ColumnDescriptor* desc, ChunkedAllocator* pool = nullptr,
       MemoryAllocator* allocator = default_allocator())
       : Encoder<DType>(desc, Encoding::PLAIN_DICTIONARY, allocator),
         allocator_(allocator),
@@ -176,7 +177,7 @@ class DictEncoder : public Encoder<DType> {
 
   // TODO(wesm): think about how to address the construction semantics in
   // encodings/dictionary-encoding.h
-  void set_mem_pool(MemPool* pool) { pool_ = pool; }
+  void set_mem_pool(ChunkedAllocator* pool) { pool_ = pool; }
 
   void set_type_length(int type_length) { type_length_ = type_length; }
 
@@ -215,11 +216,11 @@ class DictEncoder : public Encoder<DType> {
   void Put(const T& value);
 
   std::shared_ptr<Buffer> FlushValues() override {
-    auto buffer = std::make_shared<OwnedMutableBuffer>(
-        EstimatedDataEncodedSize(), this->allocator_);
+    std::shared_ptr<PoolBuffer> buffer =
+        AllocateBuffer(this->allocator_, EstimatedDataEncodedSize());
     int result_size = WriteIndices(buffer->mutable_data(), EstimatedDataEncodedSize());
     ClearIndices();
-    buffer->Resize(result_size);
+    PARQUET_THROW_NOT_OK(buffer->Resize(result_size));
     return buffer;
   };
 
@@ -233,7 +234,7 @@ class DictEncoder : public Encoder<DType> {
   /// dict_encoded_size() bytes.
   void WriteDict(uint8_t* buffer);
 
-  MemPool* mem_pool() { return pool_; }
+  ChunkedAllocator* mem_pool() { return pool_; }
 
   /// The number of entries in the dictionary.
   int num_entries() const { return uniques_.size(); }
@@ -242,7 +243,7 @@ class DictEncoder : public Encoder<DType> {
   MemoryAllocator* allocator_;
 
   // For ByteArray / FixedLenByteArray data. Not owned
-  MemPool* pool_;
+  ChunkedAllocator* pool_;
 
   /// Size of the table. Must be a power of 2.
   int hash_table_size_;
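
MemPool is renamed ChunkedAllocator throughout, so the DictEncoder constructor and its mem_pool()/set_mem_pool() accessors now traffic in ChunkedAllocator*. A hedged sketch of driving the encoder against the renamed type, mirroring the benchmark and test code later in this patch (DType, the descriptor, and the input values come from the caller):

    #include <memory>

    #include "parquet/encodings/dictionary-encoding.h"
    #include "parquet/util/memory.h"

    namespace parquet {

    template <typename DType>
    std::shared_ptr<Buffer> DictEncodeValues(const ColumnDescriptor* descr,
        const typename DType::c_type* values, int num_values) {
      ChunkedAllocator pool;  // formerly MemPool
      DictEncoder<DType> encoder(descr, &pool, default_allocator());
      for (int i = 0; i < num_values; ++i) {
        encoder.Put(values[i]);
      }
      // FlushValues() allocates via AllocateBuffer and shrinks the result with a
      // checked Resize(), as shown in the hunk above.
      return encoder.FlushValues();
    }

    }  // namespace parquet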

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoder.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoder.h b/src/parquet/encodings/encoder.h
index a325ab5..c51f8d5 100644
--- a/src/parquet/encodings/encoder.h
+++ b/src/parquet/encodings/encoder.h
@@ -23,12 +23,11 @@
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
-class Buffer;
 class ColumnDescriptor;
-class OutputStream;
 
 // Base class for value encoders. Since encoders may or not have state (e.g.,
 // dictionary encoding) we use a class instance to maintain any state.

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoding-benchmark.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-benchmark.cc b/src/parquet/encodings/encoding-benchmark.cc
index e62d758..516e453 100644
--- a/src/parquet/encodings/encoding-benchmark.cc
+++ b/src/parquet/encodings/encoding-benchmark.cc
@@ -19,7 +19,7 @@
 
 #include "parquet/encodings/dictionary-encoding.h"
 #include "parquet/file/reader-internal.h"
-#include "parquet/util/mem-pool.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -101,23 +101,25 @@ static void DecodeDict(
   typedef typename Type::c_type T;
   int num_values = values.size();
 
-  MemPool pool;
+  ChunkedAllocator pool;
   MemoryAllocator* allocator = default_allocator();
   std::shared_ptr<ColumnDescriptor> descr = Int64Schema(Repetition::REQUIRED);
-  std::shared_ptr<OwnedMutableBuffer> dict_buffer =
-      std::make_shared<OwnedMutableBuffer>();
-  auto indices = std::make_shared<OwnedMutableBuffer>();
 
   DictEncoder<Type> encoder(descr.get(), &pool, allocator);
   for (int i = 0; i < num_values; ++i) {
     encoder.Put(values[i]);
   }
 
-  dict_buffer->Resize(encoder.dict_encoded_size());
+  std::shared_ptr<PoolBuffer> dict_buffer =
+      AllocateBuffer(allocator, encoder.dict_encoded_size());
+
+  std::shared_ptr<PoolBuffer> indices =
+      AllocateBuffer(allocator, encoder.EstimatedDataEncodedSize());
+
   encoder.WriteDict(dict_buffer->mutable_data());
-  indices->Resize(encoder.EstimatedDataEncodedSize());
   int actual_bytes = encoder.WriteIndices(indices->mutable_data(), indices->size());
-  indices->Resize(actual_bytes);
+
+  PARQUET_THROW_NOT_OK(indices->Resize(actual_bytes));
 
   while (state.KeepRunning()) {
     PlainDecoder<Type> dict_decoder(descr.get());

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encoding-test.cc b/src/parquet/encodings/encoding-test.cc
index daa25cb..eccfc5d 100644
--- a/src/parquet/encodings/encoding-test.cc
+++ b/src/parquet/encodings/encoding-test.cc
@@ -28,8 +28,7 @@
 #include "parquet/schema/types.h"
 #include "parquet/types.h"
 #include "parquet/util/bit-util.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/test-common.h"
 
 using std::string;
@@ -178,7 +177,7 @@ class TestEncodingBase : public ::testing::Test {
   }
 
  protected:
-  MemPool pool_;
+  ChunkedAllocator pool_;
   MemoryAllocator* allocator_;
 
   int num_values_;
@@ -250,10 +249,9 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
   void CheckRoundtrip() {
     DictEncoder<Type> encoder(descr_.get(), &pool_);
 
-    dict_buffer_ = std::make_shared<OwnedMutableBuffer>();
-
     ASSERT_NO_THROW(encoder.Put(draws_, num_values_));
-    dict_buffer_->Resize(encoder.dict_encoded_size());
+    dict_buffer_ = AllocateBuffer(default_allocator(), encoder.dict_encoded_size());
+
     encoder.WriteDict(dict_buffer_->mutable_data());
 
     std::shared_ptr<Buffer> indices = encoder.FlushValues();
@@ -277,7 +275,7 @@ class TestDictionaryEncoding : public TestEncodingBase<Type> {
 
  protected:
   USING_BASE_MEMBERS();
-  std::shared_ptr<OwnedMutableBuffer> dict_buffer_;
+  std::shared_ptr<PoolBuffer> dict_buffer_;
 };
 
 TYPED_TEST_CASE(TestDictionaryEncoding, DictEncodedTypes);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index a3d7b69..d2127ef 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -25,8 +25,7 @@
 #include "parquet/encodings/encoder.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/util/bit-stream-utils.inline.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -163,8 +162,9 @@ class PlainEncoder : public Encoder<DType> {
 
   explicit PlainEncoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
-      : Encoder<DType>(descr, Encoding::PLAIN, allocator),
-        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {}
+      : Encoder<DType>(descr, Encoding::PLAIN, allocator) {
+    values_sink_.reset(new InMemoryOutputStream(allocator));
+  }
 
   int64_t EstimatedDataEncodedSize() override { return values_sink_->Tell(); }
 
@@ -172,7 +172,7 @@ class PlainEncoder : public Encoder<DType> {
   void Put(const T* src, int num_values) override;
 
  protected:
-  std::shared_ptr<InMemoryOutputStream> values_sink_;
+  std::unique_ptr<InMemoryOutputStream> values_sink_;
 };
 
 template <>
@@ -181,10 +181,10 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
   explicit PlainEncoder(
       const ColumnDescriptor* descr, MemoryAllocator* allocator = default_allocator())
       : Encoder<BooleanType>(descr, Encoding::PLAIN, allocator),
-        bits_available_(IN_MEMORY_DEFAULT_CAPACITY * 8),
-        bits_buffer_(IN_MEMORY_DEFAULT_CAPACITY, allocator),
-        values_sink_(new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, allocator)) {
-    bit_writer_.reset(new BitWriter(bits_buffer_.mutable_data(), bits_buffer_.size()));
+        bits_available_(kInMemoryDefaultCapacity * 8),
+        bits_buffer_(AllocateBuffer(allocator, kInMemoryDefaultCapacity)),
+        values_sink_(new InMemoryOutputStream(allocator)) {
+    bit_writer_.reset(new BitWriter(bits_buffer_->mutable_data(), bits_buffer_->size()));
   }
 
   int64_t EstimatedDataEncodedSize() override {
@@ -196,12 +196,11 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
       bit_writer_->Flush();
       values_sink_->Write(bit_writer_->buffer(), bit_writer_->bytes_written());
       bit_writer_->Clear();
-      bits_available_ = bits_buffer_.size() * 8;
+      bits_available_ = bits_buffer_->size() * 8;
     }
 
     std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
-    values_sink_.reset(
-        new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
+    values_sink_.reset(new InMemoryOutputStream(this->allocator_));
     return buffer;
   }
 
@@ -225,7 +224,7 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
                                                                                   \
     int bits_remaining = num_values - bit_offset;                                 \
     while (bit_offset < num_values) {                                             \
-      bits_available_ = bits_buffer_.size() * 8;                                  \
+      bits_available_ = bits_buffer_->size() * 8;                                 \
                                                                                   \
       int bits_to_write = std::min(bits_available_, bits_remaining);              \
       for (int i = bit_offset; i < bit_offset + bits_to_write; i++) {             \
@@ -249,15 +248,14 @@ class PlainEncoder<BooleanType> : public Encoder<BooleanType> {
  protected:
   int bits_available_;
   std::unique_ptr<BitWriter> bit_writer_;
-  OwnedMutableBuffer bits_buffer_;
-  std::shared_ptr<InMemoryOutputStream> values_sink_;
+  std::shared_ptr<PoolBuffer> bits_buffer_;
+  std::unique_ptr<InMemoryOutputStream> values_sink_;
 };
 
 template <typename DType>
 inline std::shared_ptr<Buffer> PlainEncoder<DType>::FlushValues() {
   std::shared_ptr<Buffer> buffer = values_sink_->GetBuffer();
-  values_sink_.reset(
-      new InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY, this->allocator_));
+  values_sink_.reset(new InMemoryOutputStream(this->allocator_));
   return buffer;
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/file-deserialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-deserialize-test.cc b/src/parquet/file/file-deserialize-test.cc
index 5287885..fbb511a 100644
--- a/src/parquet/file/file-deserialize-test.cc
+++ b/src/parquet/file/file-deserialize-test.cc
@@ -33,12 +33,13 @@
 #include "parquet/thrift/parquet_types.h"
 #include "parquet/thrift/util.h"
 #include "parquet/types.h"
-#include "parquet/util/input.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/test-common.h"
 
 namespace parquet {
 
+using ::arrow::io::BufferReader;
+
 // Adds page statistics occupying a certain amount of bytes (for testing very
 // large page headers)
 static inline void AddDummyStats(int stat_size, format::DataPageHeader& data_page) {
@@ -234,11 +235,13 @@ TEST_F(TestPageSerde, LZONotSupported) {
 class TestParquetFileReader : public ::testing::Test {
  public:
   void AssertInvalidFileThrows(const std::shared_ptr<Buffer>& buffer) {
-    std::unique_ptr<BufferReader> reader(new BufferReader(buffer));
     reader_.reset(new ParquetFileReader());
 
+    auto reader = std::make_shared<BufferReader>(buffer);
+    auto wrapper = std::unique_ptr<ArrowInputFile>(new ArrowInputFile(reader));
+
     ASSERT_THROW(
-        reader_->Open(SerializedFile::Open(std::move(reader))), ParquetException);
+        reader_->Open(SerializedFile::Open(std::move(wrapper))), ParquetException);
   }
 
  protected:
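
For call sites that still target the internal RandomAccessSource interface, the patch adds an ArrowInputFile adapter; the test above wraps an ::arrow::io::BufferReader in one before handing it to SerializedFile::Open. A hedged sketch of that wrapping step (ArrowInputFile is assumed to be declared in parquet/util/memory.h, which these tests include):

    #include <memory>

    #include "parquet/util/memory.h"

    namespace parquet {

    std::unique_ptr<RandomAccessSource> WrapArrowFile(
        const std::shared_ptr<::arrow::io::ReadableFileInterface>& file) {
      // Adapts any Arrow readable file to parquet-cpp's RandomAccessSource so
      // existing internals such as SerializedFile::Open keep their signatures.
      return std::unique_ptr<RandomAccessSource>(new ArrowInputFile(file));
    }

    }  // namespace parquet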

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/file-serialize-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/file-serialize-test.cc b/src/parquet/file/file-serialize-test.cc
index 3a11cd8..7a90eeb 100644
--- a/src/parquet/file/file-serialize-test.cc
+++ b/src/parquet/file/file-serialize-test.cc
@@ -24,8 +24,7 @@
 #include "parquet/file/reader.h"
 #include "parquet/file/writer.h"
 #include "parquet/types.h"
-#include "parquet/util/input.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -75,8 +74,9 @@ class TestSerialize : public PrimitiveTypedTest<TestType> {
     file_writer->Close();
 
     auto buffer = sink->GetBuffer();
-    std::unique_ptr<RandomAccessSource> source(new BufferReader(buffer));
-    auto file_reader = ParquetFileReader::Open(std::move(source));
+
+    auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
+    auto file_reader = ParquetFileReader::Open(source);
     ASSERT_EQ(num_columns_, file_reader->metadata()->num_columns());
     ASSERT_EQ(1, file_reader->metadata()->num_row_groups());
     ASSERT_EQ(100, file_reader->metadata()->num_rows());
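
Round-tripping through memory is now a two-liner: wrap the written buffer in an ::arrow::io::BufferReader and pass it to the new shared_ptr-based ParquetFileReader::Open overload. A minimal sketch of that read-back step, using only the calls visible in the test above (the BufferReader declaration is assumed to come in through parquet/util/memory.h, as in the test):

    #include <memory>

    #include "parquet/file/reader.h"
    #include "parquet/util/memory.h"

    std::unique_ptr<parquet::ParquetFileReader> OpenFromBuffer(
        const std::shared_ptr<parquet::Buffer>& buffer) {
      auto source = std::make_shared<::arrow::io::BufferReader>(buffer);
      return parquet::ParquetFileReader::Open(source);
    }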

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/metadata.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.cc b/src/parquet/file/metadata.cc
index adfcb69..692a0f5 100644
--- a/src/parquet/file/metadata.cc
+++ b/src/parquet/file/metadata.cc
@@ -23,6 +23,7 @@
 #include "parquet/file/metadata.h"
 #include "parquet/schema/converter.h"
 #include "parquet/thrift/util.h"
+#include "parquet/util/memory.h"
 
 #include <boost/algorithm/string.hpp>
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/metadata.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/metadata.h b/src/parquet/file/metadata.h
index c5dd03a..ef19c98 100644
--- a/src/parquet/file/metadata.h
+++ b/src/parquet/file/metadata.h
@@ -27,7 +27,7 @@
 #include "parquet/compression/codec.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/types.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
 namespace parquet {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.cc b/src/parquet/file/reader-internal.cc
index 37c790c..2c3ebb3 100644
--- a/src/parquet/file/reader-internal.cc
+++ b/src/parquet/file/reader-internal.cc
@@ -32,8 +32,7 @@
 #include "parquet/schema/types.h"
 #include "parquet/thrift/util.h"
 #include "parquet/types.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/input.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -44,7 +43,7 @@ namespace parquet {
 SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
     int64_t total_num_rows, Compression::type codec_type, MemoryAllocator* allocator)
     : stream_(std::move(stream)),
-      decompression_buffer_(0, allocator),
+      decompression_buffer_(AllocateBuffer(allocator, 0)),
       seen_num_rows_(0),
       total_num_rows_(total_num_rows) {
   max_page_header_size_ = DEFAULT_MAX_PAGE_HEADER_SIZE;
@@ -97,12 +96,12 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
     // Uncompress it if we need to
     if (decompressor_ != NULL) {
       // Grow the uncompressed buffer if we need to.
-      if (uncompressed_len > static_cast<int>(decompression_buffer_.size())) {
-        decompression_buffer_.Resize(uncompressed_len);
+      if (uncompressed_len > static_cast<int>(decompression_buffer_->size())) {
+        PARQUET_THROW_NOT_OK(decompression_buffer_->Resize(uncompressed_len));
       }
-      decompressor_->Decompress(
-          compressed_len, buffer, uncompressed_len, &decompression_buffer_[0]);
-      buffer = &decompression_buffer_[0];
+      decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+          decompression_buffer_->mutable_data());
+      buffer = decompression_buffer_->data();
     }
 
     auto page_buffer = std::make_shared<Buffer>(buffer, uncompressed_len);
@@ -207,7 +206,7 @@ static constexpr uint32_t FOOTER_SIZE = 8;
 static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 
 std::unique_ptr<ParquetFileReader::Contents> SerializedFile::Open(
-    std::unique_ptr<RandomAccessSource> source, ReaderProperties props) {
+    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) {
   std::unique_ptr<ParquetFileReader::Contents> result(
       new SerializedFile(std::move(source), props));
 
@@ -239,39 +238,40 @@ const FileMetaData* SerializedFile::metadata() const {
 }
 
 SerializedFile::SerializedFile(std::unique_ptr<RandomAccessSource> source,
-    ReaderProperties props = default_reader_properties())
+    const ReaderProperties& props = default_reader_properties())
     : source_(std::move(source)), properties_(props) {}
 
 void SerializedFile::ParseMetaData() {
-  int64_t filesize = source_->Size();
+  int64_t file_size = source_->Size();
 
-  if (filesize < FOOTER_SIZE) {
+  if (file_size < FOOTER_SIZE) {
     throw ParquetException("Corrupted file, smaller than file footer");
   }
 
   uint8_t footer_buffer[FOOTER_SIZE];
-  source_->Seek(filesize - FOOTER_SIZE);
+  source_->Seek(file_size - FOOTER_SIZE);
   int64_t bytes_read = source_->Read(FOOTER_SIZE, footer_buffer);
   if (bytes_read != FOOTER_SIZE || memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
     throw ParquetException("Invalid parquet file. Corrupt footer.");
   }
 
   uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
-  int64_t metadata_start = filesize - FOOTER_SIZE - metadata_len;
-  if (FOOTER_SIZE + metadata_len > filesize) {
+  int64_t metadata_start = file_size - FOOTER_SIZE - metadata_len;
+  if (FOOTER_SIZE + metadata_len > file_size) {
     throw ParquetException(
         "Invalid parquet file. File is less than "
         "file metadata size.");
   }
   source_->Seek(metadata_start);
 
-  OwnedMutableBuffer metadata_buffer(metadata_len, properties_.allocator());
-  bytes_read = source_->Read(metadata_len, &metadata_buffer[0]);
+  std::shared_ptr<PoolBuffer> metadata_buffer =
+      AllocateBuffer(properties_.allocator(), metadata_len);
+  bytes_read = source_->Read(metadata_len, metadata_buffer->mutable_data());
   if (bytes_read != metadata_len) {
     throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
   }
 
-  file_metadata_ = FileMetaData::Make(&metadata_buffer[0], &metadata_len);
+  file_metadata_ = FileMetaData::Make(metadata_buffer->data(), &metadata_len);
 }
 
 }  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader-internal.h b/src/parquet/file/reader-internal.h
index 582ab35..aa9b75e 100644
--- a/src/parquet/file/reader-internal.h
+++ b/src/parquet/file/reader-internal.h
@@ -29,7 +29,7 @@
 #include "parquet/file/reader.h"
 #include "parquet/thrift/parquet_types.h"
 #include "parquet/types.h"
-#include "parquet/util/input.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -62,7 +62,7 @@ class SerializedPageReader : public PageReader {
 
   // Compression codec to use.
   std::unique_ptr<Codec> decompressor_;
-  OwnedMutableBuffer decompression_buffer_;
+  std::shared_ptr<PoolBuffer> decompression_buffer_;
 
   // Maximum allowed page size
   uint32_t max_page_header_size_;
@@ -104,7 +104,7 @@ class SerializedFile : public ParquetFileReader::Contents {
   // lifetime separately
   static std::unique_ptr<ParquetFileReader::Contents> Open(
       std::unique_ptr<RandomAccessSource> source,
-      ReaderProperties props = default_reader_properties());
+      const ReaderProperties& props = default_reader_properties());
   virtual void Close();
   virtual std::shared_ptr<RowGroupReader> GetRowGroup(int i);
   virtual const FileMetaData* metadata() const;
@@ -113,7 +113,7 @@ class SerializedFile : public ParquetFileReader::Contents {
  private:
   // This class takes ownership of the provided data source
   explicit SerializedFile(
-      std::unique_ptr<RandomAccessSource> source, ReaderProperties props);
+      std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props);
 
   std::unique_ptr<RandomAccessSource> source_;
   std::unique_ptr<FileMetaData> file_metadata_;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.cc b/src/parquet/file/reader.cc
index 06d2d8e..52fe57a 100644
--- a/src/parquet/file/reader.cc
+++ b/src/parquet/file/reader.cc
@@ -24,14 +24,16 @@
 #include <utility>
 #include <vector>
 
+#include "arrow/io/file.h"
+
 #include "parquet/column/page.h"
 #include "parquet/column/reader.h"
 #include "parquet/column/scanner.h"
 #include "parquet/exception.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/types.h"
-#include "parquet/util/input.h"
 #include "parquet/util/logging.h"
+#include "parquet/util/memory.h"
 
 using std::string;
 using std::vector;
@@ -69,26 +71,36 @@ ParquetFileReader::~ParquetFileReader() {
 }
 
 std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
-    std::unique_ptr<RandomAccessSource> source, ReaderProperties props) {
-  auto contents = SerializedFile::Open(std::move(source), props);
+    const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
+    const ReaderProperties& props) {
+  std::unique_ptr<RandomAccessSource> io_wrapper(new ArrowInputFile(source));
+  return Open(std::move(io_wrapper), props);
+}
 
+std::unique_ptr<ParquetFileReader> ParquetFileReader::Open(
+    std::unique_ptr<RandomAccessSource> source, const ReaderProperties& props) {
+  auto contents = SerializedFile::Open(std::move(source), props);
   std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
   result->Open(std::move(contents));
-
   return result;
 }
 
 std::unique_ptr<ParquetFileReader> ParquetFileReader::OpenFile(
-    const std::string& path, bool memory_map, ReaderProperties props) {
-  std::unique_ptr<LocalFileSource> file;
+    const std::string& path, bool memory_map, const ReaderProperties& props) {
+  std::shared_ptr<::arrow::io::ReadableFileInterface> source;
   if (memory_map) {
-    file.reset(new MemoryMapSource(props.allocator()));
+    std::shared_ptr<::arrow::io::MemoryMappedFile> handle;
+    PARQUET_THROW_NOT_OK(
+        ::arrow::io::MemoryMappedFile::Open(path, ::arrow::io::FileMode::READ, &handle));
+    source = handle;
   } else {
-    file.reset(new LocalFileSource(props.allocator()));
+    std::shared_ptr<::arrow::io::ReadableFile> handle;
+    PARQUET_THROW_NOT_OK(
+        ::arrow::io::ReadableFile::Open(path, props.allocator(), &handle));
+    source = handle;
   }
-  file->Open(path);
 
-  return Open(std::move(file), props);
+  return Open(source, props);
 }
 
 void ParquetFileReader::Open(std::unique_ptr<ParquetFileReader::Contents> contents) {
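
OpenFile above maps the memory_map flag to the corresponding Arrow file class and then funnels everything through the Arrow-based Open overload, which wraps the handle in ArrowInputFile for the RandomAccessSource-based path. From a caller's perspective the convenience API is unchanged; a short usage sketch ("example.parquet" is a placeholder path):

    #include <memory>

    #include "parquet/file/reader.h"

    void ReadExample() {
      // Path-based convenience API; the second argument selects memory mapping.
      std::unique_ptr<parquet::ParquetFileReader> reader =
          parquet::ParquetFileReader::OpenFile("example.parquet", /*memory_map=*/true);
      int num_row_groups = reader->metadata()->num_row_groups();
      (void)num_row_groups;
    }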

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/reader.h b/src/parquet/file/reader.h
index ca28f67..1c24506 100644
--- a/src/parquet/file/reader.h
+++ b/src/parquet/file/reader.h
@@ -30,12 +30,12 @@
 #include "parquet/column/statistics.h"
 #include "parquet/file/metadata.h"
 #include "parquet/schema/descriptor.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
 namespace parquet {
 
 class ColumnReader;
-class RandomAccessSource;
 
 class PARQUET_EXPORT RowGroupReader {
  public:
@@ -79,15 +79,27 @@ class PARQUET_EXPORT ParquetFileReader {
   ParquetFileReader();
   ~ParquetFileReader();
 
+  // Create a reader from some implementation of parquet-cpp's generic file
+  // input interface
+  //
+  // If you cannot provide exclusive access to your file resource, create a
+  // subclass of RandomAccessSource that wraps the shared resource
+  static std::unique_ptr<ParquetFileReader> Open(
+      std::unique_ptr<RandomAccessSource> source,
+      const ReaderProperties& props = default_reader_properties());
+
+  // Create a file reader instance from an Arrow file object. Thread-safety is
+  // the responsibility of the file implementation
+  static std::unique_ptr<ParquetFileReader> Open(
+      const std::shared_ptr<::arrow::io::ReadableFileInterface>& source,
+      const ReaderProperties& props = default_reader_properties());
+
   // API Convenience to open a serialized Parquet file on disk, using built-in IO
   // interface implementations that were created for testing, and may not be robust for
   // all use cases.
   static std::unique_ptr<ParquetFileReader> OpenFile(const std::string& path,
-      bool memory_map = true, ReaderProperties props = default_reader_properties());
-
-  static std::unique_ptr<ParquetFileReader> Open(
-      std::unique_ptr<RandomAccessSource> source,
-      ReaderProperties props = default_reader_properties());
+      bool memory_map = true,
+      const ReaderProperties& props = default_reader_properties());
 
   void Open(std::unique_ptr<Contents> contents);
   void Close();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer-internal.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.cc b/src/parquet/file/writer-internal.cc
index c4681bd..48884ad 100644
--- a/src/parquet/file/writer-internal.cc
+++ b/src/parquet/file/writer-internal.cc
@@ -20,7 +20,7 @@
 #include "parquet/column/writer.h"
 #include "parquet/schema/converter.h"
 #include "parquet/thrift/util.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 using parquet::schema::GroupNode;
 using parquet::schema::SchemaFlattener;
@@ -37,6 +37,7 @@ SerializedPageWriter::SerializedPageWriter(OutputStream* sink, Compression::type
     ColumnChunkMetaDataBuilder* metadata, MemoryAllocator* allocator)
     : sink_(sink),
       metadata_(metadata),
+      allocator_(allocator),
       num_values_(0),
       dictionary_page_offset_(0),
       data_page_offset_(0),
@@ -71,10 +72,13 @@ std::shared_ptr<Buffer> SerializedPageWriter::Compress(
   // Compress the data
   int64_t max_compressed_size =
       compressor_->MaxCompressedLen(buffer->size(), buffer->data());
-  auto compression_buffer = std::make_shared<OwnedMutableBuffer>(max_compressed_size);
+
+  std::shared_ptr<PoolBuffer> compression_buffer =
+      AllocateBuffer(allocator_, max_compressed_size);
+
   int64_t compressed_size = compressor_->Compress(buffer->size(), buffer->data(),
       max_compressed_size, compression_buffer->mutable_data());
-  compression_buffer->Resize(compressed_size);
+  PARQUET_THROW_NOT_OK(compression_buffer->Resize(compressed_size));
   return compression_buffer;
 }
 
@@ -182,7 +186,7 @@ void RowGroupSerializer::Close() {
 // FileSerializer
 
 std::unique_ptr<ParquetFileWriter::Contents> FileSerializer::Open(
-    std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema,
+    const std::shared_ptr<OutputStream>& sink, const std::shared_ptr<GroupNode>& schema,
     const std::shared_ptr<WriterProperties>& properties) {
   std::unique_ptr<ParquetFileWriter::Contents> result(
       new FileSerializer(sink, schema, properties));
@@ -248,7 +252,7 @@ void FileSerializer::WriteMetaData() {
   sink_->Write(PARQUET_MAGIC, 4);
 }
 
-FileSerializer::FileSerializer(std::shared_ptr<OutputStream> sink,
+FileSerializer::FileSerializer(const std::shared_ptr<OutputStream>& sink,
     const std::shared_ptr<GroupNode>& schema,
     const std::shared_ptr<WriterProperties>& properties)
     : sink_(sink),

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer-internal.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer-internal.h b/src/parquet/file/writer-internal.h
index f1f76ab..81a0837 100644
--- a/src/parquet/file/writer-internal.h
+++ b/src/parquet/file/writer-internal.h
@@ -26,6 +26,7 @@
 #include "parquet/file/metadata.h"
 #include "parquet/file/writer.h"
 #include "parquet/thrift/parquet_types.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 
@@ -54,6 +55,7 @@ class SerializedPageWriter : public PageWriter {
  private:
   OutputStream* sink_;
   ColumnChunkMetaDataBuilder* metadata_;
+  MemoryAllocator* allocator_;
   int64_t num_values_;
   int64_t dictionary_page_offset_;
   int64_t data_page_offset_;
@@ -102,7 +104,7 @@ class RowGroupSerializer : public RowGroupWriter::Contents {
 class FileSerializer : public ParquetFileWriter::Contents {
  public:
   static std::unique_ptr<ParquetFileWriter::Contents> Open(
-      std::shared_ptr<OutputStream> sink,
+      const std::shared_ptr<OutputStream>& sink,
       const std::shared_ptr<schema::GroupNode>& schema,
       const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 
@@ -119,7 +121,7 @@ class FileSerializer : public ParquetFileWriter::Contents {
   virtual ~FileSerializer();
 
  private:
-  explicit FileSerializer(std::shared_ptr<OutputStream> sink,
+  explicit FileSerializer(const std::shared_ptr<OutputStream>& sink,
       const std::shared_ptr<schema::GroupNode>& schema,
       const std::shared_ptr<WriterProperties>& properties);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.cc b/src/parquet/file/writer.cc
index 8c9f52f..a381c22 100644
--- a/src/parquet/file/writer.cc
+++ b/src/parquet/file/writer.cc
@@ -18,7 +18,7 @@
 #include "parquet/file/writer.h"
 
 #include "parquet/file/writer-internal.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 using parquet::schema::GroupNode;
 
@@ -51,13 +51,19 @@ ParquetFileWriter::~ParquetFileWriter() {
 }
 
 std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
-    std::shared_ptr<OutputStream> sink, const std::shared_ptr<GroupNode>& schema,
+    const std::shared_ptr<::arrow::io::OutputStream>& sink,
+    const std::shared_ptr<GroupNode>& schema,
     const std::shared_ptr<WriterProperties>& properties) {
-  auto contents = FileSerializer::Open(sink, schema, properties);
+  return Open(std::make_shared<ArrowOutputStream>(sink), schema, properties);
+}
 
+std::unique_ptr<ParquetFileWriter> ParquetFileWriter::Open(
+    const std::shared_ptr<OutputStream>& sink,
+    const std::shared_ptr<schema::GroupNode>& schema,
+    const std::shared_ptr<WriterProperties>& properties) {
+  auto contents = FileSerializer::Open(sink, schema, properties);
   std::unique_ptr<ParquetFileWriter> result(new ParquetFileWriter());
   result->Open(std::move(contents));
-
   return result;
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/file/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/file/writer.h b/src/parquet/file/writer.h
index e82f016..6d7161b 100644
--- a/src/parquet/file/writer.h
+++ b/src/parquet/file/writer.h
@@ -24,7 +24,7 @@
 #include "parquet/column/properties.h"
 #include "parquet/schema/descriptor.h"
 #include "parquet/schema/types.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
 #include "parquet/util/visibility.h"
 
 namespace parquet {
@@ -97,7 +97,13 @@ class PARQUET_EXPORT ParquetFileWriter {
   ParquetFileWriter();
   ~ParquetFileWriter();
 
-  static std::unique_ptr<ParquetFileWriter> Open(std::shared_ptr<OutputStream> sink,
+  static std::unique_ptr<ParquetFileWriter> Open(
+      const std::shared_ptr<::arrow::io::OutputStream>& sink,
+      const std::shared_ptr<schema::GroupNode>& schema,
+      const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
+
+  static std::unique_ptr<ParquetFileWriter> Open(
+      const std::shared_ptr<OutputStream>& sink,
       const std::shared_ptr<schema::GroupNode>& schema,
       const std::shared_ptr<WriterProperties>& properties = default_writer_properties());
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index d21a809..e3be9b0 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -23,17 +23,20 @@
 #include <memory>
 #include <string>
 
+#include "arrow/io/file.h"
+
 #include "parquet/column/reader.h"
 #include "parquet/column/scanner.h"
 #include "parquet/file/reader-internal.h"
 #include "parquet/file/reader.h"
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
+#include "parquet/util/memory.h"
 
 using std::string;
 
 namespace parquet {
 
+using ReadableFile = ::arrow::io::ReadableFile;
+
 const char* data_dir = std::getenv("PARQUET_TEST_DATA");
 
 std::string alltypes_plain() {
@@ -159,7 +162,7 @@ TEST_F(TestAllTypesPlain, ColumnSelectionOutOfRange) {
   ASSERT_THROW(reader_->DebugPrint(ss, columns), ParquetException);
 }
 
-class TestLocalFileSource : public ::testing::Test {
+class TestLocalFile : public ::testing::Test {
  public:
   void SetUp() {
     std::string dir_string(data_dir);
@@ -168,24 +171,25 @@ class TestLocalFileSource : public ::testing::Test {
     ss << dir_string << "/"
        << "alltypes_plain.parquet";
 
-    file.reset(new LocalFileSource());
-    file->Open(ss.str());
+    PARQUET_THROW_NOT_OK(ReadableFile::Open(ss.str(), &handle));
+    fileno = handle->file_descriptor();
   }
 
   void TearDown() {}
 
  protected:
-  std::unique_ptr<LocalFileSource> file;
+  int fileno;
+  std::shared_ptr<::arrow::io::ReadableFile> handle;
 };
 
-TEST_F(TestLocalFileSource, FileClosedOnDestruction) {
-  int file_desc = file->file_descriptor();
+TEST_F(TestLocalFile, FileClosedOnDestruction) {
   {
-    auto contents = SerializedFile::Open(std::move(file));
+    auto contents = SerializedFile::Open(
+        std::unique_ptr<RandomAccessSource>(new ArrowInputFile(handle)));
     std::unique_ptr<ParquetFileReader> result(new ParquetFileReader());
     result->Open(std::move(contents));
   }
-  ASSERT_EQ(-1, fcntl(file_desc, F_GETFD));
+  ASSERT_EQ(-1, fcntl(fileno, F_GETFD));
   ASSERT_EQ(EBADF, errno);
 }
 

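A minimal usage sketch (not part of this patch) of opening a file through the
Arrow I/O layer, following the pattern the test above now uses. It assumes the
public ParquetFileReader::Open() accepts the Arrow file handle after this
refactoring; DebugPrint() is only a stand-in for real work on the reader.

    #include <iostream>
    #include <list>
    #include <memory>
    #include <string>

    #include "arrow/io/file.h"
    #include "parquet/exception.h"
    #include "parquet/file/reader.h"

    void DumpFile(const std::string& path) {
      std::shared_ptr<::arrow::io::ReadableFile> handle;
      PARQUET_THROW_NOT_OK(::arrow::io::ReadableFile::Open(path, &handle));

      // The reader takes shared ownership of the Arrow handle and closes the
      // underlying descriptor when it is destroyed (as the test above checks).
      std::unique_ptr<parquet::ParquetFileReader> reader =
          parquet::ParquetFileReader::Open(handle);

      std::list<int> columns;  // empty selection == all columns
      reader->DebugPrint(std::cout, columns);
    }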
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/thrift/util.h
----------------------------------------------------------------------
diff --git a/src/parquet/thrift/util.h b/src/parquet/thrift/util.h
index 1800435..9d2b66f 100644
--- a/src/parquet/thrift/util.h
+++ b/src/parquet/thrift/util.h
@@ -37,7 +37,7 @@
 #include "parquet/exception.h"
 #include "parquet/thrift/parquet_types.h"
 #include "parquet/util/logging.h"
-#include "parquet/util/output.h"
+#include "parquet/util/memory.h"
 
 namespace parquet {
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 3a4b1c9..7a9ccba 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -20,17 +20,13 @@ install(FILES
   bit-stream-utils.h
   bit-stream-utils.inline.h
   bit-util.h
-  buffer.h
   buffer-builder.h
   compiler-util.h
   cpu-info.h
   hash-util.h
-  input.h
   logging.h
   macros.h
-  mem-allocator.h
-  mem-pool.h
-  output.h
+  memory.h
   rle-encoding.h
   stopwatch.h
   sse-util.h
@@ -70,9 +66,6 @@ if (PARQUET_BUILD_BENCHMARKS)
 endif()
 
 ADD_PARQUET_TEST(bit-util-test)
-ADD_PARQUET_TEST(buffer-test)
 ADD_PARQUET_TEST(comparison-test)
-ADD_PARQUET_TEST(input-output-test)
-ADD_PARQUET_TEST(mem-allocator-test)
-ADD_PARQUET_TEST(mem-pool-test)
+ADD_PARQUET_TEST(memory-test)
 ADD_PARQUET_TEST(rle-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer-test.cc b/src/parquet/util/buffer-test.cc
deleted file mode 100644
index ee5b000..0000000
--- a/src/parquet/util/buffer-test.cc
+++ /dev/null
@@ -1,65 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <cstdint>
-#include <cstdlib>
-#include <exception>
-#include <gtest/gtest.h>
-#include <limits>
-#include <memory>
-#include <string>
-
-#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
-
-using std::string;
-
-namespace parquet {
-
-class TestBuffer : public ::testing::Test {};
-
-TEST_F(TestBuffer, Resize) {
-  OwnedMutableBuffer buf;
-
-  ASSERT_EQ(0, buf.size());
-  ASSERT_NO_THROW(buf.Resize(100));
-  ASSERT_EQ(100, buf.size());
-  ASSERT_NO_THROW(buf.Resize(200));
-  ASSERT_EQ(200, buf.size());
-
-  // Make it smaller, too
-  ASSERT_NO_THROW(buf.Resize(50));
-  ASSERT_EQ(50, buf.size());
-}
-
-TEST_F(TestBuffer, ResizeOOM) {
-// Tests that deliberately throw Exceptions foul up valgrind and report
-// red herring memory leaks
-#ifndef PARQUET_VALGRIND
-  OwnedMutableBuffer buf;
-  ASSERT_NO_THROW(buf.Resize(100));
-  int64_t to_alloc = std::numeric_limits<int64_t>::max();
-  try {
-    buf.Resize(to_alloc);
-    FAIL() << "Exception not thrown";
-  } catch (const ParquetException& e) {
-    // pass
-  } catch (const std::exception& e) { FAIL() << "Different exception thrown"; }
-#endif
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.cc b/src/parquet/util/buffer.cc
deleted file mode 100644
index 0b7100c..0000000
--- a/src/parquet/util/buffer.cc
+++ /dev/null
@@ -1,123 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/buffer.h"
-
-#include <algorithm>
-#include <cstdint>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
-
-namespace parquet {
-
-Buffer::Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size) {
-  data_ = parent->data() + offset;
-  size_ = size;
-  parent_ = parent;
-}
-
-std::shared_ptr<Buffer> MutableBuffer::GetImmutableView() {
-  return std::make_shared<Buffer>(this->get_shared_ptr(), 0, size());
-}
-
-OwnedMutableBuffer::OwnedMutableBuffer(int64_t size, MemoryAllocator* allocator)
-    : ResizableBuffer(nullptr, 0), allocator_(allocator) {
-  Resize(size);
-}
-
-OwnedMutableBuffer::~OwnedMutableBuffer() {
-  if (mutable_data_) { allocator_->Free(mutable_data_, capacity_); }
-}
-
-void OwnedMutableBuffer::Reserve(int64_t new_capacity) {
-  if (!mutable_data_ || new_capacity > capacity_) {
-    if (mutable_data_) {
-      uint8_t* new_data = allocator_->Malloc(new_capacity);
-      memcpy(new_data, mutable_data_, size_);
-      allocator_->Free(mutable_data_, capacity_);
-      mutable_data_ = new_data;
-    } else {
-      mutable_data_ = allocator_->Malloc(new_capacity);
-    }
-    data_ = mutable_data_;
-    capacity_ = new_capacity;
-  }
-}
-
-void OwnedMutableBuffer::Resize(int64_t new_size) {
-  Reserve(new_size);
-  size_ = new_size;
-}
-
-uint8_t& OwnedMutableBuffer::operator[](int64_t i) {
-  return mutable_data_[i];
-}
-
-template <class T>
-Vector<T>::Vector(int64_t size, MemoryAllocator* allocator)
-    : buffer_(new OwnedMutableBuffer(size * sizeof(T), allocator)),
-      size_(size),
-      capacity_(size) {
-  if (size > 0) {
-    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
-  } else {
-    data_ = nullptr;
-  }
-}
-
-template <class T>
-void Vector<T>::Reserve(int64_t new_capacity) {
-  if (new_capacity > capacity_) {
-    buffer_->Resize(new_capacity * sizeof(T));
-    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
-    capacity_ = new_capacity;
-  }
-}
-
-template <class T>
-void Vector<T>::Resize(int64_t new_size) {
-  Reserve(new_size);
-  size_ = new_size;
-}
-
-template <class T>
-void Vector<T>::Assign(int64_t size, const T val) {
-  Resize(size);
-  for (int64_t i = 0; i < size_; i++) {
-    data_[i] = val;
-  }
-}
-
-template <class T>
-void Vector<T>::Swap(Vector<T>& v) {
-  buffer_.swap(v.buffer_);
-  std::swap(size_, v.size_);
-  std::swap(capacity_, v.capacity_);
-  std::swap(data_, v.data_);
-}
-
-template class Vector<int32_t>;
-template class Vector<int64_t>;
-template class Vector<bool>;
-template class Vector<float>;
-template class Vector<double>;
-template class Vector<Int96>;
-template class Vector<ByteArray>;
-template class Vector<FixedLenByteArray>;
-
-}  // namespace parquet

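The Vector<T> helper defined above is consolidated into parquet/util/memory.{h,cc}
by this change rather than dropped. A minimal sketch of its contract, assuming the
relocated template keeps the constructor and methods shown above (only the
explicitly instantiated element types, such as int32_t, are usable):

    #include "parquet/util/memory.h"

    void VectorSketch() {
      // Backed by a resizable buffer obtained from the default allocator.
      parquet::Vector<int32_t> values(8, parquet::default_allocator());
      values.Assign(8, 0);   // Resize(8), then fill every element with 0
      values[3] = 42;        // element access through operator[]
      values.Resize(16);     // grows the backing buffer, preserving contents
    }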
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/buffer.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/buffer.h b/src/parquet/util/buffer.h
deleted file mode 100644
index 58a5f5e..0000000
--- a/src/parquet/util/buffer.h
+++ /dev/null
@@ -1,149 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_UTIL_BUFFER_H
-#define PARQUET_UTIL_BUFFER_H
-
-#include <cstdint>
-#include <cstdlib>
-#include <cstring>
-#include <memory>
-#include <vector>
-
-#include "parquet/util/macros.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// Buffer classes
-
-// Immutable API for a chunk of bytes which may or may not be owned by the
-// class instance
-class PARQUET_EXPORT Buffer : public std::enable_shared_from_this<Buffer> {
- public:
-  Buffer(const uint8_t* data, int64_t size) : data_(data), size_(size) {}
-
-  // An offset into data that is owned by another buffer, but we want to be
-  // able to retain a valid pointer to it even after other shared_ptr's to the
-  // parent buffer have been destroyed
-  Buffer(const std::shared_ptr<Buffer>& parent, int64_t offset, int64_t size);
-
-  std::shared_ptr<Buffer> get_shared_ptr() { return shared_from_this(); }
-
-  // Return true if both buffers are the same size and contain the same bytes
-  // up to the number of compared bytes
-  bool Equals(const Buffer& other, int64_t nbytes) const {
-    return this == &other || (size_ >= nbytes && other.size_ >= nbytes &&
-                                 !memcmp(data_, other.data_, nbytes));
-  }
-
-  bool Equals(const Buffer& other) const {
-    return this == &other || (size_ == other.size_ && !memcmp(data_, other.data_, size_));
-  }
-
-  const uint8_t* data() const { return data_; }
-
-  int64_t size() const { return size_; }
-
-  // Returns true if this Buffer is referencing memory (possibly) owned by some
-  // other buffer
-  bool is_shared() const { return static_cast<bool>(parent_); }
-
-  const std::shared_ptr<Buffer> parent() const { return parent_; }
-
- protected:
-  const uint8_t* data_;
-  int64_t size_;
-
-  // nullptr by default, but may be set
-  std::shared_ptr<Buffer> parent_;
-
- private:
-  DISALLOW_COPY_AND_ASSIGN(Buffer);
-};
-
-// A Buffer whose contents can be mutated. May or may not own its data.
-class PARQUET_EXPORT MutableBuffer : public Buffer {
- public:
-  MutableBuffer(uint8_t* data, int64_t size) : Buffer(data, size) {
-    mutable_data_ = data;
-  }
-
-  uint8_t* mutable_data() { return mutable_data_; }
-
-  // Get a read-only view of this buffer
-  std::shared_ptr<Buffer> GetImmutableView();
-
- protected:
-  MutableBuffer() : Buffer(nullptr, 0), mutable_data_(nullptr) {}
-
-  uint8_t* mutable_data_;
-};
-
-class PARQUET_EXPORT ResizableBuffer : public MutableBuffer {
- public:
-  virtual void Resize(int64_t new_size) = 0;
-
- protected:
-  ResizableBuffer(uint8_t* data, int64_t size)
-      : MutableBuffer(data, size), capacity_(size) {}
-  int64_t capacity_;
-};
-
-// A ResizableBuffer whose memory is owned by the class instance. For example,
-// for reading data out of files that you want to deallocate when this class is
-// garbage-collected
-class PARQUET_EXPORT OwnedMutableBuffer : public ResizableBuffer {
- public:
-  explicit OwnedMutableBuffer(
-      int64_t size = 0, MemoryAllocator* allocator = default_allocator());
-  virtual ~OwnedMutableBuffer();
-  void Resize(int64_t new_size) override;
-  void Reserve(int64_t new_capacity);
-  uint8_t& operator[](int64_t i);
-
- private:
-  // TODO: aligned allocations
-  MemoryAllocator* allocator_;
-
-  DISALLOW_COPY_AND_ASSIGN(OwnedMutableBuffer);
-};
-
-template <class T>
-class Vector {
- public:
-  explicit Vector(int64_t size, MemoryAllocator* allocator);
-  void Resize(int64_t new_size);
-  void Reserve(int64_t new_capacity);
-  void Assign(int64_t size, const T val);
-  void Swap(Vector<T>& v);
-  inline T& operator[](int64_t i) const { return data_[i]; }
-
- private:
-  std::unique_ptr<OwnedMutableBuffer> buffer_;
-  int64_t size_;
-  int64_t capacity_;
-  T* data_;
-
-  DISALLOW_COPY_AND_ASSIGN(Vector);
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_UTIL_BUFFER_H

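The buffer hierarchy removed here is superseded by the shared buffer classes in
parquet/util/memory.h, which follow the Arrow convention of reporting allocation
failure through a Status instead of throwing. A minimal migration sketch for the
common OwnedMutableBuffer pattern, assuming a resizable PoolBuffer type in the
new header and the PARQUET_THROW_NOT_OK macro for turning a failed Status into
the ParquetException the old Resize() threw:

    #include <memory>

    #include "parquet/exception.h"
    #include "parquet/util/memory.h"

    void ResizeSketch() {
      // Old code: OwnedMutableBuffer buf; buf.Resize(1024); uint8_t* p = &buf[0];
      auto buffer =
          std::make_shared<parquet::PoolBuffer>(parquet::default_allocator());
      PARQUET_THROW_NOT_OK(buffer->Resize(1024));  // Status -> ParquetException
      uint8_t* data = buffer->mutable_data();      // replaces operator[] / &buf[0]
      (void)data;
    }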
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input-output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input-output-test.cc b/src/parquet/util/input-output-test.cc
deleted file mode 100644
index 72aad9c..0000000
--- a/src/parquet/util/input-output-test.cc
+++ /dev/null
@@ -1,244 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include <cstdint>
-#include <cstdio>
-#include <fstream>
-#include <iostream>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/input.h"
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/output.h"
-#include "parquet/util/test-common.h"
-
-namespace parquet {
-
-TEST(TestBufferedInputStream, Basics) {
-  int64_t source_size = 256;
-  int64_t stream_offset = 10;
-  int64_t stream_size = source_size - stream_offset;
-  int64_t chunk_size = 50;
-  auto buf = std::make_shared<OwnedMutableBuffer>(source_size);
-  ASSERT_EQ(source_size, buf->size());
-  for (int i = 0; i < source_size; i++) {
-    buf->mutable_data()[i] = i;
-  }
-
-  std::unique_ptr<BufferReader> source(new BufferReader(buf));
-  std::unique_ptr<MemoryAllocator> allocator(new TrackingAllocator());
-  std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
-      allocator.get(), chunk_size, source.get(), stream_offset, stream_size));
-
-  const uint8_t* output;
-  int64_t bytes_read;
-
-  // source is at offset 10
-  output = stream->Peek(10, &bytes_read);
-  ASSERT_EQ(10, bytes_read);
-  for (int i = 0; i < 10; i++) {
-    ASSERT_EQ(10 + i, output[i]) << i;
-  }
-  output = stream->Read(10, &bytes_read);
-  ASSERT_EQ(10, bytes_read);
-  for (int i = 0; i < 10; i++) {
-    ASSERT_EQ(10 + i, output[i]) << i;
-  }
-  output = stream->Read(10, &bytes_read);
-  ASSERT_EQ(10, bytes_read);
-  for (int i = 0; i < 10; i++) {
-    ASSERT_EQ(20 + i, output[i]) << i;
-  }
-  stream->Advance(5);
-  stream->Advance(5);
-  // source is at offset 40
-  // read across buffer boundary. buffer size is 50
-  output = stream->Read(20, &bytes_read);
-  ASSERT_EQ(20, bytes_read);
-  for (int i = 0; i < 20; i++) {
-    ASSERT_EQ(40 + i, output[i]) << i;
-  }
-  // read more than original chunk_size
-  output = stream->Read(60, &bytes_read);
-  ASSERT_EQ(60, bytes_read);
-  for (int i = 0; i < 60; i++) {
-    ASSERT_EQ(60 + i, output[i]) << i;
-  }
-
-  stream->Advance(120);
-  // source is at offset 240
-  // read outside of source boundary. source size is 256
-  output = stream->Read(30, &bytes_read);
-  ASSERT_EQ(16, bytes_read);
-  for (int i = 0; i < 16; i++) {
-    ASSERT_EQ(240 + i, output[i]) << i;
-  }
-}
-
-TEST(TestInMemoryOutputStream, Basics) {
-  std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
-
-  std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
-
-  stream->Write(&data[0], 4);
-  ASSERT_EQ(4, stream->Tell());
-  stream->Write(&data[4], data.size() - 4);
-
-  std::shared_ptr<Buffer> buffer = stream->GetBuffer();
-
-  Buffer data_buf(data.data(), data.size());
-
-  ASSERT_TRUE(data_buf.Equals(*buffer));
-}
-
-TEST(TestBufferedReader, Basics) {
-  std::vector<uint8_t> data = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
-  auto buffer = std::make_shared<Buffer>(data.data(), data.size());
-  BufferReader reader(buffer);
-
-  uint8_t out[4];
-  ASSERT_EQ(4, reader.Read(4, out));
-  ASSERT_EQ(4, reader.Tell());
-  ASSERT_EQ(0, out[0]);
-  ASSERT_EQ(1, out[1]);
-  ASSERT_EQ(2, out[2]);
-  ASSERT_EQ(3, out[3]);
-
-  reader.Seek(8);
-  ASSERT_EQ(8, reader.Tell());
-
-  auto out_buffer = reader.Read(5);
-  ASSERT_EQ(8, out_buffer->data()[0]);
-  ASSERT_EQ(9, out_buffer->data()[1]);
-  ASSERT_EQ(10, out_buffer->data()[2]);
-  ASSERT_EQ(11, out_buffer->data()[3]);
-  ASSERT_EQ(12, out_buffer->data()[4]);
-
-  // Read past the end of the buffer
-  ASSERT_EQ(13, reader.Tell());
-  ASSERT_EQ(0, reader.Read(4, out));
-  ASSERT_EQ(0, reader.Read(4)->size());
-
-  reader.Close();
-}
-
-static bool file_exists(const std::string& path) {
-  return std::ifstream(path.c_str()).good();
-}
-
-template <typename ReaderType>
-class TestFileReaders : public ::testing::Test {
- public:
-  void SetUp() {
-    test_path_ = "parquet-input-output-test.txt";
-    if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
-    test_data_ = "testingdata";
-
-    std::ofstream stream;
-    stream.open(test_path_.c_str());
-    stream << test_data_;
-    filesize_ = test_data_.size();
-  }
-
-  void TearDown() { DeleteTestFile(); }
-
-  void DeleteTestFile() {
-    if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
-  }
-
- protected:
-  ReaderType source;
-  std::string test_path_;
-  std::string test_data_;
-  int filesize_;
-};
-
-typedef ::testing::Types<LocalFileSource, MemoryMapSource> ReaderTypes;
-
-TYPED_TEST_CASE(TestFileReaders, ReaderTypes);
-
-TYPED_TEST(TestFileReaders, NonExistentFile) {
-  ASSERT_THROW(this->source.Open("0xDEADBEEF.txt"), ParquetException);
-}
-
-TYPED_TEST(TestFileReaders, Read) {
-  this->source.Open(this->test_path_);
-
-  ASSERT_EQ(this->filesize_, this->source.Size());
-
-  std::shared_ptr<Buffer> buffer = this->source.Read(4);
-  ASSERT_EQ(4, buffer->size());
-  ASSERT_EQ(0, memcmp(this->test_data_.c_str(), buffer->data(), 4));
-
-  // Read past EOF
-  buffer = this->source.Read(10);
-  ASSERT_EQ(7, buffer->size());
-  ASSERT_EQ(0, memcmp(this->test_data_.c_str() + 4, buffer->data(), 7));
-}
-
-TYPED_TEST(TestFileReaders, FileDisappeared) {
-  this->source.Open(this->test_path_);
-  this->source.Seek(4);
-  this->DeleteTestFile();
-  this->source.Close();
-}
-
-TYPED_TEST(TestFileReaders, BadSeek) {
-  this->source.Open(this->test_path_);
-  ASSERT_THROW(this->source.Seek(this->filesize_ + 1), ParquetException);
-}
-
-class TestFileWriter : public ::testing::Test {
- public:
-  void SetUp() {
-    test_path_ = "parquet-input-output-test.txt";
-    if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
-  }
-
-  void TearDown() { DeleteTestFile(); }
-
-  void DeleteTestFile() {
-    if (file_exists(test_path_)) { std::remove(test_path_.c_str()); }
-  }
-
- protected:
-  std::string test_path_;
-  uint8_t test_data_[4] = {1, 2, 3, 4};
-};
-
-TEST_F(TestFileWriter, Write) {
-  LocalFileOutputStream sink(test_path_);
-  ASSERT_EQ(0, sink.Tell());
-  sink.Write(test_data_, 4);
-  ASSERT_EQ(4, sink.Tell());
-  sink.Close();
-
-  // Check that the correct content was written
-  LocalFileSource source;
-  source.Open(test_path_);
-  std::shared_ptr<Buffer> buffer = source.Read(4);
-  ASSERT_EQ(4, buffer->size());
-  ASSERT_EQ(0, memcmp(test_data_, buffer->data(), 4));
-}
-
-}  // namespace parquet

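InMemoryOutputStream is likewise consolidated into parquet/util/memory.{h,cc}
rather than deleted along with this test. A minimal sketch of the sink contract
the removed test exercised, assuming the relocated class keeps the default
constructor and the Write()/Tell()/GetBuffer() calls shown above:

    #include <cstdint>

    #include "parquet/util/memory.h"

    void InMemorySinkSketch() {
      parquet::InMemoryOutputStream sink;
      const uint8_t bytes[4] = {1, 2, 3, 4};
      sink.Write(bytes, 4);              // appends, growing the buffer as needed
      int64_t written = sink.Tell();     // == 4: Tell() reports bytes written
      auto contents = sink.GetBuffer();  // snapshot of everything written so far
      (void)written;
      (void)contents;
    }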
http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
deleted file mode 100644
index 127b90c..0000000
--- a/src/parquet/util/input.cc
+++ /dev/null
@@ -1,285 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/input.h"
-
-#include <algorithm>
-#include <sstream>
-#include <string>
-#include <sys/mman.h>
-
-#include "parquet/exception.h"
-#include "parquet/util/buffer.h"
-#include "parquet/util/logging.h"
-
-namespace parquet {
-
-// ----------------------------------------------------------------------
-// RandomAccessSource
-
-std::shared_ptr<Buffer> RandomAccessSource::ReadAt(int64_t pos, int64_t nbytes) {
-  Seek(pos);
-  return Read(nbytes);
-}
-
-int64_t RandomAccessSource::Size() const {
-  return size_;
-}
-
-// ----------------------------------------------------------------------
-// LocalFileSource
-
-LocalFileSource::~LocalFileSource() {
-  CloseFile();
-}
-
-void LocalFileSource::Open(const std::string& path) {
-  path_ = path;
-  file_ = fopen(path_.c_str(), "rb");
-  if (file_ == nullptr || ferror(file_)) {
-    std::stringstream ss;
-    ss << "Unable to open file: " << path;
-    throw ParquetException(ss.str());
-  }
-  is_open_ = true;
-  SeekFile(0, SEEK_END);
-  size_ = LocalFileSource::Tell();
-  Seek(0);
-}
-
-void LocalFileSource::SeekFile(int64_t pos, int origin) {
-  if (origin == SEEK_SET && (pos < 0 || pos >= size_)) {
-    std::stringstream ss;
-    ss << "Position " << pos << " is not in range.";
-    throw ParquetException(ss.str());
-  }
-
-  if (0 != fseek(file_, pos, origin)) {
-    std::stringstream ss;
-    ss << "File seek to position " << pos << " failed.";
-    throw ParquetException(ss.str());
-  }
-}
-
-void LocalFileSource::Close() {
-  // Pure virtual
-  CloseFile();
-}
-
-void LocalFileSource::CloseFile() {
-  if (is_open_) {
-    fclose(file_);
-    is_open_ = false;
-  }
-}
-
-void LocalFileSource::Seek(int64_t pos) {
-  SeekFile(pos);
-}
-
-int64_t LocalFileSource::Tell() const {
-  int64_t position = ftell(file_);
-  if (position < 0) { throw ParquetException("ftell failed, did the file disappear?"); }
-  return position;
-}
-
-int LocalFileSource::file_descriptor() const {
-  return fileno(file_);
-}
-
-int64_t LocalFileSource::Read(int64_t nbytes, uint8_t* buffer) {
-  return fread(buffer, 1, nbytes, file_);
-}
-
-std::shared_ptr<Buffer> LocalFileSource::Read(int64_t nbytes) {
-  auto result = std::make_shared<OwnedMutableBuffer>(0, allocator_);
-  result->Resize(nbytes);
-
-  int64_t bytes_read = Read(nbytes, result->mutable_data());
-  if (bytes_read < nbytes) { result->Resize(bytes_read); }
-  return result;
-}
-// ----------------------------------------------------------------------
-// MemoryMapSource methods
-
-MemoryMapSource::~MemoryMapSource() {
-  CloseFile();
-}
-
-void MemoryMapSource::Open(const std::string& path) {
-  LocalFileSource::Open(path);
-  data_ = reinterpret_cast<uint8_t*>(
-      mmap(nullptr, size_, PROT_READ, MAP_SHARED, fileno(file_), 0));
-  if (data_ == nullptr) { throw ParquetException("Memory mapping file failed"); }
-  pos_ = 0;
-}
-
-void MemoryMapSource::Close() {
-  // Pure virtual
-  CloseFile();
-}
-
-void MemoryMapSource::CloseFile() {
-  if (data_ != nullptr) {
-    munmap(data_, size_);
-    data_ = nullptr;
-  }
-
-  LocalFileSource::CloseFile();
-}
-
-void MemoryMapSource::Seek(int64_t pos) {
-  if (pos < 0 || pos >= size_) {
-    std::stringstream ss;
-    ss << "Position " << pos << " is not in range.";
-    throw ParquetException(ss.str());
-  }
-
-  pos_ = pos;
-}
-
-int64_t MemoryMapSource::Tell() const {
-  return pos_;
-}
-
-int64_t MemoryMapSource::Read(int64_t nbytes, uint8_t* buffer) {
-  int64_t bytes_available = std::min(nbytes, size_ - pos_);
-  memcpy(buffer, data_ + pos_, bytes_available);
-  pos_ += bytes_available;
-  return bytes_available;
-}
-
-std::shared_ptr<Buffer> MemoryMapSource::Read(int64_t nbytes) {
-  int64_t bytes_available = std::min(nbytes, size_ - pos_);
-  auto result = std::make_shared<Buffer>(data_ + pos_, bytes_available);
-  pos_ += bytes_available;
-  return result;
-}
-
-// ----------------------------------------------------------------------
-// BufferReader
-
-BufferReader::BufferReader(const std::shared_ptr<Buffer>& buffer)
-    : buffer_(buffer), data_(buffer->data()), pos_(0) {
-  size_ = buffer->size();
-}
-
-int64_t BufferReader::Tell() const {
-  return pos_;
-}
-
-void BufferReader::Seek(int64_t pos) {
-  if (pos < 0 || pos >= size_) {
-    std::stringstream ss;
-    ss << "Cannot seek to " << pos << "File is length " << size_;
-    throw ParquetException(ss.str());
-  }
-  pos_ = pos;
-}
-
-int64_t BufferReader::Read(int64_t nbytes, uint8_t* out) {
-  int64_t bytes_available = std::min(nbytes, size_ - pos_);
-  memcpy(out, Head(), bytes_available);
-  pos_ += bytes_available;
-  return bytes_available;
-}
-
-std::shared_ptr<Buffer> BufferReader::Read(int64_t nbytes) {
-  int64_t bytes_available = std::min(nbytes, size_ - pos_);
-  auto result = std::make_shared<Buffer>(Head(), bytes_available);
-  pos_ += bytes_available;
-  return result;
-}
-
-// ----------------------------------------------------------------------
-// InMemoryInputStream
-
-InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
-    : buffer_(buffer), offset_(0) {
-  len_ = buffer_->size();
-}
-
-InMemoryInputStream::InMemoryInputStream(
-    RandomAccessSource* source, int64_t start, int64_t num_bytes)
-    : offset_(0) {
-  buffer_ = source->ReadAt(start, num_bytes);
-  if (buffer_->size() < num_bytes) {
-    throw ParquetException("Unable to read column chunk data");
-  }
-  len_ = buffer_->size();
-}
-
-const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
-  *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
-  return buffer_->data() + offset_;
-}
-
-const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
-  const uint8_t* result = Peek(num_to_read, num_bytes);
-  offset_ += *num_bytes;
-  return result;
-}
-
-void InMemoryInputStream::Advance(int64_t num_bytes) {
-  offset_ += num_bytes;
-}
-
-// ----------------------------------------------------------------------
-// BufferedInputStream
-BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
-    RandomAccessSource* source, int64_t start, int64_t num_bytes)
-    : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
-  buffer_ = std::make_shared<OwnedMutableBuffer>(buffer_size, pool);
-  buffer_size_ = buffer_->size();
-  // Required to force a lazy read
-  buffer_offset_ = buffer_size_;
-}
-
-const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
-  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
-  // increase the buffer size if needed
-  if (*num_bytes > buffer_size_) {
-    buffer_->Resize(*num_bytes);
-    buffer_size_ = buffer_->size();
-    DCHECK(buffer_size_ >= *num_bytes);
-  }
-  // Read more data when buffer has insufficient left or when resized
-  if (*num_bytes > (buffer_size_ - buffer_offset_)) {
-    source_->Seek(stream_offset_);
-    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
-    int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
-    if (bytes_read < *num_bytes) {
-      throw ParquetException("Failed reading column data from source");
-    }
-    buffer_offset_ = 0;
-  }
-  return buffer_->data() + buffer_offset_;
-}
-
-const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
-  const uint8_t* result = Peek(num_to_read, num_bytes);
-  stream_offset_ += *num_bytes;
-  buffer_offset_ += *num_bytes;
-  return result;
-}
-
-void BufferedInputStream::Advance(int64_t num_bytes) {
-  stream_offset_ += num_bytes;
-  buffer_offset_ += num_bytes;
-}
-
-}  // namespace parquet
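The InputStream implementations deleted here (InMemoryInputStream,
BufferedInputStream) move into parquet/util/memory.{h,cc} with this change
rather than disappearing. A minimal sketch of how the Peek()/Read()/Advance()
contract shown above is consumed, with `stream` standing for either
implementation:

    #include <cstdint>

    #include "parquet/util/memory.h"

    void ConsumeHeader(parquet::InputStream* stream) {
      int64_t bytes_available = 0;
      // Peek() exposes up to the requested bytes without moving the position.
      const uint8_t* header = stream->Peek(4, &bytes_available);
      if (bytes_available == 4 && header != nullptr) {
        // Advance() (or a matching Read()) then consumes those bytes.
        stream->Advance(4);
      }
    }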