Posted to commits@parquet.apache.org by we...@apache.org on 2016/12/30 16:36:36 UTC

[2/4] parquet-cpp git commit: PARQUET-818: Refactoring to utilize common IO, buffer, memory management abstractions and implementations

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
deleted file mode 100644
index 1bb41e3..0000000
--- a/src/parquet/util/input.h
+++ /dev/null
@@ -1,211 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_UTIL_INPUT_H
-#define PARQUET_UTIL_INPUT_H
-
-#include <cstdint>
-#include <cstdio>
-#include <memory>
-#include <string>
-#include <vector>
-
-#include "parquet/util/mem-allocator.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-class Buffer;
-class OwnedMutableBuffer;
-
-// ----------------------------------------------------------------------
-// Random access input (e.g. file-like)
-
-// Abstract interface for random access data sources
-class PARQUET_EXPORT RandomAccessSource {
- public:
-  virtual ~RandomAccessSource() {}
-
-  virtual void Close() = 0;
-  virtual int64_t Tell() const = 0;
-  virtual void Seek(int64_t pos) = 0;
-  int64_t Size() const;
-
-  // Returns actual number of bytes read
-  virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
-
-  virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
-  std::shared_ptr<Buffer> ReadAt(int64_t pos, int64_t nbytes);
-
- protected:
-  int64_t size_;
-};
-
-// ----------------------------------------------------------------------
-// Implementations of RandomAccessSource used for testing and internal CLI tools.
-// May not be sufficiently robust for general production use.
-
-class PARQUET_EXPORT LocalFileSource : public RandomAccessSource {
- public:
-  explicit LocalFileSource(MemoryAllocator* allocator = default_allocator())
-      : file_(nullptr), is_open_(false), allocator_(allocator) {}
-
-  virtual ~LocalFileSource();
-
-  virtual void Open(const std::string& path);
-
-  virtual void Close();
-  virtual int64_t Tell() const;
-  virtual void Seek(int64_t pos);
-
-  // Returns actual number of bytes read
-  virtual int64_t Read(int64_t nbytes, uint8_t* out);
-
-  virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
-
-  bool is_open() const { return is_open_; }
-  const std::string& path() const { return path_; }
-
-  // Return the integer file descriptor
-  int file_descriptor() const;
-
- protected:
-  void CloseFile();
-  void SeekFile(int64_t pos, int origin = SEEK_SET);
-
-  std::string path_;
-  FILE* file_;
-  bool is_open_;
-  MemoryAllocator* allocator_;
-};
-
-class PARQUET_EXPORT MemoryMapSource : public LocalFileSource {
- public:
-  explicit MemoryMapSource(MemoryAllocator* allocator = default_allocator())
-      : LocalFileSource(allocator), data_(nullptr), pos_(0) {}
-
-  virtual ~MemoryMapSource();
-
-  virtual void Close();
-  virtual void Open(const std::string& path);
-
-  virtual int64_t Tell() const;
-  virtual void Seek(int64_t pos);
-
-  // Copy data from memory map into out (must be already allocated memory)
-  // @returns: actual number of bytes read
-  virtual int64_t Read(int64_t nbytes, uint8_t* out);
-
-  // Return a buffer referencing memory-map (no copy)
-  virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
-
- private:
-  void CloseFile();
-
-  uint8_t* data_;
-  int64_t pos_;
-};
-
-// ----------------------------------------------------------------------
-// A file-like object that reads from virtual address space
-
-class PARQUET_EXPORT BufferReader : public RandomAccessSource {
- public:
-  explicit BufferReader(const std::shared_ptr<Buffer>& buffer);
-  virtual void Close() {}
-  virtual int64_t Tell() const;
-  virtual void Seek(int64_t pos);
-
-  virtual int64_t Read(int64_t nbytes, uint8_t* out);
-
-  virtual std::shared_ptr<Buffer> Read(int64_t nbytes);
-
- protected:
-  const uint8_t* Head() { return data_ + pos_; }
-
-  std::shared_ptr<Buffer> buffer_;
-  const uint8_t* data_;
-  int64_t pos_;
-};
-
-// ----------------------------------------------------------------------
-// Streaming input interfaces
-
-// Interface for the column reader to get the bytes. The interface is a stream
-// interface, meaning the bytes are read in order, and once a byte has been read it
-// does not need to be read again.
-class InputStream {
- public:
-  // Returns the next 'num_to_peek' bytes without advancing the current position.
-  // *num_bytes will contain the number of bytes returned, which can be less than
-  // num_to_peek only at the end of the stream.
-  // Since the position is not advanced, calls to this function are idempotent.
-  // The buffer returned to the caller is still owned by the input stream and must
-  // stay valid until the next call to Peek() or Read().
-  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
-
-  // Identical to Peek(), except the current position in the stream is advanced by
-  // *num_bytes.
-  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
-
-  // Advance the stream without reading
-  virtual void Advance(int64_t num_bytes) = 0;
-
-  virtual ~InputStream() {}
-
- protected:
-  InputStream() {}
-};
-
-// Implementation of an InputStream when all the bytes are in memory.
-class InMemoryInputStream : public InputStream {
- public:
-  InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t end);
-  explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
-  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
-  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
-
-  virtual void Advance(int64_t num_bytes);
-
- private:
-  std::shared_ptr<Buffer> buffer_;
-  int64_t len_;
-  int64_t offset_;
-};
-
-// Implementation of an InputStream when only some of the bytes are in memory.
-class BufferedInputStream : public InputStream {
- public:
-  BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
-      RandomAccessSource* source, int64_t start, int64_t end);
-  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
-  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
-
-  virtual void Advance(int64_t num_bytes);
-
- private:
-  std::shared_ptr<OwnedMutableBuffer> buffer_;
-  RandomAccessSource* source_;
-  int64_t stream_offset_;
-  int64_t stream_end_;
-  int64_t buffer_offset_;
-  int64_t buffer_size_;
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_UTIL_INPUT_H
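
For reference, a minimal illustrative sketch of how the interfaces removed above fit together (not part of this patch; it assumes a readable file of at least 100 bytes at the hypothetical path "data.parquet"):

// Illustrative only: wire a LocalFileSource into an InMemoryInputStream and
// compare Peek() (does not advance) with Read() (advances the position).
#include <cstdint>
#include <iostream>

#include "parquet/util/input.h"

int main() {
  parquet::LocalFileSource source;
  source.Open("data.parquet");  // hypothetical path

  // Stream over the first 100 bytes of the file.
  parquet::InMemoryInputStream stream(&source, /*start=*/0, /*end=*/100);

  int64_t num_bytes = 0;
  const uint8_t* peeked = stream.Peek(4, &num_bytes);  // position unchanged
  const uint8_t* read = stream.Read(4, &num_bytes);    // position advanced by 4
  std::cout << "got " << num_bytes << " bytes, "
            << (peeked == read ? "same" : "different") << " pointer" << std::endl;

  source.Close();
  return 0;
}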

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator-test.cc b/src/parquet/util/mem-allocator-test.cc
deleted file mode 100644
index 336d3b4..0000000
--- a/src/parquet/util/mem-allocator-test.cc
+++ /dev/null
@@ -1,67 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include <gtest/gtest.h>
-
-#include "parquet/exception.h"
-#include "parquet/util/mem-allocator.h"
-
-namespace parquet {
-
-TEST(TestAllocator, AllocateFree) {
-  TrackingAllocator allocator;
-
-  uint8_t* data = allocator.Malloc(100);
-  ASSERT_TRUE(nullptr != data);
-  data[99] = 55;
-  allocator.Free(data, 100);
-
-  data = allocator.Malloc(0);
-  ASSERT_EQ(nullptr, data);
-  allocator.Free(data, 0);
-
-  data = allocator.Malloc(1);
-  ASSERT_THROW(allocator.Free(data, 2), ParquetException);
-  ASSERT_NO_THROW(allocator.Free(data, 1));
-
-  int64_t to_alloc = std::numeric_limits<int64_t>::max();
-  ASSERT_THROW(allocator.Malloc(to_alloc), ParquetException);
-}
-
-TEST(TestAllocator, TotalMax) {
-  TrackingAllocator allocator;
-  ASSERT_EQ(0, allocator.TotalMemory());
-  ASSERT_EQ(0, allocator.MaxMemory());
-
-  uint8_t* data = allocator.Malloc(100);
-  ASSERT_EQ(100, allocator.TotalMemory());
-  ASSERT_EQ(100, allocator.MaxMemory());
-
-  uint8_t* data2 = allocator.Malloc(10);
-  ASSERT_EQ(110, allocator.TotalMemory());
-  ASSERT_EQ(110, allocator.MaxMemory());
-
-  allocator.Free(data, 100);
-  ASSERT_EQ(10, allocator.TotalMemory());
-  ASSERT_EQ(110, allocator.MaxMemory());
-
-  allocator.Free(data2, 10);
-  ASSERT_EQ(0, allocator.TotalMemory());
-  ASSERT_EQ(110, allocator.MaxMemory());
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.cc b/src/parquet/util/mem-allocator.cc
deleted file mode 100644
index 2b6592d..0000000
--- a/src/parquet/util/mem-allocator.cc
+++ /dev/null
@@ -1,61 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/mem-allocator.h"
-
-#include <cstdlib>
-
-#include "parquet/exception.h"
-
-namespace parquet {
-
-MemoryAllocator::~MemoryAllocator() {}
-
-uint8_t* TrackingAllocator::Malloc(int64_t size) {
-  if (0 == size) { return nullptr; }
-
-  uint8_t* p = static_cast<uint8_t*>(std::malloc(size));
-  if (!p) { throw ParquetException("OOM: memory allocation failed"); }
-  {
-    std::lock_guard<std::mutex> lock(stats_mutex_);
-    total_memory_ += size;
-    if (total_memory_ > max_memory_) { max_memory_ = total_memory_; }
-  }
-  return p;
-}
-
-void TrackingAllocator::Free(uint8_t* p, int64_t size) {
-  if (nullptr != p && size > 0) {
-    {
-      std::lock_guard<std::mutex> lock(stats_mutex_);
-      if (total_memory_ < size) {
-        throw ParquetException("Attempting to free too much memory");
-      }
-      total_memory_ -= size;
-    }
-    std::free(p);
-  }
-}
-
-TrackingAllocator::~TrackingAllocator() {}
-
-MemoryAllocator* default_allocator() {
-  static TrackingAllocator default_allocator;
-  return &default_allocator;
-}
-
-}  // namespace parquet
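
A minimal usage sketch of the TrackingAllocator removed above (illustrative, not part of this patch): TotalMemory() reflects live allocations and MaxMemory() records the high-water mark.

// Illustrative only.
#include <cstdint>
#include <iostream>

#include "parquet/util/mem-allocator.h"

int main() {
  parquet::TrackingAllocator allocator;

  uint8_t* a = allocator.Malloc(100);
  uint8_t* b = allocator.Malloc(10);
  // 110 bytes live, peak 110.
  std::cout << allocator.TotalMemory() << " / " << allocator.MaxMemory() << std::endl;

  allocator.Free(a, 100);
  // 10 bytes live, peak still 110.
  std::cout << allocator.TotalMemory() << " / " << allocator.MaxMemory() << std::endl;

  allocator.Free(b, 10);
  return 0;
}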

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-allocator.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-allocator.h b/src/parquet/util/mem-allocator.h
deleted file mode 100644
index a0f3693..0000000
--- a/src/parquet/util/mem-allocator.h
+++ /dev/null
@@ -1,59 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_UTIL_MEMORY_POOL_H
-#define PARQUET_UTIL_MEMORY_POOL_H
-
-#include <cstdint>
-#include <mutex>
-
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-class PARQUET_EXPORT MemoryAllocator {
- public:
-  virtual ~MemoryAllocator();
-
-  // Returns nullptr if size is 0
-  virtual uint8_t* Malloc(int64_t size) = 0;
-  virtual void Free(uint8_t* p, int64_t size) = 0;
-};
-
-PARQUET_EXPORT MemoryAllocator* default_allocator();
-
-class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
- public:
-  TrackingAllocator() : total_memory_(0), max_memory_(0) {}
-  virtual ~TrackingAllocator();
-
-  uint8_t* Malloc(int64_t size) override;
-  void Free(uint8_t* p, int64_t size) override;
-
-  int64_t TotalMemory() { return total_memory_; }
-
-  int64_t MaxMemory() { return max_memory_; }
-
- private:
-  std::mutex stats_mutex_;
-  int64_t total_memory_;
-  int64_t max_memory_;
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_UTIL_MEMORY_POOL_H
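
Since MemoryAllocator is a pure virtual interface, a caller could plug in its own implementation wherever a MemoryAllocator* parameter defaults to default_allocator(). A hypothetical sketch (not part of this patch; PlainAllocator is an invented name):

// Illustrative only: a trivial allocator backed by operator new[], with no
// statistics tracking. It mirrors TrackingAllocator's behavior of returning
// nullptr for zero-sized requests and throwing ParquetException on failure.
#include <cstdint>
#include <new>

#include "parquet/exception.h"
#include "parquet/util/mem-allocator.h"

class PlainAllocator : public parquet::MemoryAllocator {
 public:
  uint8_t* Malloc(int64_t size) override {
    if (size == 0) { return nullptr; }
    uint8_t* p = new (std::nothrow) uint8_t[size];
    if (p == nullptr) { throw parquet::ParquetException("OOM: allocation failed"); }
    return p;
  }

  void Free(uint8_t* p, int64_t) override { delete[] p; }
};

// A PlainAllocator instance can then be passed to any component that accepts a
// MemoryAllocator*.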

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool-test.cc b/src/parquet/util/mem-pool-test.cc
deleted file mode 100644
index 3f3424b..0000000
--- a/src/parquet/util/mem-pool-test.cc
+++ /dev/null
@@ -1,247 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#include <cstdint>
-#include <gtest/gtest.h>
-#include <limits>
-#include <string>
-
-#include "parquet/util/bit-util.h"
-#include "parquet/util/mem-pool.h"
-
-namespace parquet {
-
-// Utility class to call private functions on MemPool.
-class MemPoolTest {
- public:
-  static bool CheckIntegrity(MemPool* pool, bool current_chunk_empty) {
-    return pool->CheckIntegrity(current_chunk_empty);
-  }
-
-  static const int INITIAL_CHUNK_SIZE = MemPool::INITIAL_CHUNK_SIZE;
-  static const int MAX_CHUNK_SIZE = MemPool::MAX_CHUNK_SIZE;
-};
-
-const int MemPoolTest::INITIAL_CHUNK_SIZE;
-const int MemPoolTest::MAX_CHUNK_SIZE;
-
-TEST(MemPoolTest, Basic) {
-  MemPool p;
-  MemPool p2;
-  MemPool p3;
-
-  for (int iter = 0; iter < 2; ++iter) {
-    // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
-    for (int i = 0; i < 768; ++i) {
-      // pads to 32 bytes
-      p.Allocate(25);
-    }
-    // we handed back 24K
-    EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
-    // .. and allocated 28K of chunks (4, 8, 16)
-    EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
-
-    // we're passing on the first two chunks, containing 12K of data; we're left with
-    // one chunk of 16K containing 12K of data
-    p2.AcquireData(&p, true);
-    EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
-    EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
-
-    // we allocate 8K, for which there isn't enough room in the current chunk,
-    // so another one is allocated (32K)
-    p.Allocate(8 * 1024);
-    EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
-
-    // we allocate 65K, which doesn't fit into the current chunk or the default
-    // size of the next allocated chunk (64K)
-    p.Allocate(65 * 1024);
-    EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
-    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
-    // Clear() resets allocated data, but doesn't remove any chunks
-    p.Clear();
-    EXPECT_EQ(0, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
-    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
-    // next allocation reuses existing chunks
-    p.Allocate(1024);
-    EXPECT_EQ(1024, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
-    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
-    // ... unless it doesn't fit into any available chunk
-    p.Allocate(120 * 1024);
-    EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
-    if (iter == 0) {
-      EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
-    } else {
-      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    }
-    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
-    // ... Try another chunk that fits into an existing chunk
-    p.Allocate(33 * 1024);
-    EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
-    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
-
-    // we're releasing 3 chunks, which get added to p2
-    p2.AcquireData(&p, false);
-    EXPECT_EQ(0, p.total_allocated_bytes());
-    EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
-    EXPECT_EQ(0, p.GetTotalChunkSizes());
-
-    p3.AcquireData(&p2, true);  // we're keeping the 65k chunk
-    EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
-    EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
-
-    p.FreeAll();
-    p2.FreeAll();
-    p3.FreeAll();
-  }
-}
-
-// Test that we can keep an allocated chunk and a free chunk.
-// This case verifies that when chunks are acquired by another memory pool the
-// remaining chunks are consistent if there were more than one used chunk and some
-// free chunks.
-TEST(MemPoolTest, Keep) {
-  MemPool p;
-  p.Allocate(4 * 1024);
-  p.Allocate(8 * 1024);
-  p.Allocate(16 * 1024);
-  EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
-  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
-  p.Clear();
-  EXPECT_EQ(0, p.total_allocated_bytes());
-  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
-  p.Allocate(1 * 1024);
-  p.Allocate(4 * 1024);
-  EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
-  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
-
-  MemPool p2;
-  p2.AcquireData(&p, true);
-  EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
-  EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
-  EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
-  EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
-
-  p.FreeAll();
-  p2.FreeAll();
-}
-
-// Tests that we can return partial allocations.
-TEST(MemPoolTest, ReturnPartial) {
-  MemPool p;
-  uint8_t* ptr = p.Allocate(1024);
-  EXPECT_EQ(1024, p.total_allocated_bytes());
-  memset(ptr, 0, 1024);
-  p.ReturnPartialAllocation(1024);
-
-  uint8_t* ptr2 = p.Allocate(1024);
-  EXPECT_EQ(1024, p.total_allocated_bytes());
-  EXPECT_TRUE(ptr == ptr2);
-  p.ReturnPartialAllocation(1016);
-
-  ptr2 = p.Allocate(1016);
-  EXPECT_EQ(1024, p.total_allocated_bytes());
-  EXPECT_TRUE(ptr2 == ptr + 8);
-  p.ReturnPartialAllocation(512);
-  memset(ptr2, 1, 1016 - 512);
-
-  uint8_t* ptr3 = p.Allocate(512);
-  EXPECT_EQ(1024, p.total_allocated_bytes());
-  EXPECT_TRUE(ptr3 == ptr + 512);
-  memset(ptr3, 2, 512);
-
-  for (int i = 0; i < 8; ++i) {
-    EXPECT_EQ(0, ptr[i]);
-  }
-  for (int i = 8; i < 512; ++i) {
-    EXPECT_EQ(1, ptr[i]);
-  }
-  for (int i = 512; i < 1024; ++i) {
-    EXPECT_EQ(2, ptr[i]);
-  }
-
-  p.FreeAll();
-}
-
-// Test that the MemPool overhead is bounded when we make allocations of
-// INITIAL_CHUNK_SIZE.
-TEST(MemPoolTest, MemoryOverhead) {
-  MemPool p;
-  const int alloc_size = MemPoolTest::INITIAL_CHUNK_SIZE;
-  const int num_allocs = 1000;
-  int64_t total_allocated = 0;
-
-  for (int i = 0; i < num_allocs; ++i) {
-    uint8_t* mem = p.Allocate(alloc_size);
-    ASSERT_TRUE(mem != NULL);
-    total_allocated += alloc_size;
-
-    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
-    // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
-    // one empty chunk at the end.
-    EXPECT_LE(wasted_memory, MemPoolTest::MAX_CHUNK_SIZE);
-    // The chunk doubling algorithm should not allocate chunks larger than the total
-    // amount of memory already allocated.
-    EXPECT_LE(wasted_memory, total_allocated);
-  }
-
-  p.FreeAll();
-}
-
-// Test that the MemPool overhead is bounded when we make alternating large and small
-// allocations.
-TEST(MemPoolTest, FragmentationOverhead) {
-  MemPool p;
-  const int num_allocs = 100;
-  int64_t total_allocated = 0;
-
-  for (int i = 0; i < num_allocs; ++i) {
-    int alloc_size = i % 2 == 0 ? 1 : MemPoolTest::MAX_CHUNK_SIZE;
-    uint8_t* mem = p.Allocate(alloc_size);
-    ASSERT_TRUE(mem != NULL);
-    total_allocated += alloc_size;
-
-    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
-    // Fragmentation should not waste more than half of each completed chunk.
-    EXPECT_LE(wasted_memory, total_allocated + MemPoolTest::MAX_CHUNK_SIZE);
-  }
-
-  p.FreeAll();
-}
-
-}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.cc b/src/parquet/util/mem-pool.cc
deleted file mode 100644
index 1ab40bc..0000000
--- a/src/parquet/util/mem-pool.cc
+++ /dev/null
@@ -1,264 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#include "parquet/util/mem-pool.h"
-
-#include <stdio.h>
-
-#include <algorithm>
-#include <cstdint>
-#include <sstream>
-#include <string>
-
-#include "parquet/util/bit-util.h"
-#include "parquet/util/logging.h"
-
-namespace parquet {
-
-const int MemPool::INITIAL_CHUNK_SIZE;
-const int MemPool::MAX_CHUNK_SIZE;
-
-MemPool::MemPool(MemoryAllocator* allocator)
-    : current_chunk_idx_(-1),
-      next_chunk_size_(INITIAL_CHUNK_SIZE),
-      total_allocated_bytes_(0),
-      peak_allocated_bytes_(0),
-      total_reserved_bytes_(0),
-      allocator_(allocator) {}
-
-MemPool::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
-    : data(buf), size(size), allocated_bytes(0) {}
-
-MemPool::~MemPool() {
-  int64_t total_bytes_released = 0;
-  for (size_t i = 0; i < chunks_.size(); ++i) {
-    total_bytes_released += chunks_[i].size;
-    allocator_->Free(chunks_[i].data, chunks_[i].size);
-  }
-
-  DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
-}
-
-void MemPool::ReturnPartialAllocation(int byte_size) {
-  DCHECK_GE(byte_size, 0);
-  DCHECK(current_chunk_idx_ != -1);
-  ChunkInfo& info = chunks_[current_chunk_idx_];
-  DCHECK_GE(info.allocated_bytes, byte_size);
-  info.allocated_bytes -= byte_size;
-  total_allocated_bytes_ -= byte_size;
-}
-
-template <bool CHECK_LIMIT_FIRST>
-uint8_t* MemPool::Allocate(int size) {
-  if (size == 0) return NULL;
-
-  int64_t num_bytes = BitUtil::RoundUp(size, 8);
-  if (current_chunk_idx_ == -1 ||
-      num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
-          chunks_[current_chunk_idx_].size) {
-    // If we couldn't allocate a new chunk, return NULL.
-    if (UNLIKELY(!FindChunk(num_bytes))) return NULL;
-  }
-  ChunkInfo& info = chunks_[current_chunk_idx_];
-  uint8_t* result = info.data + info.allocated_bytes;
-  DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
-  info.allocated_bytes += num_bytes;
-  total_allocated_bytes_ += num_bytes;
-  DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
-  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
-  return result;
-}
-
-uint8_t* MemPool::Allocate(int size) {
-  return Allocate<false>(size);
-}
-
-void MemPool::Clear() {
-  current_chunk_idx_ = -1;
-  for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) {
-    chunk->allocated_bytes = 0;
-  }
-  total_allocated_bytes_ = 0;
-  DCHECK(CheckIntegrity(false));
-}
-
-void MemPool::FreeAll() {
-  int64_t total_bytes_released = 0;
-  for (size_t i = 0; i < chunks_.size(); ++i) {
-    total_bytes_released += chunks_[i].size;
-    allocator_->Free(chunks_[i].data, chunks_[i].size);
-  }
-  chunks_.clear();
-  next_chunk_size_ = INITIAL_CHUNK_SIZE;
-  current_chunk_idx_ = -1;
-  total_allocated_bytes_ = 0;
-  total_reserved_bytes_ = 0;
-}
-
-bool MemPool::FindChunk(int64_t min_size) {
-  // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
-  // after the current chunk.
-  int first_free_idx = current_chunk_idx_ + 1;
-  // (cast size() to signed int in order to avoid everything else being cast to
-  // unsigned long, in particular -1)
-  while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
-    // we found a free chunk
-    DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
-
-    if (chunks_[current_chunk_idx_].size >= min_size) {
-      // This chunk is big enough.  Move it before the other free chunks.
-      if (current_chunk_idx_ != first_free_idx) {
-        std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
-        current_chunk_idx_ = first_free_idx;
-      }
-      break;
-    }
-  }
-
-  if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
-    // need to allocate new chunk.
-    int64_t chunk_size;
-    DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
-    DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
-
-    chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
-
-    // Allocate a new chunk. Return early if malloc fails.
-    uint8_t* buf = allocator_->Malloc(chunk_size);
-    if (UNLIKELY(buf == NULL)) {
-      DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
-      current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
-      return false;
-    }
-
-    // If there are no free chunks put it at the end, otherwise before the first free.
-    if (first_free_idx == static_cast<int>(chunks_.size())) {
-      chunks_.push_back(ChunkInfo(chunk_size, buf));
-    } else {
-      current_chunk_idx_ = first_free_idx;
-      auto insert_chunk = chunks_.begin() + current_chunk_idx_;
-      chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
-    }
-    total_reserved_bytes_ += chunk_size;
-    // Don't increment the chunk size until the allocation succeeds: if an attempted
-    // large allocation fails we don't want to increase the chunk size further.
-    next_chunk_size_ =
-        static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
-  }
-
-  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
-  DCHECK(CheckIntegrity(true));
-  return true;
-}
-
-void MemPool::AcquireData(MemPool* src, bool keep_current) {
-  DCHECK(src->CheckIntegrity(false));
-  int num_acquired_chunks;
-  if (keep_current) {
-    num_acquired_chunks = src->current_chunk_idx_;
-  } else if (src->GetFreeOffset() == 0) {
-    // nothing in the last chunk
-    num_acquired_chunks = src->current_chunk_idx_;
-  } else {
-    num_acquired_chunks = src->current_chunk_idx_ + 1;
-  }
-
-  if (num_acquired_chunks <= 0) {
-    if (!keep_current) src->FreeAll();
-    return;
-  }
-
-  auto end_chunk = src->chunks_.begin() + num_acquired_chunks;
-  int64_t total_transfered_bytes = 0;
-  for (auto i = src->chunks_.begin(); i != end_chunk; ++i) {
-    total_transfered_bytes += i->size;
-  }
-  src->total_reserved_bytes_ -= total_transfered_bytes;
-  total_reserved_bytes_ += total_transfered_bytes;
-
-  // insert new chunks after current_chunk_idx_
-  auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1;
-  chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
-  src->chunks_.erase(src->chunks_.begin(), end_chunk);
-  current_chunk_idx_ += num_acquired_chunks;
-
-  if (keep_current) {
-    src->current_chunk_idx_ = 0;
-    DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
-    total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset();
-    src->total_allocated_bytes_ = src->GetFreeOffset();
-  } else {
-    src->current_chunk_idx_ = -1;
-    total_allocated_bytes_ += src->total_allocated_bytes_;
-    src->total_allocated_bytes_ = 0;
-  }
-  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
-
-  if (!keep_current) src->FreeAll();
-  DCHECK(CheckIntegrity(false));
-}
-
-std::string MemPool::DebugString() {
-  std::stringstream out;
-  char str[16];
-  out << "MemPool(#chunks=" << chunks_.size() << " [";
-  for (size_t i = 0; i < chunks_.size(); ++i) {
-    sprintf(str, "0x%lx=", reinterpret_cast<size_t>(chunks_[i].data));  // NOLINT
-    out << (i > 0 ? " " : "") << str << chunks_[i].size << "/"
-        << chunks_[i].allocated_bytes;
-  }
-  out << "] current_chunk=" << current_chunk_idx_
-      << " total_sizes=" << GetTotalChunkSizes()
-      << " total_alloc=" << total_allocated_bytes_ << ")";
-  return out.str();
-}
-
-int64_t MemPool::GetTotalChunkSizes() const {
-  int64_t result = 0;
-  for (size_t i = 0; i < chunks_.size(); ++i) {
-    result += chunks_[i].size;
-  }
-  return result;
-}
-
-bool MemPool::CheckIntegrity(bool current_chunk_empty) {
-  // check that current_chunk_idx_ points to the last chunk with allocated data
-  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
-  int64_t total_allocated = 0;
-  for (int i = 0; i < static_cast<int>(chunks_.size()); ++i) {
-    DCHECK_GT(chunks_[i].size, 0);
-    if (i < current_chunk_idx_) {
-      DCHECK_GT(chunks_[i].allocated_bytes, 0);
-    } else if (i == current_chunk_idx_) {
-      if (current_chunk_empty) {
-        DCHECK_EQ(chunks_[i].allocated_bytes, 0);
-      } else {
-        DCHECK_GT(chunks_[i].allocated_bytes, 0);
-      }
-    } else {
-      DCHECK_EQ(chunks_[i].allocated_bytes, 0);
-    }
-    total_allocated += chunks_[i].allocated_bytes;
-  }
-  DCHECK_EQ(total_allocated, total_allocated_bytes_);
-  return true;
-}
-
-}  // namespace parquet
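
The chunk-growth policy in FindChunk() above is easiest to see in isolation: a new chunk is at least next_chunk_size_ bytes, and that size doubles (capped at MAX_CHUNK_SIZE) only after a successful allocation. A standalone sketch of the progression (illustrative; the constants mirror mem-pool.h):

#include <algorithm>
#include <cstdint>
#include <iostream>

int main() {
  const int64_t kInitialChunkSize = 4 * 1024;  // MemPool::INITIAL_CHUNK_SIZE
  const int64_t kMaxChunkSize = 1024 * 1024;   // MemPool::MAX_CHUNK_SIZE

  int64_t next_chunk_size = kInitialChunkSize;
  for (int i = 0; i < 10; ++i) {
    const int64_t min_size = 32;  // a small request; large requests get their own chunk
    int64_t chunk_size = std::max(min_size, next_chunk_size);
    std::cout << "chunk " << i << ": " << chunk_size / 1024 << "K" << std::endl;
    // Doubling happens only after the chunk was successfully allocated.
    next_chunk_size = std::min(chunk_size * 2, kMaxChunkSize);
  }
  return 0;
}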

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/mem-pool.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/mem-pool.h b/src/parquet/util/mem-pool.h
deleted file mode 100644
index 5f6afa9..0000000
--- a/src/parquet/util/mem-pool.h
+++ /dev/null
@@ -1,179 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-// Initially imported from Apache Impala on 2016-02-23, and has been modified
-// since for parquet-cpp
-
-#ifndef PARQUET_UTIL_MEM_POOL_H
-#define PARQUET_UTIL_MEM_POOL_H
-
-#include <algorithm>
-#include <cstdint>
-#include <stdio.h>
-#include <string>
-#include <vector>
-
-#include "parquet/util/mem-allocator.h"
-
-namespace parquet {
-
-/// A MemPool maintains a list of memory chunks from which it allocates memory
-/// in response to Allocate() calls;
-/// Chunks stay around for the lifetime of the mempool or until they are passed on to
-/// another mempool.
-//
-/// An Allocate() call will attempt to allocate memory from the chunk that was most
-/// recently added; if that chunk doesn't have enough memory to
-/// satisfy the allocation request, the free chunks are searched for one that is
-/// big enough; otherwise a new chunk is added to the list.
-/// The current_chunk_idx_ always points to the last chunk with allocated memory.
-/// In order to keep allocation overhead low, chunk sizes double with each new one
-/// added, until they hit a maximum size.
-//
-///     Example:
-///     MemPool* p = new MemPool();
-///     for (int i = 0; i < 1024; ++i) {
-/// returns 8-byte aligned memory (effectively 24 bytes):
-///       .. = p->Allocate(17);
-///     }
-/// at this point, 17K have been handed out in response to Allocate() calls and
-/// 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K)
-/// We track total and peak allocated bytes. At this point they would be the same:
-/// 28k bytes.  A call to Clear() will return the allocated memory so
-/// total_allocated_bytes_ becomes 0 while peak_allocated_bytes_ remains at 28k.
-///     p->Clear();
-/// the entire 1st chunk is returned:
-///     .. = p->Allocate(4 * 1024);
-/// 4K of the 2nd chunk are returned:
-///     .. = p->Allocate(4 * 1024);
-/// a new 20K chunk is created
-///     .. = p->Allocate(20 * 1024);
-//
-///      MemPool* p2 = new MemPool();
-/// the new mempool receives all chunks containing data from p
-///      p2->AcquireData(p, false);
-/// At this point p.total_allocated_bytes_ would be 0 while p.peak_allocated_bytes_
-/// remains unchanged.
-/// The one remaining (empty) chunk is released:
-///    delete p;
-
-class MemPool {
- public:
-  explicit MemPool(MemoryAllocator* allocator = default_allocator());
-
-  /// Frees all chunks of memory and subtracts the total allocated bytes
-  /// from the registered limits.
-  ~MemPool();
-
-  /// Allocates an 8-byte aligned section of memory of 'size' bytes at the end
-  /// of the current chunk. Creates a new chunk if there aren't any chunks
-  /// with enough capacity.
-  uint8_t* Allocate(int size);
-
-  /// Returns 'byte_size' bytes of the current chunk back to the mem pool. This can
-  /// only be used to return either all or part of the previous allocation returned
-  /// by Allocate().
-  void ReturnPartialAllocation(int byte_size);
-
-  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
-  void Clear();
-
-  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called for
-  /// each mem pool before it is destroyed.
-  void FreeAll();
-
-  /// Absorb all chunks that hold data from src. If keep_current is true, let src hold on
-  /// to its last allocated chunk that contains data.
-  /// All offsets handed out by calls to GetCurrentOffset() for 'src' become invalid.
-  void AcquireData(MemPool* src, bool keep_current);
-
-  std::string DebugString();
-
-  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
-  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
-  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
-
-  /// Return the sum of all chunk sizes.
-  int64_t GetTotalChunkSizes() const;
-
- private:
-  friend class MemPoolTest;
-  static const int INITIAL_CHUNK_SIZE = 4 * 1024;
-
-  /// The maximum size of chunk that should be allocated. Allocations larger than this
-  /// size will get their own individual chunk.
-  static const int MAX_CHUNK_SIZE = 1024 * 1024;
-
-  struct ChunkInfo {
-    uint8_t* data;  // Owned by the ChunkInfo.
-    int64_t size;   // in bytes
-
-    /// bytes allocated via Allocate() in this chunk
-    int64_t allocated_bytes;
-
-    explicit ChunkInfo(int64_t size, uint8_t* buf);
-
-    ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
-  };
-
-  /// chunk from which we served the last Allocate() call;
-  /// always points to the last chunk that contains allocated data;
-  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
-  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
-  /// -1 if no chunks present
-  int current_chunk_idx_;
-
-  /// The size of the next chunk to allocate.
-  int64_t next_chunk_size_;
-
-  /// sum of allocated_bytes across all chunks
-  int64_t total_allocated_bytes_;
-
-  /// Maximum number of bytes allocated from this pool at one time.
-  int64_t peak_allocated_bytes_;
-
-  /// sum of all bytes allocated in chunks_
-  int64_t total_reserved_bytes_;
-
-  std::vector<ChunkInfo> chunks_;
-
-  MemoryAllocator* allocator_;
-
-  /// Find or allocate a chunk with at least min_size spare capacity and update
-  /// current_chunk_idx_. Also updates chunks_, next_chunk_size_ and
-  /// total_reserved_bytes_ if a new chunk needs to be created.
-  bool FindChunk(int64_t min_size);
-
-  /// Check integrity of the supporting data structures; always returns true but DCHECKs
-  /// all invariants.
-  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
-  bool CheckIntegrity(bool current_chunk_empty);
-
-  /// Return offset to unoccupied space in the current chunk.
-  int GetFreeOffset() const {
-    if (current_chunk_idx_ == -1) return 0;
-    return chunks_[current_chunk_idx_].allocated_bytes;
-  }
-
-  template <bool CHECK_LIMIT_FIRST>
-  uint8_t* Allocate(int size);
-};
-
-}  // namespace parquet
-
-#endif  // PARQUET_UTIL_MEM_POOL_H
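
A compilable variant of the usage example from the MemPool class comment above (illustrative, not part of this patch):

#include <cstdint>
#include <iostream>

#include "parquet/util/mem-pool.h"

int main() {
  parquet::MemPool pool;
  for (int i = 0; i < 1024; ++i) {
    // Each 17-byte request is rounded up to an 8-byte-aligned 24-byte slot.
    pool.Allocate(17);
  }
  std::cout << "allocated: " << pool.total_allocated_bytes()
            << ", reserved in chunks: " << pool.GetTotalChunkSizes() << std::endl;

  // Clear() keeps the chunks for reuse but resets total_allocated_bytes();
  // peak_allocated_bytes() is unaffected.
  pool.Clear();
  std::cout << "peak: " << pool.peak_allocated_bytes() << std::endl;

  // Every pool must call FreeAll() (or hand its chunks to another pool via
  // AcquireData()) before destruction.
  pool.FreeAll();
  return 0;
}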

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory-test.cc b/src/parquet/util/memory-test.cc
new file mode 100644
index 0000000..45aa819
--- /dev/null
+++ b/src/parquet/util/memory-test.cc
@@ -0,0 +1,385 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdio>
+#include <cstring>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "parquet/exception.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet {
+
+class TestBuffer : public ::testing::Test {};
+
+TEST(TestAllocator, AllocateFree) {
+  TrackingAllocator allocator;
+
+  uint8_t* data;
+
+  ASSERT_TRUE(allocator.Allocate(100, &data).ok());
+  ASSERT_TRUE(nullptr != data);
+  data[99] = 55;
+  allocator.Free(data, 100);
+
+  ASSERT_TRUE(allocator.Allocate(0, &data).ok());
+  ASSERT_EQ(nullptr, data);
+  allocator.Free(data, 0);
+
+  int64_t to_alloc = std::numeric_limits<int64_t>::max();
+  ASSERT_FALSE(allocator.Allocate(to_alloc, &data).ok());
+}
+
+TEST(TestAllocator, TotalMax) {
+  TrackingAllocator allocator;
+  ASSERT_EQ(0, allocator.bytes_allocated());
+  ASSERT_EQ(0, allocator.max_memory());
+
+  uint8_t* data;
+  uint8_t* data2;
+  ASSERT_TRUE(allocator.Allocate(100, &data).ok());
+  ASSERT_EQ(100, allocator.bytes_allocated());
+  ASSERT_EQ(100, allocator.max_memory());
+
+  ASSERT_TRUE(allocator.Allocate(10, &data2).ok());
+  ASSERT_EQ(110, allocator.bytes_allocated());
+  ASSERT_EQ(110, allocator.max_memory());
+
+  allocator.Free(data, 100);
+  ASSERT_EQ(10, allocator.bytes_allocated());
+  ASSERT_EQ(110, allocator.max_memory());
+
+  allocator.Free(data2, 10);
+  ASSERT_EQ(0, allocator.bytes_allocated());
+  ASSERT_EQ(110, allocator.max_memory());
+}
+
+// Utility class to call private functions on ChunkedAllocator.
+class ChunkedAllocatorTest {
+ public:
+  static bool CheckIntegrity(ChunkedAllocator* pool, bool current_chunk_empty) {
+    return pool->CheckIntegrity(current_chunk_empty);
+  }
+
+  static const int INITIAL_CHUNK_SIZE = ChunkedAllocator::INITIAL_CHUNK_SIZE;
+  static const int MAX_CHUNK_SIZE = ChunkedAllocator::MAX_CHUNK_SIZE;
+};
+
+const int ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
+const int ChunkedAllocatorTest::MAX_CHUNK_SIZE;
+
+TEST(ChunkedAllocatorTest, Basic) {
+  ChunkedAllocator p;
+  ChunkedAllocator p2;
+  ChunkedAllocator p3;
+
+  for (int iter = 0; iter < 2; ++iter) {
+    // allocate a total of 24K in 32-byte pieces (for which we only request 25 bytes)
+    for (int i = 0; i < 768; ++i) {
+      // pads to 32 bytes
+      p.Allocate(25);
+    }
+    // we handed back 24K
+    EXPECT_EQ(24 * 1024, p.total_allocated_bytes());
+    // .. and allocated 28K of chunks (4, 8, 16)
+    EXPECT_EQ(28 * 1024, p.GetTotalChunkSizes());
+
+    // we're passing on the first two chunks, containing 12K of data; we're left with
+    // one chunk of 16K containing 12K of data
+    p2.AcquireData(&p, true);
+    EXPECT_EQ(12 * 1024, p.total_allocated_bytes());
+    EXPECT_EQ(16 * 1024, p.GetTotalChunkSizes());
+
+    // we allocate 8K, for which there isn't enough room in the current chunk,
+    // so another one is allocated (32K)
+    p.Allocate(8 * 1024);
+    EXPECT_EQ((16 + 32) * 1024, p.GetTotalChunkSizes());
+
+    // we allocate 65K, which doesn't fit into the current chunk or the default
+    // size of the next allocated chunk (64K)
+    p.Allocate(65 * 1024);
+    EXPECT_EQ((12 + 8 + 65) * 1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // Clear() resets allocated data, but doesn't remove any chunks
+    p.Clear();
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // next allocation reuses existing chunks
+    p.Allocate(1024);
+    EXPECT_EQ(1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((12 + 8 + 65) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // ... unless it doesn't fit into any available chunk
+    p.Allocate(120 * 1024);
+    EXPECT_EQ((1 + 120) * 1024, p.total_allocated_bytes());
+    if (iter == 0) {
+      EXPECT_EQ((1 + 120) * 1024, p.peak_allocated_bytes());
+    } else {
+      EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    }
+    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // ... Try another chunk that fits into an existing chunk
+    p.Allocate(33 * 1024);
+    EXPECT_EQ((1 + 120 + 33) * 1024, p.total_allocated_bytes());
+    EXPECT_EQ((130 + 16 + 32 + 65) * 1024, p.GetTotalChunkSizes());
+
+    // we're releasing 3 chunks, which get added to p2
+    p2.AcquireData(&p, false);
+    EXPECT_EQ(0, p.total_allocated_bytes());
+    EXPECT_EQ((1 + 120 + 33) * 1024, p.peak_allocated_bytes());
+    EXPECT_EQ(0, p.GetTotalChunkSizes());
+
+    p3.AcquireData(&p2, true);  // we're keeping the 65k chunk
+    EXPECT_EQ(33 * 1024, p2.total_allocated_bytes());
+    EXPECT_EQ(65 * 1024, p2.GetTotalChunkSizes());
+
+    p.FreeAll();
+    p2.FreeAll();
+    p3.FreeAll();
+  }
+}
+
+// Test that we can keep an allocated chunk and a free chunk.
+// This case verifies that when chunks are acquired by another memory pool the
+// remaining chunks are consistent if there were more than one used chunk and some
+// free chunks.
+TEST(ChunkedAllocatorTest, Keep) {
+  ChunkedAllocator p;
+  p.Allocate(4 * 1024);
+  p.Allocate(8 * 1024);
+  p.Allocate(16 * 1024);
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+  p.Clear();
+  EXPECT_EQ(0, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+  p.Allocate(1 * 1024);
+  p.Allocate(4 * 1024);
+  EXPECT_EQ((1 + 4) * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((4 + 8 + 16) * 1024, p.GetTotalChunkSizes());
+
+  ChunkedAllocator p2;
+  p2.AcquireData(&p, true);
+  EXPECT_EQ(4 * 1024, p.total_allocated_bytes());
+  EXPECT_EQ((8 + 16) * 1024, p.GetTotalChunkSizes());
+  EXPECT_EQ(1 * 1024, p2.total_allocated_bytes());
+  EXPECT_EQ(4 * 1024, p2.GetTotalChunkSizes());
+
+  p.FreeAll();
+  p2.FreeAll();
+}
+
+// Tests that we can return partial allocations.
+TEST(ChunkedAllocatorTest, ReturnPartial) {
+  ChunkedAllocator p;
+  uint8_t* ptr = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  memset(ptr, 0, 1024);
+  p.ReturnPartialAllocation(1024);
+
+  uint8_t* ptr2 = p.Allocate(1024);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr == ptr2);
+  p.ReturnPartialAllocation(1016);
+
+  ptr2 = p.Allocate(1016);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr2 == ptr + 8);
+  p.ReturnPartialAllocation(512);
+  memset(ptr2, 1, 1016 - 512);
+
+  uint8_t* ptr3 = p.Allocate(512);
+  EXPECT_EQ(1024, p.total_allocated_bytes());
+  EXPECT_TRUE(ptr3 == ptr + 512);
+  memset(ptr3, 2, 512);
+
+  for (int i = 0; i < 8; ++i) {
+    EXPECT_EQ(0, ptr[i]);
+  }
+  for (int i = 8; i < 512; ++i) {
+    EXPECT_EQ(1, ptr[i]);
+  }
+  for (int i = 512; i < 1024; ++i) {
+    EXPECT_EQ(2, ptr[i]);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the ChunkedAllocator overhead is bounded when we make allocations of
+// INITIAL_CHUNK_SIZE.
+TEST(ChunkedAllocatorTest, MemoryOverhead) {
+  ChunkedAllocator p;
+  const int alloc_size = ChunkedAllocatorTest::INITIAL_CHUNK_SIZE;
+  const int num_allocs = 1000;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != NULL);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // The initial chunk fits evenly into MAX_CHUNK_SIZE, so should have at most
+    // one empty chunk at the end.
+    EXPECT_LE(wasted_memory, ChunkedAllocatorTest::MAX_CHUNK_SIZE);
+    // The chunk doubling algorithm should not allocate chunks larger than the total
+    // amount of memory already allocated.
+    EXPECT_LE(wasted_memory, total_allocated);
+  }
+
+  p.FreeAll();
+}
+
+// Test that the ChunkedAllocator overhead is bounded when we make alternating
+// large and small allocations.
+TEST(ChunkedAllocatorTest, FragmentationOverhead) {
+  ChunkedAllocator p;
+  const int num_allocs = 100;
+  int64_t total_allocated = 0;
+
+  for (int i = 0; i < num_allocs; ++i) {
+    int alloc_size = i % 2 == 0 ? 1 : ChunkedAllocatorTest::MAX_CHUNK_SIZE;
+    uint8_t* mem = p.Allocate(alloc_size);
+    ASSERT_TRUE(mem != NULL);
+    total_allocated += alloc_size;
+
+    int64_t wasted_memory = p.GetTotalChunkSizes() - total_allocated;
+    // Fragmentation should not waste more than half of each completed chunk.
+    EXPECT_LE(wasted_memory, total_allocated + ChunkedAllocatorTest::MAX_CHUNK_SIZE);
+  }
+
+  p.FreeAll();
+}
+
+TEST(TestBufferedInputStream, Basics) {
+  int64_t source_size = 256;
+  int64_t stream_offset = 10;
+  int64_t stream_size = source_size - stream_offset;
+  int64_t chunk_size = 50;
+  std::shared_ptr<PoolBuffer> buf = AllocateBuffer(default_allocator(), source_size);
+  ASSERT_EQ(source_size, buf->size());
+  for (int i = 0; i < source_size; i++) {
+    buf->mutable_data()[i] = i;
+  }
+
+  auto wrapper =
+      std::make_shared<ArrowInputFile>(std::make_shared<::arrow::io::BufferReader>(buf));
+
+  TrackingAllocator allocator;
+  std::unique_ptr<BufferedInputStream> stream(new BufferedInputStream(
+      &allocator, chunk_size, wrapper.get(), stream_offset, stream_size));
+
+  const uint8_t* output;
+  int64_t bytes_read;
+
+  // source is at offset 10
+  output = stream->Peek(10, &bytes_read);
+  ASSERT_EQ(10, bytes_read);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(10 + i, output[i]) << i;
+  }
+  output = stream->Read(10, &bytes_read);
+  ASSERT_EQ(10, bytes_read);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(10 + i, output[i]) << i;
+  }
+  output = stream->Read(10, &bytes_read);
+  ASSERT_EQ(10, bytes_read);
+  for (int i = 0; i < 10; i++) {
+    ASSERT_EQ(20 + i, output[i]) << i;
+  }
+  stream->Advance(5);
+  stream->Advance(5);
+  // source is at offset 40
+  // read across buffer boundary. buffer size is 50
+  output = stream->Read(20, &bytes_read);
+  ASSERT_EQ(20, bytes_read);
+  for (int i = 0; i < 20; i++) {
+    ASSERT_EQ(40 + i, output[i]) << i;
+  }
+  // read more than original chunk_size
+  output = stream->Read(60, &bytes_read);
+  ASSERT_EQ(60, bytes_read);
+  for (int i = 0; i < 60; i++) {
+    ASSERT_EQ(60 + i, output[i]) << i;
+  }
+
+  stream->Advance(120);
+  // source is at offset 240
+  // read outside of source boundary. source size is 256
+  output = stream->Read(30, &bytes_read);
+  ASSERT_EQ(16, bytes_read);
+  for (int i = 0; i < 16; i++) {
+    ASSERT_EQ(240 + i, output[i]) << i;
+  }
+}
+
+TEST(TestArrowInputFile, Basics) {
+  std::string data = "this is the data";
+  auto data_buffer = reinterpret_cast<const uint8_t*>(data.c_str());
+
+  auto file = std::make_shared<::arrow::io::BufferReader>(data_buffer, data.size());
+  auto source = std::make_shared<ArrowInputFile>(file);
+
+  ASSERT_EQ(0, source->Tell());
+  ASSERT_NO_THROW(source->Seek(5));
+  ASSERT_EQ(5, source->Tell());
+  ASSERT_NO_THROW(source->Seek(0));
+
+  // Seek out of bounds
+  ASSERT_THROW(source->Seek(100), ParquetException);
+
+  uint8_t buffer[50];
+
+  ASSERT_NO_THROW(source->Read(4, buffer));
+  ASSERT_EQ(0, std::memcmp(buffer, "this", 4));
+  ASSERT_EQ(4, source->Tell());
+
+  std::shared_ptr<Buffer> pq_buffer;
+
+  ASSERT_NO_THROW(pq_buffer = source->Read(7));
+
+  auto expected_buffer = std::make_shared<Buffer>(data_buffer + 4, 7);
+
+  ASSERT_TRUE(expected_buffer->Equals(*pq_buffer.get()));
+}
+
+}  // namespace parquet
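
A minimal sketch of the new Status-returning allocator API exercised by the tests above (illustrative, not part of this patch):

#include <cstdint>
#include <iostream>

#include "parquet/util/memory.h"

int main() {
  parquet::TrackingAllocator allocator;

  uint8_t* data = nullptr;
  ::arrow::Status st = allocator.Allocate(100, &data);
  if (!st.ok()) {
    // Allocation failures now surface as a Status instead of a thrown exception.
    std::cerr << st.ToString() << std::endl;
    return 1;
  }

  // 100 bytes live; max_memory() records the high-water mark.
  std::cout << allocator.bytes_allocated() << " / " << allocator.max_memory()
            << std::endl;

  allocator.Free(data, 100);
  return 0;
}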

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.cc b/src/parquet/util/memory.cc
new file mode 100644
index 0000000..9ad0336
--- /dev/null
+++ b/src/parquet/util/memory.cc
@@ -0,0 +1,543 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/memory.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdio>
+#include <cstdlib>
+#include <string>
+#include <utility>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+#include "parquet/util/bit-util.h"
+#include "parquet/util/logging.h"
+
+namespace parquet {
+
+::arrow::Status TrackingAllocator::Allocate(int64_t size, uint8_t** out) {
+  if (0 == size) {
+    *out = nullptr;
+    return ::arrow::Status::OK();
+  }
+
+  uint8_t* p = static_cast<uint8_t*>(std::malloc(size));
+  if (!p) { return ::arrow::Status::OutOfMemory("memory allocation failed"); }
+  {
+    std::lock_guard<std::mutex> lock(stats_mutex_);
+    total_memory_ += size;
+    if (total_memory_ > max_memory_) { max_memory_ = total_memory_; }
+  }
+  *out = p;
+  return ::arrow::Status::OK();
+}
+
+void TrackingAllocator::Free(uint8_t* p, int64_t size) {
+  if (nullptr != p && size > 0) {
+    {
+      std::lock_guard<std::mutex> lock(stats_mutex_);
+      DCHECK_GE(total_memory_, size) << "Attempting to free too much memory";
+      total_memory_ -= size;
+    }
+    std::free(p);
+  }
+}
+
+MemoryAllocator* default_allocator() {
+  static TrackingAllocator allocator;
+  return &allocator;
+}
+
+template <class T>
+Vector<T>::Vector(int64_t size, MemoryAllocator* allocator)
+    : buffer_(AllocateUniqueBuffer(allocator, size * sizeof(T))),
+      size_(size),
+      capacity_(size) {
+  if (size > 0) {
+    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+  } else {
+    data_ = nullptr;
+  }
+}
+
+template <class T>
+void Vector<T>::Reserve(int64_t new_capacity) {
+  if (new_capacity > capacity_) {
+    PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity * sizeof(T)));
+    data_ = reinterpret_cast<T*>(buffer_->mutable_data());
+    capacity_ = new_capacity;
+  }
+}
+
+template <class T>
+void Vector<T>::Resize(int64_t new_size) {
+  Reserve(new_size);
+  size_ = new_size;
+}
+
+template <class T>
+void Vector<T>::Assign(int64_t size, const T val) {
+  Resize(size);
+  for (int64_t i = 0; i < size_; i++) {
+    data_[i] = val;
+  }
+}
+
+template <class T>
+void Vector<T>::Swap(Vector<T>& v) {
+  buffer_.swap(v.buffer_);
+  std::swap(size_, v.size_);
+  std::swap(capacity_, v.capacity_);
+  std::swap(data_, v.data_);
+}
+
+template class Vector<int32_t>;
+template class Vector<int64_t>;
+template class Vector<bool>;
+template class Vector<float>;
+template class Vector<double>;
+template class Vector<Int96>;
+template class Vector<ByteArray>;
+template class Vector<FixedLenByteArray>;
+
+const int ChunkedAllocator::INITIAL_CHUNK_SIZE;
+const int ChunkedAllocator::MAX_CHUNK_SIZE;
+
+ChunkedAllocator::ChunkedAllocator(MemoryAllocator* allocator)
+    : current_chunk_idx_(-1),
+      next_chunk_size_(INITIAL_CHUNK_SIZE),
+      total_allocated_bytes_(0),
+      peak_allocated_bytes_(0),
+      total_reserved_bytes_(0),
+      allocator_(allocator) {}
+
+ChunkedAllocator::ChunkInfo::ChunkInfo(int64_t size, uint8_t* buf)
+    : data(buf), size(size), allocated_bytes(0) {}
+
+ChunkedAllocator::~ChunkedAllocator() {
+  int64_t total_bytes_released = 0;
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    total_bytes_released += chunks_[i].size;
+    allocator_->Free(chunks_[i].data, chunks_[i].size);
+  }
+
+  DCHECK(chunks_.empty()) << "Must call FreeAll() or AcquireData() for this pool";
+}
+
+void ChunkedAllocator::ReturnPartialAllocation(int byte_size) {
+  DCHECK_GE(byte_size, 0);
+  DCHECK(current_chunk_idx_ != -1);
+  ChunkInfo& info = chunks_[current_chunk_idx_];
+  DCHECK_GE(info.allocated_bytes, byte_size);
+  info.allocated_bytes -= byte_size;
+  total_allocated_bytes_ -= byte_size;
+}
+
+template <bool CHECK_LIMIT_FIRST>
+uint8_t* ChunkedAllocator::Allocate(int size) {
+  if (size == 0) return NULL;
+
+  int64_t num_bytes = BitUtil::RoundUp(size, 8);
+  if (current_chunk_idx_ == -1 ||
+      num_bytes + chunks_[current_chunk_idx_].allocated_bytes >
+          chunks_[current_chunk_idx_].size) {
+    // If we couldn't allocate a new chunk, return NULL.
+    if (UNLIKELY(!FindChunk(num_bytes))) return NULL;
+  }
+  ChunkInfo& info = chunks_[current_chunk_idx_];
+  uint8_t* result = info.data + info.allocated_bytes;
+  DCHECK_LE(info.allocated_bytes + num_bytes, info.size);
+  info.allocated_bytes += num_bytes;
+  total_allocated_bytes_ += num_bytes;
+  DCHECK_LE(current_chunk_idx_, static_cast<int>(chunks_.size()) - 1);
+  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+  return result;
+}
+
+uint8_t* ChunkedAllocator::Allocate(int size) {
+  return Allocate<false>(size);
+}
+
+void ChunkedAllocator::Clear() {
+  current_chunk_idx_ = -1;
+  for (auto chunk = chunks_.begin(); chunk != chunks_.end(); ++chunk) {
+    chunk->allocated_bytes = 0;
+  }
+  total_allocated_bytes_ = 0;
+  DCHECK(CheckIntegrity(false));
+}
+
+void ChunkedAllocator::FreeAll() {
+  int64_t total_bytes_released = 0;
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    total_bytes_released += chunks_[i].size;
+    allocator_->Free(chunks_[i].data, chunks_[i].size);
+  }
+  chunks_.clear();
+  next_chunk_size_ = INITIAL_CHUNK_SIZE;
+  current_chunk_idx_ = -1;
+  total_allocated_bytes_ = 0;
+  total_reserved_bytes_ = 0;
+}
+
+bool ChunkedAllocator::FindChunk(int64_t min_size) {
+  // Try to allocate from a free chunk. The first free chunk, if any, will be immediately
+  // after the current chunk.
+  int first_free_idx = current_chunk_idx_ + 1;
+  // (cast size() to signed int in order to avoid everything else being cast to
+  // unsigned long, in particular -1)
+  while (++current_chunk_idx_ < static_cast<int>(chunks_.size())) {
+    // we found a free chunk
+    DCHECK_EQ(chunks_[current_chunk_idx_].allocated_bytes, 0);
+
+    if (chunks_[current_chunk_idx_].size >= min_size) {
+      // This chunk is big enough.  Move it before the other free chunks.
+      if (current_chunk_idx_ != first_free_idx) {
+        std::swap(chunks_[current_chunk_idx_], chunks_[first_free_idx]);
+        current_chunk_idx_ = first_free_idx;
+      }
+      break;
+    }
+  }
+
+  if (current_chunk_idx_ == static_cast<int>(chunks_.size())) {
+    // need to allocate new chunk.
+    int64_t chunk_size;
+    DCHECK_GE(next_chunk_size_, INITIAL_CHUNK_SIZE);
+    DCHECK_LE(next_chunk_size_, MAX_CHUNK_SIZE);
+
+    chunk_size = std::max<int64_t>(min_size, next_chunk_size_);
+
+    // Allocate a new chunk. The allocator throws on failure, so the null check
+    // below is only a defensive fallback.
+    uint8_t* buf = nullptr;
+    PARQUET_THROW_NOT_OK(allocator_->Allocate(chunk_size, &buf));
+    if (UNLIKELY(buf == NULL)) {
+      DCHECK_EQ(current_chunk_idx_, static_cast<int>(chunks_.size()));
+      current_chunk_idx_ = static_cast<int>(chunks_.size()) - 1;
+      return false;
+    }
+
+    // If there are no free chunks put it at the end, otherwise before the first free.
+    if (first_free_idx == static_cast<int>(chunks_.size())) {
+      chunks_.push_back(ChunkInfo(chunk_size, buf));
+    } else {
+      current_chunk_idx_ = first_free_idx;
+      auto insert_chunk = chunks_.begin() + current_chunk_idx_;
+      chunks_.insert(insert_chunk, ChunkInfo(chunk_size, buf));
+    }
+    total_reserved_bytes_ += chunk_size;
+    // Don't increment the chunk size until the allocation succeeds: if an attempted
+    // large allocation fails we don't want to increase the chunk size further.
+    next_chunk_size_ =
+        static_cast<int>(std::min<int64_t>(chunk_size * 2, MAX_CHUNK_SIZE));
+  }
+
+  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+  DCHECK(CheckIntegrity(true));
+  return true;
+}
+
+void ChunkedAllocator::AcquireData(ChunkedAllocator* src, bool keep_current) {
+  DCHECK(src->CheckIntegrity(false));
+  int num_acquired_chunks;
+  if (keep_current) {
+    num_acquired_chunks = src->current_chunk_idx_;
+  } else if (src->GetFreeOffset() == 0) {
+    // nothing in the last chunk
+    num_acquired_chunks = src->current_chunk_idx_;
+  } else {
+    num_acquired_chunks = src->current_chunk_idx_ + 1;
+  }
+
+  if (num_acquired_chunks <= 0) {
+    if (!keep_current) src->FreeAll();
+    return;
+  }
+
+  auto end_chunk = src->chunks_.begin() + num_acquired_chunks;
+  int64_t total_transferred_bytes = 0;
+  for (auto i = src->chunks_.begin(); i != end_chunk; ++i) {
+    total_transferred_bytes += i->size;
+  }
+  src->total_reserved_bytes_ -= total_transferred_bytes;
+  total_reserved_bytes_ += total_transferred_bytes;
+
+  // insert new chunks after current_chunk_idx_
+  auto insert_chunk = chunks_.begin() + current_chunk_idx_ + 1;
+  chunks_.insert(insert_chunk, src->chunks_.begin(), end_chunk);
+  src->chunks_.erase(src->chunks_.begin(), end_chunk);
+  current_chunk_idx_ += num_acquired_chunks;
+
+  if (keep_current) {
+    src->current_chunk_idx_ = 0;
+    DCHECK(src->chunks_.size() == 1 || src->chunks_[1].allocated_bytes == 0);
+    total_allocated_bytes_ += src->total_allocated_bytes_ - src->GetFreeOffset();
+    src->total_allocated_bytes_ = src->GetFreeOffset();
+  } else {
+    src->current_chunk_idx_ = -1;
+    total_allocated_bytes_ += src->total_allocated_bytes_;
+    src->total_allocated_bytes_ = 0;
+  }
+  peak_allocated_bytes_ = std::max(total_allocated_bytes_, peak_allocated_bytes_);
+
+  if (!keep_current) src->FreeAll();
+  DCHECK(CheckIntegrity(false));
+}
+
+std::string ChunkedAllocator::DebugString() {
+  std::stringstream out;
+  char str[32];
+  out << "ChunkedAllocator(#chunks=" << chunks_.size() << " [";
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    snprintf(str, sizeof(str), "0x%zx=", reinterpret_cast<size_t>(chunks_[i].data));
+    out << (i > 0 ? " " : "") << str << chunks_[i].size << "/"
+        << chunks_[i].allocated_bytes;
+  }
+  out << "] current_chunk=" << current_chunk_idx_
+      << " total_sizes=" << GetTotalChunkSizes()
+      << " total_alloc=" << total_allocated_bytes_ << ")";
+  return out.str();
+}
+
+int64_t ChunkedAllocator::GetTotalChunkSizes() const {
+  int64_t result = 0;
+  for (size_t i = 0; i < chunks_.size(); ++i) {
+    result += chunks_[i].size;
+  }
+  return result;
+}
+
+bool ChunkedAllocator::CheckIntegrity(bool current_chunk_empty) {
+  // check that current_chunk_idx_ points to the last chunk with allocated data
+  DCHECK_LT(current_chunk_idx_, static_cast<int>(chunks_.size()));
+  int64_t total_allocated = 0;
+  for (int i = 0; i < static_cast<int>(chunks_.size()); ++i) {
+    DCHECK_GT(chunks_[i].size, 0);
+    if (i < current_chunk_idx_) {
+      DCHECK_GT(chunks_[i].allocated_bytes, 0);
+    } else if (i == current_chunk_idx_) {
+      if (current_chunk_empty) {
+        DCHECK_EQ(chunks_[i].allocated_bytes, 0);
+      } else {
+        DCHECK_GT(chunks_[i].allocated_bytes, 0);
+      }
+    } else {
+      DCHECK_EQ(chunks_[i].allocated_bytes, 0);
+    }
+    total_allocated += chunks_[i].allocated_bytes;
+  }
+  DCHECK_EQ(total_allocated, total_allocated_bytes_);
+  return true;
+}
+
+// ----------------------------------------------------------------------
+// Arrow IO wrappers
+
+// Close the underlying file
+void ArrowFileMethods::Close() {
+  PARQUET_THROW_NOT_OK(file_interface()->Close());
+}
+
+// Return the current position in the file relative to the start
+int64_t ArrowFileMethods::Tell() {
+  int64_t position = 0;
+  PARQUET_THROW_NOT_OK(file_interface()->Tell(&position));
+  return position;
+}
+
+ArrowInputFile::ArrowInputFile(
+    const std::shared_ptr<::arrow::io::ReadableFileInterface>& file)
+    : file_(file) {}
+
+::arrow::io::FileInterface* ArrowInputFile::file_interface() {
+  return file_.get();
+}
+
+int64_t ArrowInputFile::Size() const {
+  int64_t size;
+  PARQUET_THROW_NOT_OK(file_->GetSize(&size));
+  return size;
+}
+
+void ArrowInputFile::Seek(int64_t position) {
+  PARQUET_THROW_NOT_OK(file_->Seek(position));
+}
+
+// Returns bytes read
+int64_t ArrowInputFile::Read(int64_t nbytes, uint8_t* out) {
+  int64_t bytes_read = 0;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &bytes_read, out));
+  return bytes_read;
+}
+
+std::shared_ptr<Buffer> ArrowInputFile::Read(int64_t nbytes) {
+  std::shared_ptr<Buffer> out;
+  PARQUET_THROW_NOT_OK(file_->Read(nbytes, &out));
+  return out;
+}
+
+std::shared_ptr<Buffer> ArrowInputFile::ReadAt(int64_t position, int64_t nbytes) {
+  std::shared_ptr<Buffer> out;
+  PARQUET_THROW_NOT_OK(file_->ReadAt(position, nbytes, &out));
+  return out;
+}
+
+ArrowOutputStream::ArrowOutputStream(
+    const std::shared_ptr<::arrow::io::OutputStream> file)
+    : file_(file) {}
+
+::arrow::io::FileInterface* ArrowOutputStream::file_interface() {
+  return file_.get();
+}
+
+// Copy bytes into the output stream
+void ArrowOutputStream::Write(const uint8_t* data, int64_t length) {
+  PARQUET_THROW_NOT_OK(file_->Write(data, length));
+}
+
+// ----------------------------------------------------------------------
+// InMemoryInputStream
+
+InMemoryInputStream::InMemoryInputStream(const std::shared_ptr<Buffer>& buffer)
+    : buffer_(buffer), offset_(0) {
+  len_ = buffer_->size();
+}
+
+InMemoryInputStream::InMemoryInputStream(
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : offset_(0) {
+  buffer_ = source->ReadAt(start, num_bytes);
+  if (buffer_->size() < num_bytes) {
+    throw ParquetException("Unable to read column chunk data");
+  }
+  len_ = buffer_->size();
+}
+
+const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
+  return buffer_->data() + offset_;
+}
+
+const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* result = Peek(num_to_read, num_bytes);
+  offset_ += *num_bytes;
+  return result;
+}
+
+void InMemoryInputStream::Advance(int64_t num_bytes) {
+  offset_ += num_bytes;
+}
+
+// ----------------------------------------------------------------------
+// In-memory output stream
+
+InMemoryOutputStream::InMemoryOutputStream(
+    MemoryAllocator* allocator, int64_t initial_capacity)
+    : size_(0) {
+  if (initial_capacity == 0) { initial_capacity = kInMemoryDefaultCapacity; }
+  // Keep capacity_ in sync with the buffer that is actually allocated.
+  capacity_ = initial_capacity;
+  buffer_ = AllocateBuffer(allocator, initial_capacity);
+}
+
+InMemoryOutputStream::~InMemoryOutputStream() {}
+
+uint8_t* InMemoryOutputStream::Head() {
+  return buffer_->mutable_data() + size_;
+}
+
+void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
+  if (size_ + length > capacity_) {
+    int64_t new_capacity = capacity_ * 2;
+    while (new_capacity < size_ + length) {
+      new_capacity *= 2;
+    }
+    PARQUET_THROW_NOT_OK(buffer_->Resize(new_capacity));
+    capacity_ = new_capacity;
+  }
+  memcpy(Head(), data, length);
+  size_ += length;
+}
+
+int64_t InMemoryOutputStream::Tell() {
+  return size_;
+}
+
+std::shared_ptr<Buffer> InMemoryOutputStream::GetBuffer() {
+  PARQUET_THROW_NOT_OK(buffer_->Resize(size_));
+  std::shared_ptr<Buffer> result = buffer_;
+  buffer_ = nullptr;
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// BufferedInputStream
+
+BufferedInputStream::BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+    RandomAccessSource* source, int64_t start, int64_t num_bytes)
+    : source_(source), stream_offset_(start), stream_end_(start + num_bytes) {
+  buffer_ = AllocateBuffer(pool, buffer_size);
+  buffer_size_ = buffer_->size();
+  // Required to force a lazy read
+  buffer_offset_ = buffer_size_;
+}
+
+const uint8_t* BufferedInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(num_to_peek, stream_end_ - stream_offset_);
+  // increase the buffer size if needed
+  if (*num_bytes > buffer_size_) {
+    PARQUET_THROW_NOT_OK(buffer_->Resize(*num_bytes));
+    buffer_size_ = buffer_->size();
+    DCHECK(buffer_size_ >= *num_bytes);
+  }
+  // Read more data when the buffer has too few bytes left, or after it was resized
+  if (*num_bytes > (buffer_size_ - buffer_offset_)) {
+    source_->Seek(stream_offset_);
+    buffer_size_ = std::min(buffer_size_, stream_end_ - stream_offset_);
+    int64_t bytes_read = source_->Read(buffer_size_, buffer_->mutable_data());
+    if (bytes_read < *num_bytes) {
+      throw ParquetException("Failed reading column data from source");
+    }
+    buffer_offset_ = 0;
+  }
+  return buffer_->data() + buffer_offset_;
+}
+
+const uint8_t* BufferedInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* result = Peek(num_to_read, num_bytes);
+  stream_offset_ += *num_bytes;
+  buffer_offset_ += *num_bytes;
+  return result;
+}
+
+void BufferedInputStream::Advance(int64_t num_bytes) {
+  stream_offset_ += num_bytes;
+  buffer_offset_ += num_bytes;
+}
+
+std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size) {
+  auto result = std::make_shared<PoolBuffer>(allocator);
+  if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); }
+  return result;
+}
+
+std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
+    MemoryAllocator* allocator, int64_t size) {
+  std::unique_ptr<PoolBuffer> result(new PoolBuffer(allocator));
+  if (size > 0) { PARQUET_THROW_NOT_OK(result->Resize(size)); }
+  return result;
+}
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/2154e873/src/parquet/util/memory.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/memory.h b/src/parquet/util/memory.h
new file mode 100644
index 0000000..1ffca35
--- /dev/null
+++ b/src/parquet/util/memory.h
@@ -0,0 +1,440 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_MEMORY_H
+#define PARQUET_UTIL_MEMORY_H
+
+#include <cstdint>
+#include <cstdlib>
+#include <cstring>
+#include <memory>
+#include <mutex>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include "arrow/buffer.h"
+#include "arrow/io/interfaces.h"
+#include "arrow/io/memory.h"
+#include "arrow/memory_pool.h"
+#include "arrow/status.h"
+
+#include "parquet/exception.h"
+#include "parquet/util/macros.h"
+#include "parquet/util/visibility.h"
+
+#define PARQUET_CATCH_NOT_OK(s)                    \
+  try {                                            \
+    (s);                                           \
+  } catch (const ::parquet::ParquetException& e) { \
+    return ::arrow::Status::IOError(e.what());     \
+  }
+
+#define PARQUET_IGNORE_NOT_OK(s) \
+  try {                          \
+    (s);                         \
+  } catch (const ::parquet::ParquetException& e) {}
+
+#define PARQUET_THROW_NOT_OK(s)                     \
+  do {                                              \
+    ::arrow::Status _s = (s);                       \
+    if (!_s.ok()) {                                 \
+      std::stringstream ss;                         \
+      ss << "Arrow error: " << _s.ToString();       \
+      ::parquet::ParquetException::Throw(ss.str()); \
+    }                                               \
+  } while (0)
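
These three macros bridge Arrow's Status-returning APIs and Parquet's exception-based
error handling. As a quick illustration for reviewers, here is a minimal sketch of the
intended usage (not part of this patch; the helper names GrowOrThrow and GrowAsStatus
are made up for the example):

    #include "parquet/util/memory.h"

    namespace parquet {

    // Exception-style call site: a failed Resize() becomes a ParquetException.
    void GrowOrThrow(const std::shared_ptr<ResizableBuffer>& buffer, int64_t new_size) {
      PARQUET_THROW_NOT_OK(buffer->Resize(new_size));
    }

    // Status-style boundary: a thrown ParquetException is translated back into
    // an ::arrow::Status for Arrow-facing callers.
    ::arrow::Status GrowAsStatus(
        const std::shared_ptr<ResizableBuffer>& buffer, int64_t new_size) {
      PARQUET_CATCH_NOT_OK(GrowOrThrow(buffer, new_size));
      return ::arrow::Status::OK();
    }

    }  // namespace parquet
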
+
+namespace parquet {
+
+static constexpr int64_t kInMemoryDefaultCapacity = 1024;
+
+using Buffer = ::arrow::Buffer;
+using MutableBuffer = ::arrow::MutableBuffer;
+using ResizableBuffer = ::arrow::ResizableBuffer;
+using PoolBuffer = ::arrow::PoolBuffer;
+using MemoryAllocator = ::arrow::MemoryPool;
+
+PARQUET_EXPORT MemoryAllocator* default_allocator();
+
+class PARQUET_EXPORT TrackingAllocator : public MemoryAllocator {
+ public:
+  TrackingAllocator() : total_memory_(0), max_memory_(0) {}
+
+  ::arrow::Status Allocate(int64_t size, uint8_t** out) override;
+  void Free(uint8_t* p, int64_t size) override;
+
+  int64_t bytes_allocated() const override { return total_memory_; }
+
+  int64_t max_memory() { return max_memory_; }
+
+ private:
+  std::mutex stats_mutex_;
+  int64_t total_memory_;
+  int64_t max_memory_;
+};
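
TrackingAllocator is a thin MemoryAllocator (::arrow::MemoryPool) that counts
outstanding bytes and remembers the high-water mark. A small sketch of how the
statistics behave, based on the implementation in memory.cc (illustrative only):

    #include <cassert>
    #include "parquet/util/memory.h"

    int main() {
      parquet::TrackingAllocator allocator;
      uint8_t* data = nullptr;
      PARQUET_THROW_NOT_OK(allocator.Allocate(1024, &data));
      assert(allocator.bytes_allocated() == 1024);  // currently outstanding
      allocator.Free(data, 1024);
      assert(allocator.bytes_allocated() == 0);
      assert(allocator.max_memory() == 1024);       // high-water mark persists
      return 0;
    }
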
+
+template <class T>
+class Vector {
+ public:
+  explicit Vector(int64_t size, MemoryAllocator* allocator);
+  void Resize(int64_t new_size);
+  void Reserve(int64_t new_capacity);
+  void Assign(int64_t size, const T val);
+  void Swap(Vector<T>& v);
+  inline T& operator[](int64_t i) const { return data_[i]; }
+
+ private:
+  std::unique_ptr<PoolBuffer> buffer_;
+  int64_t size_;
+  int64_t capacity_;
+  T* data_;
+
+  DISALLOW_COPY_AND_ASSIGN(Vector);
+};
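
Vector<T> is a minimal allocator-backed alternative to std::vector used internally;
it is only explicitly instantiated (in memory.cc) for the Parquet physical value
types. An illustrative sketch with int64_t, not part of this patch:

    #include "parquet/util/memory.h"

    int main() {
      parquet::Vector<int64_t> values(4, parquet::default_allocator());
      values.Assign(4, 7);   // size 4, every element set to 7
      values.Reserve(16);    // grows the backing buffer, size unchanged
      values.Resize(8);      // elements 4..7 are uninitialized
      return values[0] == 7 ? 0 : 1;
    }
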
+
+/// A ChunkedAllocator maintains a list of memory chunks from which it
+/// allocates memory in response to Allocate() calls. Chunks stay around for
+/// the lifetime of the allocator or until they are passed on to another
+/// allocator.
+///
+/// An Allocate() call will attempt to allocate memory from the chunk that was most
+/// recently added; if that chunk doesn't have enough memory to satisfy the
+/// allocation request, the free chunks are searched for one that is big enough;
+/// otherwise a new chunk is added to the list.
+/// current_chunk_idx_ always points to the last chunk with allocated memory.
+/// In order to keep allocation overhead low, chunk sizes double with each new one
+/// added, until they hit a maximum size.
+///
+///     Example:
+///     ChunkedAllocator* p = new ChunkedAllocator();
+///     for (int i = 0; i < 1024; ++i) {
+///       // returns 8-byte aligned memory (effectively 24 bytes per request):
+///       .. = p->Allocate(17);
+///     }
+/// At this point roughly 17K have been requested via Allocate() calls (24K after
+/// 8-byte rounding) and 28K of chunks have been allocated (chunk sizes: 4K, 8K, 16K).
+/// We track total and peak allocated bytes; at this point both are 24K.
+/// A call to Clear() makes all chunks available for reuse, so
+/// total_allocated_bytes_ becomes 0 while peak_allocated_bytes_ remains at 24K.
+///     p->Clear();
+/// The entire 1st chunk is handed back out:
+///     .. = p->Allocate(4 * 1024);
+/// 4K of the 2nd chunk are handed out:
+///     .. = p->Allocate(4 * 1024);
+/// A new chunk is created for the next request, since neither the remainder of the
+/// 2nd chunk nor the 3rd chunk can hold 20K:
+///     .. = p->Allocate(20 * 1024);
+///
+///     ChunkedAllocator* p2 = new ChunkedAllocator();
+/// The new ChunkedAllocator receives all chunks containing data from p:
+///     p2->AcquireData(p, false);
+/// At this point p->total_allocated_bytes_ would be 0 while p->peak_allocated_bytes_
+/// remains unchanged.
+/// The one remaining (empty) chunk is released:
+///     delete p;
+
+class ChunkedAllocator {
+ public:
+  explicit ChunkedAllocator(MemoryAllocator* allocator = default_allocator());
+
+  /// Frees all chunks of memory and subtracts the total allocated bytes
+  /// from the registered limits.
+  ~ChunkedAllocator();
+
+  /// Allocates an 8-byte aligned section of memory of 'size' bytes at the end
+  /// of the current chunk. Creates a new chunk if there aren't any chunks
+  /// with enough capacity.
+  uint8_t* Allocate(int size);
+
+  /// Returns 'byte_size' bytes of the current chunk back to the allocator. This can
+  /// only be used to return either all or part of the previous allocation returned
+  /// by Allocate().
+  void ReturnPartialAllocation(int byte_size);
+
+  /// Makes all allocated chunks available for re-use, but doesn't delete any chunks.
+  void Clear();
+
+  /// Deletes all allocated chunks. FreeAll() or AcquireData() must be called
+  /// before the allocator is destroyed.
+  void FreeAll();
+
+  /// Absorb all chunks that hold data from src. If keep_current is true, let src hold
+  /// on to its last allocated chunk that contains data.
+  /// Any offsets previously obtained from 'src' become invalid.
+  void AcquireData(ChunkedAllocator* src, bool keep_current);
+
+  std::string DebugString();
+
+  int64_t total_allocated_bytes() const { return total_allocated_bytes_; }
+  int64_t peak_allocated_bytes() const { return peak_allocated_bytes_; }
+  int64_t total_reserved_bytes() const { return total_reserved_bytes_; }
+
+  /// Return the sum of the sizes of all chunks (both in use and free).
+  int64_t GetTotalChunkSizes() const;
+
+ private:
+  friend class ChunkedAllocatorTest;
+  static const int INITIAL_CHUNK_SIZE = 4 * 1024;
+
+  /// The maximum size of chunk that should be allocated. Allocations larger than this
+  /// size will get their own individual chunk.
+  static const int MAX_CHUNK_SIZE = 1024 * 1024;
+
+  struct ChunkInfo {
+    uint8_t* data;  // Owned by the ChunkInfo.
+    int64_t size;   // in bytes
+
+    /// bytes allocated via Allocate() in this chunk
+    int64_t allocated_bytes;
+
+    explicit ChunkInfo(int64_t size, uint8_t* buf);
+
+    ChunkInfo() : data(NULL), size(0), allocated_bytes(0) {}
+  };
+
+  /// chunk from which we served the last Allocate() call;
+  /// always points to the last chunk that contains allocated data;
+  /// chunks 0..current_chunk_idx_ are guaranteed to contain data
+  /// (chunks_[i].allocated_bytes > 0 for i: 0..current_chunk_idx_);
+  /// -1 if no chunks present
+  int current_chunk_idx_;
+
+  /// The size of the next chunk to allocate.
+  int64_t next_chunk_size_;
+
+  /// sum of allocated_bytes_
+  int64_t total_allocated_bytes_;
+
+  /// Maximum number of bytes allocated from this pool at one time.
+  int64_t peak_allocated_bytes_;
+
+  /// sum of all bytes allocated in chunks_
+  int64_t total_reserved_bytes_;
+
+  std::vector<ChunkInfo> chunks_;
+
+  MemoryAllocator* allocator_;
+
+  /// Find or allocate a chunk with at least min_size spare capacity and update
+  /// current_chunk_idx_. Also updates chunks_, next_chunk_size_ and
+  /// total_reserved_bytes_ if a new chunk needs to be created.
+  bool FindChunk(int64_t min_size);
+
+  /// Check integrity of the supporting data structures; always returns true but DCHECKs
+  /// all invariants.
+  /// If 'current_chunk_empty' is false, checks that the current chunk contains data.
+  bool CheckIntegrity(bool current_chunk_empty);
+
+  /// Return the offset to unoccupied space in the current chunk.
+  int GetFreeOffset() const {
+    if (current_chunk_idx_ == -1) return 0;
+    return chunks_[current_chunk_idx_].allocated_bytes;
+  }
+
+  template <bool CHECK_LIMIT_FIRST>
+  uint8_t* Allocate(int size);
+};
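
A condensed version of the lifecycle described in the comment above, as a sketch
for reviewers (not part of this patch). Note that FreeAll() must run before the
allocator is destroyed, since the destructor DCHECKs that no chunks remain:

    #include "parquet/util/memory.h"

    int main() {
      parquet::ChunkedAllocator pool;
      uint8_t* p = pool.Allocate(17);    // 8-byte aligned, consumes 24 bytes
      if (p == nullptr) return 1;
      pool.ReturnPartialAllocation(8);   // hand back part of the last allocation
      pool.Clear();                      // chunks become reusable, none are freed
      pool.FreeAll();                    // release all chunks
      return 0;
    }
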
+
+// File input and output interfaces that translate arrow::Status to exceptions
+
+class PARQUET_EXPORT FileInterface {
+ public:
+  // Close the file
+  virtual void Close() = 0;
+
+  // Return the current position in the file relative to the start
+  virtual int64_t Tell() = 0;
+};
+
+class PARQUET_EXPORT RandomAccessSource : virtual public FileInterface {
+ public:
+  virtual ~RandomAccessSource() {}
+
+  virtual int64_t Size() const = 0;
+
+  virtual void Seek(int64_t position) = 0;
+
+  // Returns bytes read
+  virtual int64_t Read(int64_t nbytes, uint8_t* out) = 0;
+
+  virtual std::shared_ptr<Buffer> Read(int64_t nbytes) = 0;
+
+  virtual std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) = 0;
+};
+
+class PARQUET_EXPORT OutputStream : virtual public FileInterface {
+ public:
+  virtual ~OutputStream() {}
+
+  // Copy bytes into the output stream
+  virtual void Write(const uint8_t* data, int64_t length) = 0;
+};
+
+class PARQUET_EXPORT ArrowFileMethods : virtual public FileInterface {
+ public:
+  void Close() override;
+  int64_t Tell() override;
+
+ protected:
+  virtual ::arrow::io::FileInterface* file_interface() = 0;
+};
+
+class PARQUET_EXPORT ArrowInputFile : public ArrowFileMethods, public RandomAccessSource {
+ public:
+  explicit ArrowInputFile(
+      const std::shared_ptr<::arrow::io::ReadableFileInterface>& file);
+
+  int64_t Size() const override;
+
+  void Seek(int64_t position) override;
+
+  // Returns bytes read
+  int64_t Read(int64_t nbytes, uint8_t* out) override;
+
+  std::shared_ptr<Buffer> Read(int64_t nbytes) override;
+
+  std::shared_ptr<Buffer> ReadAt(int64_t position, int64_t nbytes) override;
+
+  std::shared_ptr<::arrow::io::ReadableFileInterface> file() const { return file_; }
+
+  // Diamond inheritance
+  using ArrowFileMethods::Close;
+  using ArrowFileMethods::Tell;
+
+ private:
+  ::arrow::io::FileInterface* file_interface() override;
+  std::shared_ptr<::arrow::io::ReadableFileInterface> file_;
+};
+
+class PARQUET_EXPORT ArrowOutputStream : public ArrowFileMethods, public OutputStream {
+ public:
+  explicit ArrowOutputStream(const std::shared_ptr<::arrow::io::OutputStream> file);
+
+  // Copy bytes into the output stream
+  void Write(const uint8_t* data, int64_t length) override;
+
+  std::shared_ptr<::arrow::io::OutputStream> file() { return file_; }
+
+  // Diamond inheritance
+  using ArrowFileMethods::Close;
+  using ArrowFileMethods::Tell;
+
+ private:
+  ::arrow::io::FileInterface* file_interface() override;
+  std::shared_ptr<::arrow::io::OutputStream> file_;
+};
+
+class PARQUET_EXPORT InMemoryOutputStream : public OutputStream {
+ public:
+  explicit InMemoryOutputStream(MemoryAllocator* allocator = default_allocator(),
+      int64_t initial_capacity = kInMemoryDefaultCapacity);
+
+  virtual ~InMemoryOutputStream();
+
+  // Close is currently a no-op with the in-memory stream
+  virtual void Close() {}
+
+  virtual int64_t Tell();
+
+  virtual void Write(const uint8_t* data, int64_t length);
+
+  // Return complete stream as Buffer
+  std::shared_ptr<Buffer> GetBuffer();
+
+ private:
+  // Mutable pointer to the current write position in the stream
+  uint8_t* Head();
+
+  std::shared_ptr<ResizableBuffer> buffer_;
+  int64_t size_;
+  int64_t capacity_;
+
+  DISALLOW_COPY_AND_ASSIGN(InMemoryOutputStream);
+};
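
A sketch of the write-then-detach pattern InMemoryOutputStream supports (illustrative
only): the backing buffer doubles as needed, and GetBuffer() shrinks it to the
written size before handing it over.

    #include "parquet/util/memory.h"

    int main() {
      parquet::InMemoryOutputStream stream;  // default allocator, 1 KB to start
      const uint8_t bytes[] = {0xDE, 0xAD, 0xBE, 0xEF};
      stream.Write(bytes, sizeof(bytes));
      int64_t written = stream.Tell();       // bytes written so far
      std::shared_ptr<parquet::Buffer> out = stream.GetBuffer();
      return (written == 4 && out->size() == 4) ? 0 : 1;
    }
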
+
+// ----------------------------------------------------------------------
+// Streaming input interfaces
+
+// Interface used by the column reader to get bytes. It is a streaming
+// interface, meaning the bytes are consumed in order, and once a byte has been
+// read it does not need to be read again.
+class InputStream {
+ public:
+  // Returns the next 'num_to_peek' bytes without advancing the current position.
+  // *num_bytes will contain the number of bytes returned, which can be less
+  // than num_to_peek only at the end of the stream.
+  // Since the position is not advanced, calls to this function are idempotent.
+  // The buffer returned to the caller is still owned by the input stream and is
+  // only guaranteed to remain valid until the next call to Peek() or Read().
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
+
+  // Identical to Peek(), except the current position in the stream is advanced by
+  // *num_bytes.
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
+
+  // Advance the stream without reading
+  virtual void Advance(int64_t num_bytes) = 0;
+
+  virtual ~InputStream() {}
+
+ protected:
+  InputStream() {}
+};
+
+// Implementation of an InputStream when all the bytes are in memory.
+class InMemoryInputStream : public InputStream {
+ public:
+  InMemoryInputStream(RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  explicit InMemoryInputStream(const std::shared_ptr<Buffer>& buffer);
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
+
+  virtual void Advance(int64_t num_bytes);
+
+ private:
+  std::shared_ptr<Buffer> buffer_;
+  int64_t len_;
+  int64_t offset_;
+};
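
The Peek/Read/Advance contract in practice, sketched with an InMemoryInputStream
fed from an InMemoryOutputStream (illustrative only, not part of this patch):

    #include "parquet/util/memory.h"

    int main() {
      parquet::InMemoryOutputStream sink;
      const uint8_t bytes[] = {1, 2, 3, 4, 5};
      sink.Write(bytes, sizeof(bytes));

      parquet::InMemoryInputStream input(sink.GetBuffer());
      int64_t got = 0;
      const uint8_t* peeked = input.Peek(2, &got);  // does not advance
      const uint8_t* read = input.Read(2, &got);    // same bytes, now consumed
      input.Advance(1);                             // skip a byte without reading
      return (peeked == read && got == 2) ? 0 : 1;
    }
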
+
+// Implementation of an InputStream when only some of the bytes are in memory.
+class BufferedInputStream : public InputStream {
+ public:
+  BufferedInputStream(MemoryAllocator* pool, int64_t buffer_size,
+      RandomAccessSource* source, int64_t start, int64_t num_bytes);
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes);
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes);
+
+  virtual void Advance(int64_t num_bytes);
+
+ private:
+  std::shared_ptr<PoolBuffer> buffer_;
+  RandomAccessSource* source_;
+  int64_t stream_offset_;
+  int64_t stream_end_;
+  int64_t buffer_offset_;
+  int64_t buffer_size_;
+};
+
+std::shared_ptr<PoolBuffer> AllocateBuffer(MemoryAllocator* allocator, int64_t size = 0);
+
+std::unique_ptr<PoolBuffer> AllocateUniqueBuffer(
+    MemoryAllocator* allocator, int64_t size = 0);
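
These helpers wrap PoolBuffer construction plus an initial Resize(), converting any
allocation failure into a ParquetException. A brief usage sketch (illustrative only):

    #include "parquet/util/memory.h"

    int main() {
      std::shared_ptr<parquet::PoolBuffer> shared =
          parquet::AllocateBuffer(parquet::default_allocator(), 128);
      std::unique_ptr<parquet::PoolBuffer> owned =
          parquet::AllocateUniqueBuffer(parquet::default_allocator());  // size 0 for now
      PARQUET_THROW_NOT_OK(owned->Resize(64));
      return (shared->size() == 128 && owned->size() == 64) ? 0 : 1;
    }
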
+
+}  // namespace parquet
+
+#endif  // PARQUET_UTIL_MEMORY_H