You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by no...@apache.org on 2016/02/02 19:50:19 UTC

parquet-cpp git commit: PARQUET-485: Decouple page deserialization from column reader to facilitate unit testing

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 2b935ae96 -> 08088af76


PARQUET-485: Decouple page deserialization from column reader to facilitate unit testing

Several things in this patch

* Adds PageReader abstraction, and a SerializedPageReader implementation
  according to the Parquet file format
* Adds a MockPageReader and a couple unit tests demonstrating end-to-end test
  without creating a Parquet file
* Adds a DataPageBuilder test fixture tool, may become part of the main write
  path later
* Adds PlainEncoder implementation for a few primitive types
* Fixes a few ColumnReader bugs exposed by the unit tests

Author: Wes McKinney <we...@cloudera.com>

Closes #32 from wesm/PARQUET-485 and squashes the following commits:

aa33078 [Wes McKinney] Fix function doc
e897a81 [Wes McKinney] Restore NumRequiredBits function after rebase
ee4d97a [Wes McKinney] Change PageReader::NextPage API to return shared_ptr<Page>(nullptr) on eos
0324021 [Wes McKinney] Clarify some comments
ec871c4 [Wes McKinney] Add include guards
e63bbdd [Wes McKinney] Move vector_equal to util/test-common.h
44a78a1 [Wes McKinney] Refactor to decouple page deserialization from column reader so that mock data pages cna be constructed in unit tests.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/08088af7
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/08088af7
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/08088af7

Branch: refs/heads/master
Commit: 08088af76ec2357318e045f0696901e2e6e79fbf
Parents: 2b935ae
Author: Wes McKinney <we...@cloudera.com>
Authored: Tue Feb 2 10:50:17 2016 -0800
Committer: Nong Li <no...@gmail.com>
Committed: Tue Feb 2 10:50:17 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                           |   2 +
 src/parquet/column/CMakeLists.txt        |   4 +
 src/parquet/column/column-reader-test.cc | 165 +++++++++++++++++++++++
 src/parquet/column/page.h                | 132 ++++++++++++++++++
 src/parquet/column/reader.cc             | 164 +++++++++++------------
 src/parquet/column/reader.h              |  80 +++++------
 src/parquet/column/serialized-page.cc    | 103 ++++++++++++++
 src/parquet/column/serialized-page.h     |  61 +++++++++
 src/parquet/column/test-util.h           | 184 ++++++++++++++++++++++++++
 src/parquet/encodings/encodings.h        |  34 +++++
 src/parquet/encodings/plain-encoding.h   |  56 +++++++-
 src/parquet/reader.cc                    |  13 +-
 src/parquet/reader.h                     |   4 +
 src/parquet/util/bit-util.h              |   8 ++
 src/parquet/util/test-common.h           |  53 ++++++++
 15 files changed, 924 insertions(+), 139 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index 173d676..d379e3d 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -214,6 +214,7 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS})
 # Library config
 
 set(LIBPARQUET_SRCS
+  src/parquet/column/serialized-page.cc
   src/parquet/column/reader.cc
   src/parquet/column/scanner.cc
   src/parquet/reader.cc
@@ -246,6 +247,7 @@ if(APPLE)
 endif()
 
 add_subdirectory(src/parquet)
+add_subdirectory(src/parquet/column)
 add_subdirectory(src/parquet/compression)
 add_subdirectory(src/parquet/encodings)
 add_subdirectory(src/parquet/thrift)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
index 20f6167..7eb334e 100644
--- a/src/parquet/column/CMakeLists.txt
+++ b/src/parquet/column/CMakeLists.txt
@@ -17,6 +17,10 @@
 
 # Headers: top level
 install(FILES
+  page.h
   reader.h
+  serialized-page.h
   scanner.h
   DESTINATION include/parquet/column)
+
+ADD_PARQUET_TEST(column-reader-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/column-reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/column-reader-test.cc b/src/parquet/column/column-reader-test.cc
new file mode 100644
index 0000000..88f4465
--- /dev/null
+++ b/src/parquet/column/column-reader-test.cc
@@ -0,0 +1,165 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <cstdint>
+#include <cstdlib>
+#include <iostream>
+#include <sstream>
+#include <string>
+#include <vector>
+
+#include <gtest/gtest.h>
+
+#include "parquet/types.h"
+#include "parquet/column/page.h"
+#include "parquet/column/reader.h"
+#include "parquet/column/test-util.h"
+
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+using parquet::FieldRepetitionType;
+using parquet::SchemaElement;
+using parquet::Encoding;
+using parquet::Type;
+
+namespace parquet_cpp {
+
+namespace test {
+
+class TestPrimitiveReader : public ::testing::Test {
+ public:
+  void SetUp() {}
+
+  void TearDown() {}
+
+  void InitReader(const SchemaElement* element) {
+    pager_.reset(new test::MockPageReader(pages_));
+    reader_ = ColumnReader::Make(element, std::move(pager_));
+  }
+
+ protected:
+  std::shared_ptr<ColumnReader> reader_;
+  std::unique_ptr<PageReader> pager_;
+  vector<shared_ptr<Page> > pages_;
+};
+
+template <typename T>
+static vector<T> slice(const vector<T>& values, size_t start, size_t end) {
+  if (end < start) {
+    return vector<T>(0);
+  }
+
+  vector<T> out(end - start);
+  for (size_t i = start; i < end; ++i) {
+    out[i - start] = values[i];
+  }
+  return out;
+}
+
+
+TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
+  vector<int32_t> values = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10};
+  size_t num_values = values.size();
+  Encoding::type value_encoding = Encoding::PLAIN;
+
+  vector<uint8_t> page1;
+  test::DataPageBuilder<Type::INT32> page_builder(&page1);
+  page_builder.AppendValues(values, Encoding::PLAIN);
+  pages_.push_back(page_builder.Finish());
+
+  // TODO: simplify this
+  SchemaElement element;
+  element.__set_type(Type::INT32);
+  element.__set_repetition_type(FieldRepetitionType::REQUIRED);
+  InitReader(&element);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+
+  vector<int32_t> result(10, -1);
+
+  size_t values_read = 0;
+  size_t batch_actual = reader->ReadBatch(10, nullptr, nullptr,
+      &result[0], &values_read);
+  ASSERT_EQ(10, batch_actual);
+  ASSERT_EQ(10, values_read);
+
+  ASSERT_TRUE(vector_equal(result, values));
+}
+
+TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
+  vector<int32_t> values = {1, 2, 3, 4, 5};
+  vector<int16_t> def_levels = {1, 0, 0, 1, 1, 0, 0, 0, 1, 1};
+
+  size_t num_values = values.size();
+  Encoding::type value_encoding = Encoding::PLAIN;
+
+  vector<uint8_t> page1;
+  test::DataPageBuilder<Type::INT32> page_builder(&page1);
+
+  // Definition levels precede the values
+  page_builder.AppendDefLevels(def_levels, 1, Encoding::RLE);
+  page_builder.AppendValues(values, Encoding::PLAIN);
+
+  pages_.push_back(page_builder.Finish());
+
+  // TODO: simplify this
+  SchemaElement element;
+  element.__set_type(Type::INT32);
+  element.__set_repetition_type(FieldRepetitionType::OPTIONAL);
+  InitReader(&element);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+
+  std::vector<int32_t> vexpected;
+  std::vector<int16_t> dexpected;
+
+  size_t values_read = 0;
+  size_t batch_actual = 0;
+
+  vector<int32_t> vresult(3, -1);
+  vector<int16_t> dresult(5, -1);
+
+  batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
+      &vresult[0], &values_read);
+  ASSERT_EQ(5, batch_actual);
+  ASSERT_EQ(3, values_read);
+
+  ASSERT_TRUE(vector_equal(vresult, slice(values, 0, 3)));
+  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 0, 5)));
+
+  batch_actual = reader->ReadBatch(5, &dresult[0], nullptr,
+      &vresult[0], &values_read);
+  ASSERT_EQ(5, batch_actual);
+  ASSERT_EQ(2, values_read);
+
+  ASSERT_TRUE(vector_equal(slice(vresult, 0, 2), slice(values, 3, 5)));
+  ASSERT_TRUE(vector_equal(dresult, slice(def_levels, 5, 10)));
+
+  // EOS, pass all nullptrs to check for improper writes. Do not segfault /
+  // core dump
+  batch_actual = reader->ReadBatch(5, nullptr, nullptr,
+      nullptr, &values_read);
+  ASSERT_EQ(0, batch_actual);
+  ASSERT_EQ(0, values_read);
+}
+
+} // namespace test
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/page.h b/src/parquet/column/page.h
new file mode 100644
index 0000000..46f5d62
--- /dev/null
+++ b/src/parquet/column/page.h
@@ -0,0 +1,132 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_PAGE_H
+#define PARQUET_COLUMN_PAGE_H
+
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// Note: Copying the specific page header Thrift metadata to the Page object
+// (instead of using a pointer) presently so that data pages can be
+// decompressed and processed in parallel. We can turn the header members of
+// these classes into pointers at some point, but the downside is that
+// applications materializing multiple data pages at once will have to have a
+// data container that manages the lifetime of the deserialized
+// parquet::PageHeader structs.
+//
+// TODO: Parallel processing is not yet safe because of memory-ownership
+// semantics (the PageReader may or may not own the memory referenced by a
+// page)
+class Page {
+  // TODO(wesm): In the future Parquet implementations may store the crc code
+  // in parquet::PageHeader. parquet-mr currently does not, so we also skip it
+  // here, both on the read and write path
+ public:
+  Page(const uint8_t* buffer, size_t buffer_size, parquet::PageType::type type) :
+      buffer_(buffer),
+      buffer_size_(buffer_size),
+      type_(type) {}
+
+  parquet::PageType::type type() const {
+    return type_;
+  }
+
+  // @returns: a pointer to the page's data
+  const uint8_t* data() const {
+    return buffer_;
+  }
+
+  // @returns: the total size in bytes of the page's data buffer
+  size_t size() const {
+    return buffer_size_;
+  }
+
+ private:
+  const uint8_t* buffer_;
+  size_t buffer_size_;
+
+  parquet::PageType::type type_;
+};
+
+
+class DataPage : public Page {
+ public:
+  DataPage(const uint8_t* buffer, size_t buffer_size,
+      const parquet::DataPageHeader& header) :
+      Page(buffer, buffer_size, parquet::PageType::DATA_PAGE),
+      header_(header) {}
+
+  size_t num_values() const {
+    return header_.num_values;
+  }
+
+  parquet::Encoding::type encoding() const {
+    return header_.encoding;
+  }
+
+ private:
+  parquet::DataPageHeader header_;
+};
+
+
+class DataPageV2 : public Page {
+ public:
+  DataPageV2(const uint8_t* buffer, size_t buffer_size,
+      const parquet::DataPageHeaderV2& header) :
+      Page(buffer, buffer_size, parquet::PageType::DATA_PAGE_V2),
+      header_(header) {}
+
+ private:
+  parquet::DataPageHeaderV2 header_;
+};
+
+
+class DictionaryPage : public Page {
+ public:
+  DictionaryPage(const uint8_t* buffer, size_t buffer_size,
+      const parquet::DictionaryPageHeader& header) :
+      Page(buffer, buffer_size, parquet::PageType::DICTIONARY_PAGE),
+      header_(header) {}
+
+  size_t num_values() const {
+    return header_.num_values;
+  }
+
+ private:
+  parquet::DictionaryPageHeader header_;
+};
+
+// Abstract page iterator interface. This way, we can feed column pages to the
+// ColumnReader through whatever mechanism we choose
+class PageReader {
+ public:
+  virtual ~PageReader() {}
+
+  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
+  // containing new Page otherwise
+  virtual std::shared_ptr<Page> NextPage() = 0;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_PAGE_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
index edfea49..91e026a 100644
--- a/src/parquet/column/reader.cc
+++ b/src/parquet/column/reader.cc
@@ -18,45 +18,62 @@
 #include "parquet/column/reader.h"
 
 #include <algorithm>
+#include <memory>
 #include <string>
 #include <string.h>
 
-#include "parquet/compression/codec.h"
-#include "parquet/encodings/encodings.h"
-#include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
+#include "parquet/column/page.h"
 
-const int DATA_PAGE_SIZE = 64 * 1024;
+#include "parquet/encodings/encodings.h"
 
 namespace parquet_cpp {
 
-using parquet::CompressionCodec;
 using parquet::Encoding;
 using parquet::FieldRepetitionType;
 using parquet::PageType;
 using parquet::Type;
 
-ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
-    const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream)
-  : metadata_(metadata),
-    schema_(schema),
-    stream_(std::move(stream)),
+ColumnReader::ColumnReader(const parquet::SchemaElement* schema,
+    std::unique_ptr<PageReader> pager)
+  : schema_(schema),
+    pager_(std::move(pager)),
     num_buffered_values_(0),
-    num_decoded_values_(0) {
-  switch (metadata->codec) {
-    case CompressionCodec::UNCOMPRESSED:
-      break;
-    case CompressionCodec::SNAPPY:
-      decompressor_.reset(new SnappyCodec());
-      break;
-    default:
-      ParquetException::NYI("Reading compressed data");
+    num_decoded_values_(0) {}
+
+template <int TYPE>
+void TypedColumnReader<TYPE>::ConfigureDictionary(const DictionaryPage* page) {
+  auto it = decoders_.find(Encoding::RLE_DICTIONARY);
+  if (it != decoders_.end()) {
+    throw ParquetException("Column cannot have more than one dictionary.");
   }
 
-  config_ = Config::DefaultConfig();
+  PlainDecoder<TYPE> dictionary(schema_);
+  dictionary.SetData(page->num_values(), page->data(), page->size());
+
+  // The dictionary is fully decoded during DictionaryDecoder::Init, so the
+  // DictionaryPage buffer is no longer required after this step
+  //
+  // TODO(wesm): investigate whether this all-or-nothing decoding of the
+  // dictionary makes sense and whether performance can be improved
+  std::shared_ptr<DecoderType> decoder(
+      new DictionaryDecoder<TYPE>(schema_, &dictionary));
+
+  decoders_[Encoding::RLE_DICTIONARY] = decoder;
+  current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
 }
 
 
+static size_t InitializeLevelDecoder(const uint8_t* buffer,
+    int16_t max_level, std::unique_ptr<RleDecoder>& decoder) {
+  int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
+
+  decoder.reset(new RleDecoder(buffer + sizeof(uint32_t),
+          num_definition_bytes,
+          BitUtil::NumRequiredBits(max_level)));
+
+  return sizeof(uint32_t) + num_definition_bytes;
+}
+
 // PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
 // encoding.
 static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
@@ -66,68 +83,44 @@ static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
 template <int TYPE>
 bool TypedColumnReader<TYPE>::ReadNewPage() {
   // Loop until we find the next data page.
+  const uint8_t* buffer;
 
   while (true) {
-    int bytes_read = 0;
-    const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
-    if (bytes_read == 0) return false;
-    uint32_t header_size = bytes_read;
-    DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
-    stream_->Read(header_size, &bytes_read);
-
-    int compressed_len = current_page_header_.compressed_page_size;
-    int uncompressed_len = current_page_header_.uncompressed_page_size;
-
-    // Read the compressed data page.
-    buffer = stream_->Read(compressed_len, &bytes_read);
-    if (bytes_read != compressed_len) ParquetException::EofException();
-
-    // Uncompress it if we need to
-    if (decompressor_ != NULL) {
-      // Grow the uncompressed buffer if we need to.
-      if (uncompressed_len > decompression_buffer_.size()) {
-        decompression_buffer_.resize(uncompressed_len);
-      }
-      decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
-          &decompression_buffer_[0]);
-      buffer = &decompression_buffer_[0];
+    current_page_ = pager_->NextPage();
+    if (!current_page_) {
+      // EOS
+      return false;
     }
 
-    if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
-      auto it = decoders_.find(Encoding::RLE_DICTIONARY);
-      if (it != decoders_.end()) {
-        throw ParquetException("Column cannot have more than one dictionary.");
-      }
-
-      PlainDecoder<TYPE> dictionary(schema_);
-      dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
-          buffer, uncompressed_len);
-      std::shared_ptr<DecoderType> decoder(
-          new DictionaryDecoder<TYPE>(schema_, &dictionary));
-
-      decoders_[Encoding::RLE_DICTIONARY] = decoder;
-      current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
+    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
       continue;
-    } else if (current_page_header_.type == PageType::DATA_PAGE) {
+    } else if (current_page_->type() == PageType::DATA_PAGE) {
+      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
       // Read a data page.
-      num_buffered_values_ = current_page_header_.data_page_header.num_values;
+      num_buffered_values_ = page->num_values();
 
       // Have not decoded any values from the data page yet
       num_decoded_values_ = 0;
 
+      buffer = page->data();
+
+      // If the data page includes repetition and definition levels, we
+      // initialize the level decoder and subtract the encoded level bytes from
+      // the page size to determine the number of bytes in the encoded data.
+      size_t data_size = page->size();
+
       // Read definition levels.
       if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
-        int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
-
-        // Temporary hack until schema resolution
+        // Temporary hack until schema resolution implemented
         max_definition_level_ = 1;
 
-        buffer += sizeof(uint32_t);
-        definition_level_decoder_.reset(
-            new RleDecoder(buffer, num_definition_bytes, 1));
-        buffer += num_definition_bytes;
-        uncompressed_len -= sizeof(uint32_t);
-        uncompressed_len -= num_definition_bytes;
+        size_t def_levels_bytes = InitializeLevelDecoder(buffer,
+            max_definition_level_, definition_level_decoder_);
+
+        buffer += def_levels_bytes;
+        data_size -= def_levels_bytes;
       } else {
         // REQUIRED field
         max_definition_level_ = 0;
@@ -137,7 +130,8 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
 
       // Get a decoder object for this page or create a new decoder if this is the
       // first page with this encoding.
-      Encoding::type encoding = current_page_header_.data_page_header.encoding;
+      Encoding::type encoding = page->encoding();
+
       if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
 
       auto it = decoders_.find(encoding);
@@ -163,10 +157,11 @@ bool TypedColumnReader<TYPE>::ReadNewPage() {
             throw ParquetException("Unknown encoding type.");
         }
       }
-      current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
+      current_decoder_->SetData(num_buffered_values_, buffer, data_size);
       return true;
     } else {
-      // We don't know what this page type is. We're allowed to skip non-data pages.
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
       continue;
     }
   }
@@ -206,27 +201,26 @@ size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
 // ----------------------------------------------------------------------
 // Dynamic column reader constructor
 
-std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
-    const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) {
-  switch (metadata->type) {
+std::shared_ptr<ColumnReader> ColumnReader::Make(
+    const parquet::SchemaElement* element,
+    std::unique_ptr<PageReader> pager) {
+  switch (element->type) {
     case Type::BOOLEAN:
-      return std::make_shared<BoolReader>(metadata, element, std::move(stream));
+      return std::make_shared<BoolReader>(element, std::move(pager));
     case Type::INT32:
-      return std::make_shared<Int32Reader>(metadata, element, std::move(stream));
+      return std::make_shared<Int32Reader>(element, std::move(pager));
     case Type::INT64:
-      return std::make_shared<Int64Reader>(metadata, element, std::move(stream));
+      return std::make_shared<Int64Reader>(element, std::move(pager));
     case Type::INT96:
-      return std::make_shared<Int96Reader>(metadata, element, std::move(stream));
+      return std::make_shared<Int96Reader>(element, std::move(pager));
     case Type::FLOAT:
-      return std::make_shared<FloatReader>(metadata, element, std::move(stream));
+      return std::make_shared<FloatReader>(element, std::move(pager));
     case Type::DOUBLE:
-      return std::make_shared<DoubleReader>(metadata, element, std::move(stream));
+      return std::make_shared<DoubleReader>(element, std::move(pager));
     case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayReader>(metadata, element,
-          std::move(stream));
+      return std::make_shared<ByteArrayReader>(element, std::move(pager));
     case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayReader>(metadata, element,
-          std::move(stream));
+      return std::make_shared<FixedLenByteArrayReader>(element, std::move(pager));
     default:
       ParquetException::NYI("type reader not implemented");
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
index 8f857c4..27ff678 100644
--- a/src/parquet/column/reader.h
+++ b/src/parquet/column/reader.h
@@ -28,9 +28,11 @@
 
 #include "parquet/exception.h"
 #include "parquet/types.h"
+
+#include "parquet/column/page.h"
+
 #include "parquet/thrift/parquet_constants.h"
 #include "parquet/thrift/parquet_types.h"
-#include "parquet/util/input_stream.h"
 #include "parquet/encodings/encodings.h"
 #include "parquet/util/rle-encoding.h"
 
@@ -52,21 +54,10 @@ class Scanner;
 
 class ColumnReader {
  public:
-  struct Config {
-    int batch_size;
-
-    static Config DefaultConfig() {
-      Config config;
-      config.batch_size = 128;
-      return config;
-    }
-  };
-
-  ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*,
-      std::unique_ptr<InputStream> stream);
+  ColumnReader(const parquet::SchemaElement*, std::unique_ptr<PageReader>);
 
-  static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
-      const parquet::SchemaElement*, std::unique_ptr<InputStream> stream);
+  static std::shared_ptr<ColumnReader> Make(const parquet::SchemaElement*,
+      std::unique_ptr<PageReader>);
 
   // Returns true if there are still values in this column.
   bool HasNext() {
@@ -81,11 +72,7 @@ class ColumnReader {
   }
 
   parquet::Type::type type() const {
-    return metadata_->type;
-  }
-
-  const parquet::ColumnMetaData* metadata() const {
-    return metadata_;
+    return schema_->type;
   }
 
   const parquet::SchemaElement* schema() const {
@@ -105,17 +92,10 @@ class ColumnReader {
   // Returns the number of decoded repetition levels
   size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels);
 
-  Config config_;
-
-  const parquet::ColumnMetaData* metadata_;
   const parquet::SchemaElement* schema_;
-  std::unique_ptr<InputStream> stream_;
 
-  // Compression codec to use.
-  std::unique_ptr<Codec> decompressor_;
-  std::vector<uint8_t> decompression_buffer_;
-
-  parquet::PageHeader current_page_header_;
+  std::unique_ptr<PageReader> pager_;
+  std::shared_ptr<Page> current_page_;
 
   // Not set if full schema for this field has no optional or repeated elements
   std::unique_ptr<RleDecoder> definition_level_decoder_;
@@ -145,12 +125,10 @@ class TypedColumnReader : public ColumnReader {
  public:
   typedef typename type_traits<TYPE>::value_type T;
 
-  TypedColumnReader(const parquet::ColumnMetaData* metadata,
-      const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) :
-      ColumnReader(metadata, schema, std::move(stream)),
+  TypedColumnReader(const parquet::SchemaElement* schema,
+      std::unique_ptr<PageReader> pager) :
+      ColumnReader(schema, std::move(pager)),
       current_decoder_(NULL) {
-    size_t value_byte_size = type_traits<TYPE>::value_byte_size;
-    values_buffer_.resize(config_.batch_size * value_byte_size);
   }
 
   // Read a batch of repetition levels, definition levels, and values from the
@@ -181,18 +159,20 @@ class TypedColumnReader : public ColumnReader {
   // @returns: the number of values read into the out buffer
   size_t ReadValues(size_t batch_size, T* out);
 
-  // Map of compression type to decompressor object.
+  // Map of encoding type to the respective decoder object. For example, a
+  // column chunk's data pages may include both dictionary-encoded and
+  // plain-encoded data.
   std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
 
+  void ConfigureDictionary(const DictionaryPage* page);
+
   DecoderType* current_decoder_;
-  std::vector<uint8_t> values_buffer_;
 };
 
 
 template <int TYPE>
 inline size_t TypedColumnReader<TYPE>::ReadValues(size_t batch_size, T* out) {
   size_t num_decoded = current_decoder_->Decode(out, batch_size);
-  num_decoded_values_ += num_decoded;
   return num_decoded;
 }
 
@@ -212,9 +192,22 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
   size_t num_def_levels = 0;
   size_t num_rep_levels = 0;
 
+  size_t values_to_read = 0;
+
   // If the field is required and non-repeated, there are no definition levels
   if (definition_level_decoder_) {
     num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
+
+    // TODO(wesm): this tallying of values-to-decode can be performed with better
+    // cache-efficiency if fused with the level decoding.
+    for (size_t i = 0; i < num_def_levels; ++i) {
+      if (def_levels[i] == max_definition_level_) {
+        ++values_to_read;
+      }
+    }
+  } else {
+    // Required field, read all values
+    values_to_read = batch_size;
   }
 
   // Not present for non-repeated fields
@@ -226,18 +219,11 @@ inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_le
     }
   }
 
-  // TODO(wesm): this tallying of values-to-decode can be performed with better
-  // cache-efficiency if fused with the level decoding.
-  size_t values_to_read = 0;
-  for (size_t i = 0; i < num_def_levels; ++i) {
-    if (def_levels[i] == max_definition_level_) {
-      ++values_to_read;
-    }
-  }
-
   *values_read = ReadValues(values_to_read, values);
+  size_t total_values = std::max(num_def_levels, *values_read);
+  num_decoded_values_ += total_values;
 
-  return num_def_levels;
+  return total_values;
 }
 
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/serialized-page.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.cc b/src/parquet/column/serialized-page.cc
new file mode 100644
index 0000000..1cbaf4d
--- /dev/null
+++ b/src/parquet/column/serialized-page.cc
@@ -0,0 +1,103 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column/serialized-page.h"
+
+#include <memory>
+
+#include "parquet/exception.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input_stream.h"
+
+using parquet::PageType;
+
+namespace parquet_cpp {
+
+// ----------------------------------------------------------------------
+// SerializedPageReader deserializes Thrift metadata and pages that have been
+// assembled in a serialized stream for storing in a Parquet files
+
+SerializedPageReader::SerializedPageReader(std::unique_ptr<InputStream> stream,
+    parquet::CompressionCodec::type codec) :
+    stream_(std::move(stream)) {
+  switch (codec) {
+    case parquet::CompressionCodec::UNCOMPRESSED:
+      break;
+    case parquet::CompressionCodec::SNAPPY:
+      decompressor_.reset(new SnappyCodec());
+      break;
+    default:
+      ParquetException::NYI("Reading compressed data");
+  }
+}
+
+// TODO(wesm): this may differ from file to file
+static constexpr int DATA_PAGE_SIZE = 64 * 1024;
+
+std::shared_ptr<Page> SerializedPageReader::NextPage() {
+  // Loop here because there may be unhandled page types that we skip until
+  // finding a page that we do know what to do with
+  while (true) {
+    int bytes_read = 0;
+    const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
+    if (bytes_read == 0) {
+      return std::shared_ptr<Page>(nullptr);
+    }
+
+    // This gets used, then set by DeserializeThriftMsg
+    uint32_t header_size = bytes_read;
+    DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+
+    // Advance the stream offset
+    stream_->Read(header_size, &bytes_read);
+
+    int compressed_len = current_page_header_.compressed_page_size;
+    int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+    // Read the compressed data page.
+    buffer = stream_->Read(compressed_len, &bytes_read);
+    if (bytes_read != compressed_len) ParquetException::EofException();
+
+    // Uncompress it if we need to
+    if (decompressor_ != NULL) {
+      // Grow the uncompressed buffer if we need to.
+      if (uncompressed_len > decompression_buffer_.size()) {
+        decompression_buffer_.resize(uncompressed_len);
+      }
+      decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+          &decompression_buffer_[0]);
+      buffer = &decompression_buffer_[0];
+    }
+
+    if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
+      return std::make_shared<DictionaryPage>(buffer, uncompressed_len,
+          current_page_header_.dictionary_page_header);
+    } else if (current_page_header_.type == PageType::DATA_PAGE) {
+      return std::make_shared<DataPage>(buffer, uncompressed_len,
+          current_page_header_.data_page_header);
+    } else if (current_page_header_.type == PageType::DATA_PAGE_V2) {
+      ParquetException::NYI("data page v2");
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+  }
+  return std::shared_ptr<Page>(nullptr);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/serialized-page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/serialized-page.h b/src/parquet/column/serialized-page.h
new file mode 100644
index 0000000..2735c3c
--- /dev/null
+++ b/src/parquet/column/serialized-page.h
@@ -0,0 +1,61 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_SERIALIZED_PAGE_H
+#define PARQUET_COLUMN_SERIALIZED_PAGE_H
+
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+#include "parquet/compression/codec.h"
+#include "parquet/util/input_stream.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+// This subclass delimits pages appearing in a serialized stream, each preceded
+// by a serialized Thrift parquet::PageHeader indicating the type of each page
+// and the page metadata.
+class SerializedPageReader : public PageReader {
+ public:
+  SerializedPageReader(std::unique_ptr<InputStream> stream,
+      parquet::CompressionCodec::type codec);
+
+  virtual ~SerializedPageReader() {}
+
+  // Implement the PageReader interface
+  virtual std::shared_ptr<Page> NextPage();
+
+ private:
+  std::unique_ptr<InputStream> stream_;
+
+  parquet::PageHeader current_page_header_;
+  std::shared_ptr<Page> current_page_;
+
+  // Compression codec to use.
+  std::unique_ptr<Codec> decompressor_;
+  std::vector<uint8_t> decompression_buffer_;
+};
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_SERIALIZED_PAGE_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/column/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/test-util.h b/src/parquet/column/test-util.h
new file mode 100644
index 0000000..80f3fa1
--- /dev/null
+++ b/src/parquet/column/test-util.h
@@ -0,0 +1,184 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_TEST_UTIL_H
+#define PARQUET_COLUMN_TEST_UTIL_H
+
+#include <algorithm>
+#include <memory>
+#include <vector>
+
+#include "parquet/column/page.h"
+
+using parquet::Encoding;
+
+namespace parquet_cpp {
+
+namespace test {
+
+class MockPageReader : public PageReader {
+ public:
+  explicit MockPageReader(const std::vector<std::shared_ptr<Page> >& pages) :
+      pages_(pages),
+      page_index_(0) {}
+
+  // Implement the PageReader interface
+  virtual std::shared_ptr<Page> NextPage() {
+    if (page_index_ == pages_.size()) {
+      // EOS to consumer
+      return std::shared_ptr<Page>(nullptr);
+    }
+    return pages_[page_index_++];
+  }
+
+ private:
+  std::vector<std::shared_ptr<Page> > pages_;
+  size_t page_index_;
+};
+
+// TODO(wesm): this is only used for testing for now
+
+static constexpr int DEFAULT_DATA_PAGE_SIZE = 64 * 1024;
+static constexpr int INIT_BUFFER_SIZE = 1024;
+
+template <int TYPE>
+class DataPageBuilder {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  // The passed vector is the owner of the page's data
+  explicit DataPageBuilder(std::vector<uint8_t>* out) :
+      out_(out),
+      buffer_size_(0),
+      num_values_(0),
+      have_def_levels_(false),
+      have_rep_levels_(false),
+      have_values_(false) {
+    out_->resize(INIT_BUFFER_SIZE);
+    buffer_capacity_ = INIT_BUFFER_SIZE;
+  }
+
+  void AppendDefLevels(const std::vector<int16_t>& levels,
+      int16_t max_level, parquet::Encoding::type encoding) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(levels.size(), num_values_);
+    header_.__set_definition_level_encoding(encoding);
+    have_def_levels_ = true;
+  }
+
+  void AppendRepLevels(const std::vector<int16_t>& levels,
+      int16_t max_level, parquet::Encoding::type encoding) {
+    AppendLevels(levels, max_level, encoding);
+
+    num_values_ = std::max(levels.size(), num_values_);
+    header_.__set_repetition_level_encoding(encoding);
+    have_rep_levels_ = true;
+  }
+
+  void AppendValues(const std::vector<T>& values,
+      parquet::Encoding::type encoding) {
+    if (encoding != Encoding::PLAIN) {
+      ParquetException::NYI("only plain encoding currently implemented");
+    }
+    size_t bytes_to_encode = values.size() * sizeof(T);
+    Reserve(bytes_to_encode);
+
+    PlainEncoder<TYPE> encoder(nullptr);
+    size_t nbytes = encoder.Encode(&values[0], values.size(), Head());
+    // In case for some reason it's fewer than bytes_to_encode
+    buffer_size_ += nbytes;
+
+    num_values_ = std::max(values.size(), num_values_);
+    header_.__set_encoding(encoding);
+    have_values_ = true;
+  }
+
+  std::shared_ptr<Page> Finish() {
+    if (!have_values_) {
+      throw ParquetException("A data page must at least contain values");
+    }
+    header_.__set_num_values(num_values_);
+    return std::make_shared<DataPage>(&(*out_)[0], buffer_size_, header_);
+  }
+
+ private:
+  std::vector<uint8_t>* out_;
+
+  size_t buffer_size_;
+  size_t buffer_capacity_;
+
+  parquet::DataPageHeader header_;
+
+  size_t num_values_;
+
+  bool have_def_levels_;
+  bool have_rep_levels_;
+  bool have_values_;
+
+  void Reserve(size_t nbytes) {
+    while ((nbytes + buffer_size_) > buffer_capacity_) {
+      // TODO(wesm): limit to one reserve when this loop runs more than once
+      size_t new_capacity = 2 * buffer_capacity_;
+      out_->resize(new_capacity);
+      buffer_capacity_ = new_capacity;
+    }
+  }
+
+  uint8_t* Head() {
+    return &(*out_)[buffer_size_];
+  }
+
+  // Used internally for both repetition and definition levels
+  void AppendLevels(const std::vector<int16_t>& levels, int16_t max_level,
+      parquet::Encoding::type encoding) {
+    if (encoding != Encoding::RLE) {
+      ParquetException::NYI("only rle encoding currently implemented");
+    }
+
+    // TODO: compute a more precise maximum size for the encoded levels
+    std::vector<uint8_t> encode_buffer(DEFAULT_DATA_PAGE_SIZE);
+
+    RleEncoder encoder(&encode_buffer[0], encode_buffer.size(),
+        BitUtil::NumRequiredBits(max_level));
+
+    // TODO(wesm): push down vector encoding
+    for (int16_t level : levels) {
+      if (!encoder.Put(level)) {
+        throw ParquetException("out of space");
+      }
+    }
+
+    uint32_t rle_bytes = encoder.Flush();
+    size_t levels_footprint = sizeof(uint32_t) + rle_bytes;
+    Reserve(levels_footprint);
+
+    *reinterpret_cast<uint32_t*>(Head()) = rle_bytes;
+    memcpy(Head() + sizeof(uint32_t), encoder.buffer(), rle_bytes);
+    buffer_size_ += levels_footprint;
+  }
+};
+
+} // namespace test
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_TEST_UTIL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index b30146a..4fb3d9a 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -67,6 +67,40 @@ class Decoder {
   int num_values_;
 };
 
+
+// Base class for value encoders. Since encoders may or not have state (e.g.,
+// dictionary encoding) we use a class instance to maintain any state.
+//
+// TODO(wesm): Encode interface API is temporary
+template <int TYPE>
+class Encoder {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  virtual ~Encoder() {}
+
+  // TODO(wesm): use an output stream
+
+  // Subclasses should override the ones they support
+  //
+  // @returns: the number of bytes written to dst
+  virtual size_t Encode(const T* src, int num_values, uint8_t* dst) {
+    throw ParquetException("Encoder does not implement this type.");
+    return 0;
+  }
+
+  const parquet::Encoding::type encoding() const { return encoding_; }
+
+ protected:
+  explicit Encoder(const parquet::SchemaElement* schema,
+      const parquet::Encoding::type& encoding)
+      : schema_(schema), encoding_(encoding) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const parquet::SchemaElement* schema_;
+  const parquet::Encoding::type encoding_;
+};
+
 } // namespace parquet_cpp
 
 #include "parquet/encodings/plain-encoding.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index e8f8977..11e70c7 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -22,8 +22,13 @@
 
 #include <algorithm>
 
+using parquet::Type;
+
 namespace parquet_cpp {
 
+// ----------------------------------------------------------------------
+// Encoding::PLAIN decoder implementation
+
 template <int TYPE>
 class PlainDecoder : public Decoder<TYPE> {
  public:
@@ -60,7 +65,7 @@ inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
 
 // Template specialization for BYTE_ARRAY
 template <>
-inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
+inline int PlainDecoder<Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
     int max_values) {
   max_values = std::min(max_values, num_values_);
   for (int i = 0; i < max_values; ++i) {
@@ -76,7 +81,7 @@ inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
 
 // Template specialization for FIXED_LEN_BYTE_ARRAY
 template <>
-inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
+inline int PlainDecoder<Type::FIXED_LEN_BYTE_ARRAY>::Decode(
     FixedLenByteArray* buffer, int max_values) {
   max_values = std::min(max_values, num_values_);
   int len = schema_->type_length;
@@ -91,10 +96,10 @@ inline int PlainDecoder<parquet::Type::FIXED_LEN_BYTE_ARRAY>::Decode(
 }
 
 template <>
-class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
+class PlainDecoder<Type::BOOLEAN> : public Decoder<Type::BOOLEAN> {
  public:
   explicit PlainDecoder(const parquet::SchemaElement* schema) :
-      Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
+      Decoder<Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
@@ -113,6 +118,49 @@ class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLE
   RleDecoder decoder_;
 };
 
+// ----------------------------------------------------------------------
+// Encoding::PLAIN encoder implementation
+
+template <int TYPE>
+class PlainEncoder : public Encoder<TYPE> {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  explicit PlainEncoder(const parquet::SchemaElement* schema) :
+      Encoder<TYPE>(schema, parquet::Encoding::PLAIN) {}
+
+  virtual size_t Encode(const T* src, int num_values, uint8_t* dst);
+};
+
+template <int TYPE>
+inline size_t PlainEncoder<TYPE>::Encode(const T* buffer, int num_values,
+    uint8_t* dst) {
+  size_t nbytes = num_values * sizeof(T);
+  memcpy(dst, buffer, nbytes);
+  return nbytes;
+}
+
+template <>
+inline size_t PlainEncoder<Type::BOOLEAN>::Encode(
+    const bool* src, int num_values, uint8_t* dst) {
+  ParquetException::NYI("bool encoding");
+  return 0;
+}
+
+template <>
+inline size_t PlainEncoder<Type::BYTE_ARRAY>::Encode(const ByteArray* src,
+    int num_values, uint8_t* dst) {
+  ParquetException::NYI("byte array encoding");
+  return 0;
+}
+
+template <>
+inline size_t PlainEncoder<Type::FIXED_LEN_BYTE_ARRAY>::Encode(
+    const FixedLenByteArray* src, int num_values, uint8_t* dst) {
+  ParquetException::NYI("FLBA encoding");
+  return 0;
+}
+
 } // namespace parquet_cpp
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index a43a2a5..a4e767e 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -25,7 +25,9 @@
 #include <vector>
 
 #include "parquet/column/reader.h"
+#include "parquet/column/serialized-page.h"
 #include "parquet/column/scanner.h"
+
 #include "parquet/exception.h"
 #include "parquet/thrift/util.h"
 #include "parquet/util/input_stream.h"
@@ -115,8 +117,13 @@ std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
   }
 
   // TODO(wesm): This presumes a flat schema
-  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data,
-      &this->parent_->metadata_.schema[i + 1], std::move(input));
+  const parquet::SchemaElement* schema = &parent_->metadata_.schema[i + 1];
+
+  std::unique_ptr<PageReader> pager(
+      new SerializedPageReader(std::move(input), col.meta_data.codec));
+
+  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(schema,
+      std::move(pager));
   column_readers_[i] = reader;
 
   return reader;
@@ -269,7 +276,7 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
     size_t nColumns = group_reader->num_columns();
 
     for (int c = 0; c < group_reader->num_columns(); ++c) {
-      const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata();
+      const parquet::ColumnMetaData* meta_data = group_reader->column_metadata(c);
       stream << "Column " << c
              << ": " << meta_data->num_values << " rows, "
              << meta_data->statistics.null_count << " null values, "

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index 4c92119..16927a7 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -83,6 +83,10 @@ class RowGroupReader {
   // column. Ownership is shared with the RowGroupReader.
   std::shared_ptr<ColumnReader> Column(size_t i);
 
+  const parquet::ColumnMetaData* column_metadata(size_t i) const {
+    return &row_group_->columns[i].meta_data;
+  }
+
   size_t num_columns() const {
     return row_group_->columns.size();
   }

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/util/bit-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/bit-util.h b/src/parquet/util/bit-util.h
index 4db585a..7a2e921 100644
--- a/src/parquet/util/bit-util.h
+++ b/src/parquet/util/bit-util.h
@@ -276,6 +276,14 @@ class BitUtil {
   static T UnsetBit(T v, int bitpos) {
     return v & ~(static_cast<T>(0x1) << bitpos);
   }
+
+  // Returns the minimum number of bits needed to represent the value of 'x'
+  static inline int NumRequiredBits(uint64_t x) {
+    for (int i = 63; i >= 0; --i) {
+      if (x & 1L << i) return i + 1;
+    }
+    return 0;
+  }
 };
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/08088af7/src/parquet/util/test-common.h
----------------------------------------------------------------------
diff --git a/src/parquet/util/test-common.h b/src/parquet/util/test-common.h
new file mode 100644
index 0000000..38bc32c
--- /dev/null
+++ b/src/parquet/util/test-common.h
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_UTIL_TEST_COMMON_H
+#define PARQUET_UTIL_TEST_COMMON_H
+
+#include <iostream>
+#include <vector>
+
+using std::vector;
+
+namespace parquet_cpp {
+
+namespace test {
+
+template <typename T>
+static inline bool vector_equal(const vector<T>& left, const vector<T>& right) {
+  if (left.size() != right.size()) {
+    return false;
+  }
+
+  for (size_t i = 0; i < left.size(); ++i) {
+    if (left[i] != right[i]) {
+      std::cerr << "index " << i
+                << " left was " << left[i]
+                << " right was " << right[i]
+                << std::endl;
+      return false;
+    }
+  }
+
+  return true;
+}
+
+} // namespace test
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_UTIL_TEST_COMMON_H