You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this plain-text copy.)
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/02/11 07:31:26 UTC

parquet-cpp git commit: PARQUET-501: Add OutputStream abstract interface, refactor encoding code paths

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 4d735876d -> c11e7d487


PARQUET-501: Add OutputStream abstract interface, refactor encoding code paths

I also did a bit of tidying and reorganization, and gave interfaces more descriptive names.

Author: Wes McKinney <we...@cloudera.com>

Closes #46 from wesm/PARQUET-501 and squashes the following commits:

491aa89 [Wes McKinney]
* Add a basic OutputStream abstract interface and an InMemoryOutputStream implementation for testing.
* Refactor to use OutputStream on data encoding paths, reduce some code duplication in column-reader-test.
* Collect all input/output classes into util/input.* and util/output.*.
* Use int64_t in InputStream::Peek/Read.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/c11e7d48
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/c11e7d48
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/c11e7d48

Branch: refs/heads/master
Commit: c11e7d487aba2ba91efb261fa20dda4dd6498ac7
Parents: 4d73587
Author: Wes McKinney <we...@cloudera.com>
Authored: Wed Feb 10 22:31:22 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Wed Feb 10 22:31:22 2016 -0800

----------------------------------------------------------------------
 example/parquet-dump-schema.cc               |   2 +-
 example/parquet_reader.cc                    |   2 +-
 src/parquet/column/column-reader-test.cc     |  55 +++-------
 src/parquet/column/serialized-page.cc        |   3 +-
 src/parquet/column/serialized-page.h         |   2 +-
 src/parquet/column/test-util.h               | 100 +++++++++--------
 src/parquet/encodings/encodings.h            |   8 +-
 src/parquet/encodings/plain-encoding-test.cc |  11 +-
 src/parquet/encodings/plain-encoding.h       |  39 ++++---
 src/parquet/parquet.h                        |   4 +-
 src/parquet/reader-test.cc                   |   3 +-
 src/parquet/reader.cc                        |  47 +-------
 src/parquet/reader.h                         |  47 +-------
 src/parquet/util/CMakeLists.txt              |   7 +-
 src/parquet/util/input.cc                    | 110 +++++++++++++++++++
 src/parquet/util/input.h                     | 128 ++++++++++++++++++++++
 src/parquet/util/input_stream.cc             |  63 -----------
 src/parquet/util/input_stream.h              |  80 --------------
 src/parquet/util/output-test.cc              |  44 ++++++++
 src/parquet/util/output.cc                   |  73 ++++++++++++
 src/parquet/util/output.h                    |  71 ++++++++++++
 src/parquet/util/test-common.h               |  23 ++++
 22 files changed, 564 insertions(+), 358 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/example/parquet-dump-schema.cc
----------------------------------------------------------------------
diff --git a/example/parquet-dump-schema.cc b/example/parquet-dump-schema.cc
index 9471225..09c715c 100644
--- a/example/parquet-dump-schema.cc
+++ b/example/parquet-dump-schema.cc
@@ -35,7 +35,7 @@ int main(int argc, char** argv) {
   std::string filename = argv[1];
 
   parquet_cpp::ParquetFileReader reader;
-  parquet_cpp::LocalFile file;
+  parquet_cpp::LocalFileSource file;
 
   file.Open(filename);
   if (!file.is_open()) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 621f0ba..ca717df 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -40,7 +40,7 @@ int main(int argc, char** argv) {
   }
 
   parquet_cpp::ParquetFileReader reader;
-  parquet_cpp::LocalFile file;
+  parquet_cpp::LocalFileSource file;
 
   file.Open(filename);
   if (!file.is_open()) {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
index 0d4aea1..84a36db 100644
--- a/src/parquet/column/column-reader-test.cc
+++ b/src/parquet/column/column-reader-test.cc
@@ -29,6 +29,7 @@
 #include "parquet/column/reader.h"
 #include "parquet/column/test-util.h"
 
+#include "parquet/util/output.h"
 #include "parquet/util/test-common.h"
 
 using std::string;
@@ -60,31 +61,15 @@ class TestPrimitiveReader : public ::testing::Test {
   vector<shared_ptr<Page> > pages_;
 };
 
-template <typename T>
-static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
-  if (end < start) {
-    return vector<T>(0);
-  }
-
-  vector<T> out(end - start);
-  for (size_t i = start; i < end; ++i) {
-    out[i - start] = values[i];
-  }
-  return out;
-}
-
 
 TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
   vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
-  size_t num_values = values.size();
-  parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
 
-  vector<uint8_t> page1;
-  test::DataPageBuilder<Type::INT32> page_builder(&page1);
-  page_builder.AppendValues(values, parquet::Encoding::PLAIN);
-  pages_.push_back(page_builder.Finish());
+  std::vector<uint8_t> buffer;
+  std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, {}, 0,
+    {}, 0, &buffer);
+  pages_.push_back(page);
 
-  // TODO: simplify this
   NodePtr type = schema::Int32("a", Repetition::REQUIRED);
   ColumnDescriptor descr(type, 0, 0);
   InitReader(&descr);
@@ -102,21 +87,16 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
   ASSERT_TRUE(vector_equal(result, values));
 }
 
+
 TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
   vector<int32_t> values = {1, 2, 3, 4, 5};
   vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
 
-  size_t num_values = values.size();
-  parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
-
-  vector<uint8_t> page1;
-  test::DataPageBuilder<Type::INT32> page_builder(&page1);
-
-  // Definition levels precede the values
-  page_builder.AppendDefLevels(def_levels, 1, parquet::Encoding::RLE);
-  page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+  std::vector<uint8_t> buffer;
+  std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values, def_levels, 1,
+    {}, 0, &buffer);
 
-  pages_.push_back(page_builder.Finish());
+  pages_.push_back(page);
 
   NodePtr type = schema::Int32("a", Repetition::OPTIONAL);
   ColumnDescriptor descr(type, 1, 0);
@@ -159,18 +139,11 @@ TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
   vector<int16_t> def_levels = {2, 1, 1, 2, 2, 1, 1, 2, 2, 1};
   vector<int16_t> rep_levels = {0, 1, 1, 0, 0, 1, 1, 0, 0, 1};
 
-  size_t num_values = values.size();
-  parquet::Encoding::type value_encoding = parquet::Encoding::PLAIN;
-
-  vector<uint8_t> page1;
-  test::DataPageBuilder<Type::INT32> page_builder(&page1);
-
-  // Definition levels precede the values
-  page_builder.AppendRepLevels(rep_levels, 1, parquet::Encoding::RLE);
-  page_builder.AppendDefLevels(def_levels, 2, parquet::Encoding::RLE);
-  page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+  std::vector<uint8_t> buffer;
+  std::shared_ptr<DataPage> page = MakeDataPage<Type::INT32>(values,
+      def_levels, 2, rep_levels, 1, &buffer);
 
-  pages_.push_back(page_builder.Finish());
+  pages_.push_back(page);
 
   NodePtr type = schema::Int32("a", Repetition::REPEATED);
   ColumnDescriptor descr(type, 2, 1);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/serialized-page.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc
index 1cbaf4d..b9d470c 100644
--- a/src/parquet/column/serialized-page.cc
+++ b/src/parquet/column/serialized-page.cc
@@ -21,7 +21,6 @@
 
 #include "parquet/exception.h"
 #include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
 
 using parquet::PageType;
 
@@ -52,7 +51,7 @@ std::shared_ptr<Page> SerializedPageReader::NextPage() {
   // Loop here because there may be unhandled page types that we skip until
   // finding a page that we do know what to do with
   while (true) {
-    int bytes_read = 0;
+    int64_t bytes_read = 0;
     const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
     if (bytes_read == 0) {
       return std::shared_ptr<Page>(nullptr);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/serialized-page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h
index 2735c3c..c02152f 100644
--- a/src/parquet/column/serialized-page.h
+++ b/src/parquet/column/serialized-page.h
@@ -27,7 +27,7 @@
 
 #include "parquet/column/page.h"
 #include "parquet/compression/codec.h"
-#include "parquet/util/input_stream.h"
+#include "parquet/util/input.h"
 #include "parquet/thrift/parquet_types.h"
 
 namespace parquet_cpp {

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
index 8861134..1cbcf8c 100644
--- a/src/parquet/column/test-util.h
+++ b/src/parquet/column/test-util.h
@@ -52,26 +52,22 @@ class MockPageReader : public PageReader {
   size_t page_index_;
 };
 
-// TODO(wesm): this is only used for testing for now
-
-static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
-static constexpr int INIT_BUFFER_SIZE = 1024;
+// TODO(wesm): this is only used for testing for now. Refactor to form part of
+// primary file write path
 
 template <int TYPE>
 class DataPageBuilder {
  public:
   typedef typename type_traits<TYPE>::value_type T;
 
-  // The passed vector is the owner of the page's data
-  explicit DataPageBuilder(std::vector<uint8_t>* out) :
-      out_(out),
-      buffer_size_(0),
+  // This class writes data and metadata to the passed inputs
+  explicit DataPageBuilder(InMemoryOutputStream* sink, parquet::DataPageHeader* header) :
+      sink_(sink),
+      header_(header),
       num_values_(0),
       have_def_levels_(false),
       have_rep_levels_(false),
       have_values_(false) {
-    out_->resize(INIT_BUFFER_SIZE);
-    buffer_capacity_ = INIT_BUFFER_SIZE;
   }
 
   void AppendDefLevels(const std::vector<int16_t>& levels,
@@ -79,7 +75,7 @@ class DataPageBuilder {
     AppendLevels(levels, max_level, encoding);
 
     num_values_ = std::max(levels.size(), num_values_);
-    header_.__set_definition_level_encoding(encoding);
+    header_->__set_definition_level_encoding(encoding);
     have_def_levels_ = true;
   }
 
@@ -88,7 +84,7 @@ class DataPageBuilder {
     AppendLevels(levels, max_level, encoding);
 
     num_values_ = std::max(levels.size(), num_values_);
-    header_.__set_repetition_level_encoding(encoding);
+    header_->__set_repetition_level_encoding(encoding);
     have_rep_levels_ = true;
   }
 
@@ -98,53 +94,31 @@ class DataPageBuilder {
       ParquetException::NYI("only plain encoding currently implemented");
     }
     size_t bytes_to_encode = values.size() * sizeof(T);
-    Reserve(bytes_to_encode);
 
     PlainEncoder<TYPE> encoder(nullptr);
-    size_t nbytes = encoder.Encode(&values[0], values.size(), Head());
-    // In case for some reason it's fewer than bytes_to_encode
-    buffer_size_ += nbytes;
+    encoder.Encode(&values[0], values.size(), sink_);
 
     num_values_ = std::max(values.size(), num_values_);
-    header_.__set_encoding(encoding);
+    header_->__set_encoding(encoding);
     have_values_ = true;
   }
 
-  std::shared_ptr<Page> Finish() {
+  void Finish() {
     if (!have_values_) {
       throw ParquetException("A data page must at least contain values");
     }
-    header_.__set_num_values(num_values_);
-    return std::make_shared<DataPage>(&(*out_)[0], buffer_size_, header_);
+    header_->__set_num_values(num_values_);
   }
 
  private:
-  std::vector<uint8_t>* out_;
-
-  size_t buffer_size_;
-  size_t buffer_capacity_;
-
-  parquet::DataPageHeader header_;
+  InMemoryOutputStream* sink_;
+  parquet::DataPageHeader* header_;
 
   size_t num_values_;
-
   bool have_def_levels_;
   bool have_rep_levels_;
   bool have_values_;
 
-  void Reserve(size_t nbytes) {
-    while ((nbytes + buffer_size_) > buffer_capacity_) {
-      // TODO(wesm): limit to one reserve when this loop runs more than once
-      size_t new_capacity = 2 * buffer_capacity_;
-      out_->resize(new_capacity);
-      buffer_capacity_ = new_capacity;
-    }
-  }
-
-  uint8_t* Head() {
-    return &(*out_)[buffer_size_];
-  }
-
   // Used internally for both repetition and definition levels
   void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
       parquet::Encoding::type encoding) {
@@ -153,9 +127,11 @@ class DataPageBuilder {
     }
 
     // TODO: compute a more precise maximum size for the encoded levels
-    std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);
-
+    std::vector<uint8_t> encode_buffer(levels.size() * 4);
 
+    // We encode into separate memory from the output stream because the
+    // RLE-encoded bytes have to be preceded in the stream by their absolute
+    // size.
     LevelEncoder encoder;
     encoder.Init(encoding, max_level, levels.size(),
         encode_buffer.data(), encode_buffer.size());
@@ -163,15 +139,43 @@ class DataPageBuilder {
     encoder.Encode(levels.size(), levels.data());
 
     uint32_t rle_bytes = encoder.len();
-    size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
-    Reserve(levels_footprint);
-
-    *reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
-    memcpy(Head() + sizeof(uint32_t), encode_buffer.data(), rle_bytes);
-    buffer_size_ += levels_footprint;
+    sink_->Write(reinterpret_cast<const uint8_t*>(&rle_bytes), sizeof(uint32_t));
+    sink_->Write(encode_buffer.data(), rle_bytes);
   }
 };
 
+template <int TYPE, typename T>
+static std::shared_ptr<DataPage> MakeDataPage(const std::vector<T>& values,
+    const std::vector<int16_t>& def_levels, int16_t max_def_level,
+    const std::vector<int16_t>& rep_levels, int16_t max_rep_level,
+    std::vector<uint8_t>* out_buffer) {
+  size_t num_values = values.size();
+
+  InMemoryOutputStream page_stream;
+  parquet::DataPageHeader page_header;
+
+  test::DataPageBuilder<TYPE> page_builder(&page_stream, &page_header);
+
+  if (!rep_levels.empty()) {
+    page_builder.AppendRepLevels(rep_levels, max_rep_level,
+        parquet::Encoding::RLE);
+  }
+
+  if (!def_levels.empty()) {
+    page_builder.AppendDefLevels(def_levels, max_def_level,
+        parquet::Encoding::RLE);
+  }
+
+  page_builder.AppendValues(values, parquet::Encoding::PLAIN);
+  page_builder.Finish();
+
+  // Hand off the data stream to the passed std::vector
+  page_stream.Transfer(out_buffer);
+
+  return std::make_shared<DataPage>(&(*out_buffer)[0], out_buffer->size(), page_header);
+}
+
+
 } // namespace test
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index 21754d1..46c61b6 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -23,6 +23,7 @@
 #include "parquet/exception.h"
 #include "parquet/types.h"
 
+#include "parquet/util/output.h"
 #include "parquet/util/rle-encoding.h"
 #include "parquet/util/bit-stream-utils.inline.h"
 
@@ -82,14 +83,9 @@ class Encoder {
 
   virtual ~Encoder() {}
 
-  // TODO(wesm): use an output stream
-
   // Subclasses should override the ones they support
-  //
-  // @returns: the number of bytes written to dst
-  virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
+  virtual void Encode(const T* src, int num_values, OutputStream* dst) {
     throw ParquetException("Encoder does not implement this type.");
-    return 0;
   }
 
   const parquet::Encoding::type encoding() const { return encoding_; }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/plain-encoding-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding-test.cc b/src/parquet/encodings/plain-encoding-test.cc
index ca425dd..16862b8 100644
--- a/src/parquet/encodings/plain-encoding-test.cc
+++ b/src/parquet/encodings/plain-encoding-test.cc
@@ -43,15 +43,18 @@ TEST(BooleanTest, TestEncodeDecode) {
   PlainEncoder<Type::BOOLEAN> encoder(nullptr);
   PlainDecoder<Type::BOOLEAN> decoder(nullptr);
 
-  std::vector<uint8_t> encode_buffer(nbytes);
+  InMemoryOutputStream dst;
+  encoder.Encode(draws, nvalues, &dst);
 
-  size_t encoded_bytes = encoder.Encode(draws, nvalues, &encode_buffer[0]);
-  ASSERT_EQ(nbytes, encoded_bytes);
+  std::vector<uint8_t> encode_buffer;
+  dst.Transfer(&encode_buffer);
+
+  ASSERT_EQ(nbytes, encode_buffer.size());
 
   std::vector<uint8_t> decode_buffer(nbytes);
   const uint8_t* decode_data = &decode_buffer[0];
 
-  decoder.SetData(nvalues, &encode_buffer[0], encoded_bytes);
+  decoder.SetData(nvalues, &encode_buffer[0], encode_buffer.size());
   size_t values_decoded = decoder.Decode(&decode_buffer[0], nvalues);
   ASSERT_EQ(nvalues, values_decoded);
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index 03f5940..a450eb4 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -147,7 +147,7 @@ class PlainEncoder : public Encoder<TYPE> {
   explicit PlainEncoder(const ColumnDescriptor* descr) :
       Encoder<TYPE>(descr, parquet::Encoding::PLAIN) {}
 
-  virtual size_t Encode(const T* src, int num_values, uint8_t* dst);
+  virtual void Encode(const T* src, int num_values, OutputStream* dst);
 };
 
 template <>
@@ -156,43 +156,46 @@ class PlainEncoder<Type::BOOLEAN> : public Encoder<Type::BOOLEAN> {
   explicit PlainEncoder(const ColumnDescriptor* descr) :
       Encoder<Type::BOOLEAN>(descr, parquet::Encoding::PLAIN) {}
 
-  virtual size_t Encode(const bool* src, int num_values, uint8_t* dst) {
+  virtual void Encode(const bool* src, int num_values, OutputStream* dst) {
     throw ParquetException("this API for encoding bools not implemented");
-    return 0;
   }
 
-  size_t Encode(const std::vector<bool>& src, int num_values,
-      uint8_t* dst) {
+  void Encode(const std::vector<bool>& src, int num_values, OutputStream* dst) {
     size_t bytes_required = BitUtil::RoundUp(num_values, 8) / 8;
-    BitWriter bit_writer(dst, bytes_required);
+
+    // TODO(wesm)
+    // Use a temporary buffer for now and copy, because the BitWriter is not
+    // aware of OutputStream. Later we can add some kind of Request/Flush API
+    // to OutputStream
+    std::vector<uint8_t> tmp_buffer(bytes_required);
+
+    BitWriter bit_writer(&tmp_buffer[0], bytes_required);
     for (size_t i = 0; i < num_values; ++i) {
       bit_writer.PutValue(src[i], 1);
     }
     bit_writer.Flush();
-    return bit_writer.bytes_written();
+
+    // Write the result to the output stream
+    dst->Write(bit_writer.buffer(), bit_writer.bytes_written());
   }
 };
 
 template <int TYPE>
-inline size_t PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
-    uint8_t* dst) {
-  size_t nbytes = num_values * sizeof(T);
-  memcpy(dst, buffer, nbytes);
-  return nbytes;
+inline void PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
+    OutputStream* dst) {
+  dst->Write(reinterpret_cast<const uint8_t*>(buffer), num_values * sizeof(T));
 }
 
 template <>
-inline size_t PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
-    int num_values, uint8_t* dst) {
+inline void PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
+    int num_values, OutputStream* dst) {
   ParquetException::NYI("byte array encoding");
-  return 0;
 }
 
 template <>
-inline size_t PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
-    const FixedLenByteArray* src, int num_values, uint8_t* dst) {
+inline void PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
+    const FixedLenByteArray* src, int num_values, OutputStream* dst) {
   ParquetException::NYI("FLBA encoding");
-  return 0;
 }
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 84a32f3..7030d0e 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -29,6 +29,8 @@
 #include "parquet/exception.h"
 #include "parquet/reader.h"
 #include "parquet/column/reader.h"
-#include "parquet/util/input_stream.h"
+
+#include "parquet/util/input.h"
+#include "parquet/util/output.h"
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index ffc882c..8da8b99 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -25,6 +25,7 @@
 #include "parquet/reader.h"
 #include "parquet/column/reader.h"
 #include "parquet/column/scanner.h"
+#include "parquet/util/input.h"
 
 using std::string;
 
@@ -47,7 +48,7 @@ class TestAllTypesPlain : public ::testing::Test {
   void TearDown() {}
 
  protected:
-  LocalFile file_;
+  LocalFileSource file_;
   ParquetFileReader reader_;
 };
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index 2f30ebf..3fcce90 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -31,7 +31,6 @@
 #include "parquet/exception.h"
 #include "parquet/schema/converter.h"
 #include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
 
 using std::string;
 using std::vector;
@@ -39,48 +38,6 @@ using std::vector;
 namespace parquet_cpp {
 
 // ----------------------------------------------------------------------
-// LocalFile methods
-
-LocalFile::~LocalFile() {
-  CloseFile();
-}
-
-void LocalFile::Open(const std::string& path) {
-  path_ = path;
-  file_ = fopen(path_.c_str(), "r");
-  is_open_ = true;
-}
-
-void LocalFile::Close() {
-  // Pure virtual
-  CloseFile();
-}
-
-void LocalFile::CloseFile() {
-  if (is_open_) {
-    fclose(file_);
-    is_open_ = false;
-  }
-}
-
-size_t LocalFile::Size() {
-  fseek(file_, 0L, SEEK_END);
-  return Tell();
-}
-
-void LocalFile::Seek(size_t pos) {
-  fseek(file_, pos, SEEK_SET);
-}
-
-size_t LocalFile::Tell() {
-  return ftell(file_);
-}
-
-size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) {
-  return fread(buffer, 1, nbytes, file_);
-}
-
-// ----------------------------------------------------------------------
 // RowGroupReader
 
 std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
@@ -102,7 +59,7 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
   std::unique_ptr<InputStream> input(
       new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
 
-  FileLike* source = this->parent_->buffer_;
+  RandomAccessSource* source = this->parent_->buffer_;
 
   source->Seek(col_start);
 
@@ -141,7 +98,7 @@ ParquetFileReader::ParquetFileReader() :
 
 ParquetFileReader::~ParquetFileReader() {}
 
-void ParquetFileReader::Open(FileLike* buffer) {
+void ParquetFileReader::Open(RandomAccessSource* buffer) {
   buffer_ = buffer;
 }
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index ea23182..3a9dc5d 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -27,53 +27,12 @@
 #include "parquet/thrift/parquet_types.h"
 
 #include "parquet/types.h"
-
 #include "parquet/schema/descriptor.h"
+#include "parquet/util/input.h"
 
 namespace parquet_cpp {
 
 class ColumnReader;
-
-class FileLike {
- public:
-  virtual ~FileLike() {}
-
-  virtual void Close() = 0;
-  virtual size_t Size() = 0;
-  virtual size_t Tell() = 0;
-  virtual void Seek(size_t pos) = 0;
-
-  // Returns actual number of bytes read
-  virtual size_t Read(size_t nbytes, uint8_t* out) = 0;
-};
-
-
-class LocalFile : public FileLike {
- public:
-  LocalFile() : file_(nullptr), is_open_(false) {}
-  virtual ~LocalFile();
-
-  void Open(const std::string& path);
-
-  virtual void Close();
-  virtual size_t Size();
-  virtual size_t Tell();
-  virtual void Seek(size_t pos);
-
-  // Returns actual number of bytes read
-  virtual size_t Read(size_t nbytes, uint8_t* out);
-
-  bool is_open() const { return is_open_;}
-  const std::string& path() const { return path_;}
-
- private:
-  void CloseFile();
-
-  std::string path_;
-  FILE* file_;
-  bool is_open_;
-};
-
 class ParquetFileReader;
 
 class RowGroupReader {
@@ -112,7 +71,7 @@ class ParquetFileReader {
 
   // This class does _not_ take ownership of the file. You must manage its
   // lifetime separately
-  void Open(FileLike* buffer);
+  void Open(RandomAccessSource* buffer);
 
   void Close();
 
@@ -150,7 +109,7 @@ class ParquetFileReader {
   // Row group index -> RowGroupReader
   std::unordered_map<int, std::shared_ptr<RowGroupReader> > row_group_readers_;
 
-  FileLike* buffer_;
+  RandomAccessSource* buffer_;
 };
 
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/util/CMakeLists.txt b/src/parquet/util/CMakeLists.txt
index 046a7c9..504069f 100644
--- a/src/parquet/util/CMakeLists.txt
+++ b/src/parquet/util/CMakeLists.txt
@@ -27,11 +27,13 @@ install(FILES
   macros.h
   rle-encoding.h
   stopwatch.h
-  input_stream.h
+  input.h
+  output.h
   DESTINATION include/parquet/util)
 
 add_library(parquet_util STATIC
-  input_stream.cc
+  input.cc
+  output.cc
   cpu-info.cc
 )
 
@@ -54,4 +56,5 @@ if(PARQUET_BUILD_TESTS)
 endif()
 
 ADD_PARQUET_TEST(bit-util-test)
+ADD_PARQUET_TEST(output-test)
 ADD_PARQUET_TEST(rle-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.cc b/src/parquet/util/input.cc
new file mode 100644
index 0000000..0e4b833
--- /dev/null
+++ b/src/parquet/util/input.cc
@@ -0,0 +1,110 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/input.h"
+
+#include <algorithm>
+#include <string>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// LocalFileSource
+
+LocalFileSource::~LocalFileSource() {
+  CloseFile();
+}
+
+void LocalFileSource::Open(const std::string& path) {
+  path_ = path;
+  file_ = fopen(path_.c_str(), "r");
+  is_open_ = true;
+}
+
+void LocalFileSource::Close() {
+  // Pure virtual
+  CloseFile();
+}
+
+void LocalFileSource::CloseFile() {
+  if (is_open_) {
+    fclose(file_);
+    is_open_ = false;
+  }
+}
+
+size_t LocalFileSource::Size() {
+  fseek(file_, 0L, SEEK_END);
+  return Tell();
+}
+
+void LocalFileSource::Seek(size_t pos) {
+  fseek(file_, pos, SEEK_SET);
+}
+
+size_t LocalFileSource::Tell() {
+  return ftell(file_);
+}
+
+size_t LocalFileSource::Read(size_t nbytes, uint8_t* buffer) {
+  return fread(buffer, 1, nbytes, file_);
+}
+
+// ----------------------------------------------------------------------
+// InMemoryInputStream
+
+InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
+    buffer_(buffer), len_(len), offset_(0) {}
+
+const uint8_t* InMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
+  return buffer_ + offset_;
+}
+
+const uint8_t* InMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  const uint8_t* result = Peek(num_to_read, num_bytes);
+  offset_ += *num_bytes;
+  return result;
+}
+
+// ----------------------------------------------------------------------
+// ScopedInMemoryInputStream:: like InMemoryInputStream but owns its memory
+
+ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
+  buffer_.resize(len);
+  stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
+}
+
+uint8_t* ScopedInMemoryInputStream::data() {
+  return buffer_.data();
+}
+
+int64_t ScopedInMemoryInputStream::size() {
+  return buffer_.size();
+}
+
+const uint8_t* ScopedInMemoryInputStream::Peek(int64_t num_to_peek, int64_t* num_bytes) {
+  return stream_->Peek(num_to_peek, num_bytes);
+}
+
+const uint8_t* ScopedInMemoryInputStream::Read(int64_t num_to_read, int64_t* num_bytes) {
+  return stream_->Read(num_to_read, num_bytes);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input.h b/src/parquet/util/input.h
new file mode 100644
index 0000000..4fd9cd7
--- /dev/null
+++ b/src/parquet/util/input.h
@@ -0,0 +1,128 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_INPUT_H
+#define PARQUET_UTIL_INPUT_H
+
+#include <cstdint>
+#include <cstdio>
+#include <memory>
+#include <string>
+#include <vector>
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// Random access input (e.g. file-like)
+
+// Abstract interface for random-access (seekable, file-like) input sources.
+class RandomAccessSource {
+ public:
+  virtual ~RandomAccessSource() = default;
+
+  virtual void Close() = 0;
+  // Size of the source (in bytes, presumably -- confirm with implementations).
+  virtual size_t Size() = 0;
+  // Current read position.
+  virtual size_t Tell() = 0;
+  // Moves the read position to `pos`.
+  virtual void Seek(size_t pos) = 0;
+
+  // Reads up to `nbytes` into `out`; returns the actual number of bytes read.
+  virtual size_t Read(size_t nbytes, uint8_t* out) = 0;
+};
+
+
+// RandomAccessSource backed by a local file accessed through C stdio (FILE*).
+class LocalFileSource : public RandomAccessSource {
+ public:
+  LocalFileSource() : file_(nullptr), is_open_(false) {}
+  virtual ~LocalFileSource();
+
+  // The object holds a raw FILE* handle (presumably released by Close() and the
+  // destructor defined in input.cc -- verify). A memberwise copy would alias
+  // that handle and risk closing it twice, so copying is disabled.
+  LocalFileSource(const LocalFileSource&) = delete;
+  LocalFileSource& operator=(const LocalFileSource&) = delete;
+
+  // Opens the file at `path`.
+  void Open(const std::string& path);
+
+  void Close() override;
+  size_t Size() override;
+  size_t Tell() override;
+  void Seek(size_t pos) override;
+
+  // Returns actual number of bytes read
+  size_t Read(size_t nbytes, uint8_t* out) override;
+
+  bool is_open() const { return is_open_; }
+  const std::string& path() const { return path_; }
+
+ private:
+  void CloseFile();
+
+  std::string path_;
+  FILE* file_;
+  bool is_open_;
+};
+
+// ----------------------------------------------------------------------
+// Streaming input interfaces
+
+// Interface for the column reader to get the bytes. The interface is a stream
+// interface, meaning the bytes are consumed in order and, once a byte has been
+// read, it does not need to be read again.
+class InputStream {
+ public:
+  // Returns a pointer to the next 'num_to_peek' bytes without advancing the
+  // current position. *num_bytes receives the number of bytes actually
+  // returned, which is less than num_to_peek only at the end of the stream.
+  // Because the position is not advanced, calls to this function are
+  // idempotent. The returned buffer is still owned by the input stream and
+  // must stay valid until the next call to Peek() or Read().
+  virtual const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) = 0;
+
+  // Identical to Peek(), except the current position in the stream is
+  // advanced by *num_bytes.
+  virtual const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) = 0;
+
+  virtual ~InputStream() = default;
+
+ protected:
+  InputStream() = default;
+};
+
+// InputStream over a caller-provided, fully in-memory byte buffer. The buffer
+// is neither copied nor owned, so it must outlive the stream.
+class InMemoryInputStream : public InputStream {
+ public:
+  InMemoryInputStream(const uint8_t* buffer, int64_t len);
+
+  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
+  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;
+
+ private:
+  const uint8_t* buffer_;  // start of the borrowed buffer
+  int64_t len_;            // total number of bytes in buffer_
+  int64_t offset_;         // current read position within buffer_
+};
+
+
+// InputStream that owns a zero-initialized buffer of a fixed size and reads
+// from it through an internal InMemoryInputStream. data() exposes the buffer
+// for writing (presumably so callers can fill it before reading).
+class ScopedInMemoryInputStream : public InputStream {
+ public:
+  explicit ScopedInMemoryInputStream(int64_t len);
+
+  uint8_t* data();
+  int64_t size();
+
+  const uint8_t* Peek(int64_t num_to_peek, int64_t* num_bytes) override;
+  const uint8_t* Read(int64_t num_to_read, int64_t* num_bytes) override;
+
+ private:
+  std::vector<uint8_t> buffer_;                  // owned storage
+  std::unique_ptr<InMemoryInputStream> stream_;  // reads from buffer_
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_INPUT_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input_stream.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/input_stream.cc b/src/parquet/util/input_stream.cc
deleted file mode 100644
index 281a342..0000000
--- a/src/parquet/util/input_stream.cc
+++ /dev/null
@@ -1,63 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/util/input_stream.h"
-
-#include <algorithm>
-
-#include "parquet/exception.h"
-
-namespace parquet_cpp {
-
-InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
-    buffer_(buffer), len_(len), offset_(0) {}
-
-const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
-  *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
-  return buffer_ + offset_;
-}
-
-const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
-  const uint8_t* result = Peek(num_to_read, num_bytes);
-  offset_ += *num_bytes;
-  return result;
-}
-
-ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
-  buffer_.resize(len);
-  stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
-}
-
-uint8_t* ScopedInMemoryInputStream::data() {
-  return buffer_.data();
-}
-
-int64_t ScopedInMemoryInputStream::size() {
-  return buffer_.size();
-}
-
-const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek,
-                                               int* num_bytes) {
-  return stream_->Peek(num_to_peek, num_bytes);
-}
-
-const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read,
-                                               int* num_bytes) {
-  return stream_->Read(num_to_read, num_bytes);
-}
-
-} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/input_stream.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/input_stream.h b/src/parquet/util/input_stream.h
deleted file mode 100644
index ece2488..0000000
--- a/src/parquet/util/input_stream.h
+++ /dev/null
@@ -1,80 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_INPUT_STREAM_H
-#define PARQUET_INPUT_STREAM_H
-
-#include <cstdint>
-#include <memory>
-#include <vector>
-
-namespace parquet_cpp {
-
-// Interface for the column reader to get the bytes. The interface is a stream
-// interface, meaning the bytes in order and once a byte is read, it does not
-// need to be read again.
-class InputStream {
- public:
-  // Returns the next 'num_to_peek' without advancing the current position.
-  // *num_bytes will contain the number of bytes returned which can only be
-  // less than num_to_peek at end of stream cases.
-  // Since the position is not advanced, calls to this function are idempotent.
-  // The buffer returned to the caller is still owned by the input stream and must
-  // stay valid until the next call to Peek() or Read().
-  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;
-
-  // Identical to Peek(), except the current position in the stream is advanced by
-  // *num_bytes.
-  virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;
-
-  virtual ~InputStream() {}
-
- protected:
-  InputStream() {}
-};
-
-// Implementation of an InputStream when all the bytes are in memory.
-class InMemoryInputStream : public InputStream {
- public:
-  InMemoryInputStream(const uint8_t* buffer, int64_t len);
-  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
-  virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
-  const uint8_t* buffer_;
-  int64_t len_;
-  int64_t offset_;
-};
-
-
-// A wrapper for InMemoryInputStream to manage the memory.
-class ScopedInMemoryInputStream : public InputStream {
- public:
-  explicit ScopedInMemoryInputStream(int64_t len);
-  uint8_t* data();
-  int64_t size();
-  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
-  virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
-  std::vector<uint8_t> buffer_;
-  std::unique_ptr<InMemoryInputStream> stream_;
-};
-
-} // namespace parquet_cpp
-
-#endif // PARQUET_INPUT_STREAM_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output-test.cc b/src/parquet/util/output-test.cc
new file mode 100644
index 0000000..84f5b57
--- /dev/null
+++ b/src/parquet/util/output-test.cc
@@ -0,0 +1,44 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <memory>
+
+#include <gtest/gtest.h>
+
+#include "parquet/util/output.h"
+#include "parquet/util/test-common.h"
+
+namespace parquet_cpp {
+
+TEST(TestInMemoryOutputStream, Basics) {
+  // Start with an 8-byte capacity so the 13 bytes written below force the
+  // stream to grow its buffer.
+  std::unique_ptr<InMemoryOutputStream> stream(new InMemoryOutputStream(8));
+
+  std::vector<uint8_t> expected = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12};
+  const int64_t first_chunk = 4;
+
+  // Write in two pieces, checking the position after the first one.
+  stream->Write(expected.data(), first_chunk);
+  ASSERT_EQ(4, stream->Tell());
+  stream->Write(expected.data() + first_chunk,
+      static_cast<int64_t>(expected.size()) - first_chunk);
+
+  // Transfer hands back everything written and resets the position.
+  std::vector<uint8_t> actual;
+  stream->Transfer(&actual);
+
+  test::assert_vector_equal(expected, actual);
+
+  ASSERT_EQ(0, stream->Tell());
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output.cc
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.cc b/src/parquet/util/output.cc
new file mode 100644
index 0000000..9748a69
--- /dev/null
+++ b/src/parquet/util/output.cc
@@ -0,0 +1,73 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/util/output.h"
+
+#include <algorithm>
+#include <cstring>
+#include <sstream>
+
+#include "parquet/exception.h"
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// In-memory output stream
+
+static constexpr int64_t IN_MEMORY_DEFAULT_CAPACITY = 1024;
+
+// Creates an empty stream with room for `initial_capacity` bytes before the
+// first reallocation. A non-positive capacity falls back to the default.
+// capacity_ is set from the *adjusted* value: it must match buffer_'s length
+// and must not be 0, because Write() grows by doubling capacity_.
+InMemoryOutputStream::InMemoryOutputStream(int64_t initial_capacity) :
+    size_(0),
+    capacity_(initial_capacity > 0 ? initial_capacity
+                                   : IN_MEMORY_DEFAULT_CAPACITY) {
+  buffer_.resize(capacity_);
+}
+
+// Creates an empty stream with the default initial capacity.
+InMemoryOutputStream::InMemoryOutputStream() :
+    InMemoryOutputStream(IN_MEMORY_DEFAULT_CAPACITY) {}
+
+// Mutable pointer to the first unwritten byte of the buffer. Uses pointer
+// arithmetic on data() rather than &buffer_[size_] so that forming the
+// one-past-the-end position (size_ == buffer_.size()) is well defined.
+uint8_t* InMemoryOutputStream::Head() {
+  return buffer_.data() + size_;
+}
+
+// Appends `length` bytes from `data`, doubling the backing buffer until it is
+// large enough when the write would exceed the current capacity.
+// Non-positive lengths are ignored.
+void InMemoryOutputStream::Write(const uint8_t* data, int64_t length) {
+  if (length <= 0) {
+    return;
+  }
+  const int64_t required = size_ + length;
+  if (required > capacity_) {
+    // Start from at least 1 so the doubling loop terminates even when the
+    // current capacity is 0 (e.g. after Transfer() swapped in an empty
+    // vector); 0 * 2 == 0 would otherwise loop forever.
+    int64_t new_capacity = std::max<int64_t>(capacity_ * 2, 1);
+    while (new_capacity < required) {
+      new_capacity *= 2;
+    }
+    buffer_.resize(new_capacity);
+    capacity_ = new_capacity;
+  }
+  memcpy(Head(), data, length);
+  size_ += length;
+}
+
+// Number of bytes written since construction or the most recent Transfer().
+int64_t InMemoryOutputStream::Tell() {
+  const int64_t bytes_written = size_;
+  return bytes_written;
+}
+
+// Hands the written bytes to `out` by swapping buffers (no copy), then resets
+// the stream to empty. Whatever `out` previously held becomes the stream's
+// new backing storage.
+void InMemoryOutputStream::Transfer(std::vector<uint8_t>* out) {
+  buffer_.resize(size_);
+  buffer_.swap(*out);
+  size_ = 0;
+  // An empty swapped-in vector would leave capacity_ at 0, and Write() grows
+  // by doubling capacity_, so re-establish a nonzero default capacity.
+  if (buffer_.empty()) {
+    buffer_.resize(IN_MEMORY_DEFAULT_CAPACITY);
+  }
+  capacity_ = buffer_.size();
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/output.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/output.h b/src/parquet/util/output.h
new file mode 100644
index 0000000..e83b261
--- /dev/null
+++ b/src/parquet/util/output.h
@@ -0,0 +1,71 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_OUTPUT_H
+#define PARQUET_UTIL_OUTPUT_H
+
+#include <cstdint>
+#include <memory>
+#include <vector>
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// Output stream classes
+
+// Abstract output stream
+class OutputStream {
+ public:
+  // Virtual so that destroying a concrete stream through an OutputStream*
+  // (e.g. a std::unique_ptr<OutputStream>) runs the derived destructor;
+  // without it such a delete is undefined behavior.
+  virtual ~OutputStream() {}
+
+  // Close the output stream
+  virtual void Close() = 0;
+
+  // Return the current position in the output stream relative to the start
+  virtual int64_t Tell() = 0;
+
+  // Copy bytes into the output stream
+  virtual void Write(const uint8_t* data, int64_t length) = 0;
+};
+
+
+// An OutputStream that accumulates written bytes in a growable in-memory
+// buffer.
+class InMemoryOutputStream : public OutputStream {
+ public:
+  // Uses the default initial capacity.
+  InMemoryOutputStream();
+  explicit InMemoryOutputStream(int64_t initial_capacity);
+
+  // Close is currently a no-op with the in-memory stream
+  void Close() override {}
+
+  // Number of bytes written since construction or the last Transfer().
+  int64_t Tell() override;
+
+  // Appends `length` bytes from `data`, growing the buffer as needed.
+  void Write(const uint8_t* data, int64_t length) override;
+
+  // Hand off the in-memory data to a (preferably-empty) std::vector owner.
+  // The two vectors are swapped and the stream's position is reset to 0.
+  void Transfer(std::vector<uint8_t>* out);
+
+ private:
+  // Mutable pointer to the current write position in the stream
+  uint8_t* Head();
+
+  std::vector<uint8_t> buffer_;  // backing storage
+  int64_t size_;                 // bytes written so far
+  int64_t capacity_;             // current length of buffer_
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_OUTPUT_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/c11e7d48/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
index 3cf82f5..84519d6 100644
--- a/src/parquet/util/test-common.h
+++ b/src/parquet/util/test-common.h
@@ -29,6 +29,16 @@ namespace parquet_cpp {
 namespace test {
 
 template <typename T>
+static inline void assert_vector_equal(const vector<T>& left,
+    const vector<T>& right) {
+  // Fails the current test if the vectors differ in length or in any element;
+  // the index of the first mismatch is streamed into the failure message.
+  // NOTE: a fatal ASSERT_* failure returns only from this helper, not from the
+  // calling test; wrap calls in ASSERT_NO_FATAL_FAILURE if that matters.
+  ASSERT_EQ(left.size(), right.size());
+
+  for (size_t i = 0, n = left.size(); i != n; ++i) {
+    ASSERT_EQ(left[i], right[i]) << i;
+  }
+}
+
+template <typename T>
 static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
   if (left.size() != right.size()) {
     return false;
@@ -47,6 +57,19 @@ static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
   return true;
 }
 
+// Returns a copy of values[start, end). `end` is clamped to values.size() so
+// an over-long range cannot read past the input; an empty or inverted range
+// yields an empty vector.
+template <typename T>
+static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
+  if (end > values.size()) {
+    end = values.size();
+  }
+  if (end <= start) {
+    return vector<T>();
+  }
+  return vector<T>(values.begin() + start, values.begin() + end);
+}
+
 static inline vector<bool> flip_coins_seed(size_t n, double p, uint32_t seed) {
   std::mt19937 gen(seed);
   std::bernoulli_distribution d(p);