You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by uw...@apache.org on 2017/06/26 07:05:26 UTC

[3/6] parquet-cpp git commit: PARQUET-858: Flatten column directory, minor code consolidation

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column/writer.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.h b/src/parquet/column/writer.h
deleted file mode 100644
index 407e808..0000000
--- a/src/parquet/column/writer.h
+++ /dev/null
@@ -1,250 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_WRITER_H
-#define PARQUET_COLUMN_WRITER_H
-
-#include <vector>
-
-#include "parquet/column/levels.h"
-#include "parquet/column/page.h"
-#include "parquet/column/properties.h"
-#include "parquet/column/statistics.h"
-#include "parquet/encoding.h"
-#include "parquet/file/metadata.h"
-#include "parquet/schema.h"
-#include "parquet/types.h"
-#include "parquet/util/memory.h"
-#include "parquet/util/visibility.h"
-
-namespace parquet {
-
-static constexpr int WRITE_BATCH_SIZE = 1000;
-class PARQUET_EXPORT ColumnWriter {
- public:
-  ColumnWriter(ColumnChunkMetaDataBuilder*, std::unique_ptr<PageWriter>,
-      int64_t expected_rows, bool has_dictionary, Encoding::type encoding,
-      const WriterProperties* properties);
-
-  static std::shared_ptr<ColumnWriter> Make(ColumnChunkMetaDataBuilder*,
-      std::unique_ptr<PageWriter>, int64_t expected_rows,
-      const WriterProperties* properties);
-
-  Type::type type() const { return descr_->physical_type(); }
-
-  const ColumnDescriptor* descr() const { return descr_; }
-
-  /**
-   * Closes the ColumnWriter, commits any buffered values to pages.
-   *
-   * @return Total size of the column in bytes
-   */
-  int64_t Close();
-
- protected:
-  virtual std::shared_ptr<Buffer> GetValuesBuffer() = 0;
-
-  // Serializes Dictionary Page if enabled
-  virtual void WriteDictionaryPage() = 0;
-
-  // Checks if the Dictionary Page size limit is reached
-  // If the limit is reached, the Dictionary and Data Pages are serialized
-  // The encoding is switched to PLAIN
-
-  virtual void CheckDictionarySizeLimit() = 0;
-
-  // Plain-encoded statistics of the current page
-  virtual EncodedStatistics GetPageStatistics() = 0;
-
-  // Plain-encoded statistics of the whole chunk
-  virtual EncodedStatistics GetChunkStatistics() = 0;
-
-  // Merges page statistics into chunk statistics, then resets the values
-  virtual void ResetPageStatistics() = 0;
-
-  // Adds Data Pages to an in memory buffer in dictionary encoding mode
-  // Serializes the Data Pages in other encoding modes
-  void AddDataPage();
-
-  // Serializes Data Pages
-  void WriteDataPage(const CompressedDataPage& page);
-
-  // Write multiple definition levels
-  void WriteDefinitionLevels(int64_t num_levels, const int16_t* levels);
-
-  // Write multiple repetition levels
-  void WriteRepetitionLevels(int64_t num_levels, const int16_t* levels);
-
-  // RLE encode the src_buffer into dest_buffer and return the encoded size
-  int64_t RleEncodeLevels(
-      const Buffer& src_buffer, ResizableBuffer* dest_buffer, int16_t max_level);
-
-  // Serialize the buffered Data Pages
-  void FlushBufferedDataPages();
-
-  ColumnChunkMetaDataBuilder* metadata_;
-  const ColumnDescriptor* descr_;
-
-  std::unique_ptr<PageWriter> pager_;
-
-  // The number of rows that should be written in this column chunk.
-  int64_t expected_rows_;
-  bool has_dictionary_;
-  Encoding::type encoding_;
-  const WriterProperties* properties_;
-
-  LevelEncoder level_encoder_;
-
-  ::arrow::MemoryPool* allocator_;
-  ChunkedAllocator pool_;
-
-  // The total number of values stored in the data page. This is the maximum of
-  // the number of encoded definition levels or encoded values. For
-  // non-repeated, required columns, this is equal to the number of encoded
-  // values. For repeated or optional values, there may be fewer data values
-  // than levels, and this tells you how many encoded levels there are in that
-  // case.
-  int64_t num_buffered_values_;
-
-  // The total number of stored values. For repeated or optional values, this
-  // number may be lower than num_buffered_values_.
-  int64_t num_buffered_encoded_values_;
-
-  // Total number of rows written with this ColumnWriter
-  int num_rows_;
-
-  // Records the total number of bytes written by the serializer
-  int64_t total_bytes_written_;
-
-  // Flag to check if the Writer has been closed
-  bool closed_;
-
-  // Flag to infer if dictionary encoding has fallen back to PLAIN
-  bool fallback_;
-
-  std::unique_ptr<InMemoryOutputStream> definition_levels_sink_;
-  std::unique_ptr<InMemoryOutputStream> repetition_levels_sink_;
-
-  std::shared_ptr<ResizableBuffer> definition_levels_rle_;
-  std::shared_ptr<ResizableBuffer> repetition_levels_rle_;
-
-  std::shared_ptr<ResizableBuffer> uncompressed_data_;
-  std::shared_ptr<ResizableBuffer> compressed_data_;
-
-  std::vector<CompressedDataPage> data_pages_;
-
- private:
-  void InitSinks();
-};
-
-// API to write values to a single column. This is the main client facing API.
-template <typename DType>
-class PARQUET_EXPORT TypedColumnWriter : public ColumnWriter {
- public:
-  typedef typename DType::c_type T;
-
-  TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
-      std::unique_ptr<PageWriter> pager, int64_t expected_rows, Encoding::type encoding,
-      const WriterProperties* properties);
-
-  // Write a batch of repetition levels, definition levels, and values to the
-  // column.
-  void WriteBatch(int64_t num_values, const int16_t* def_levels,
-      const int16_t* rep_levels, const T* values);
-
-  /// Write a batch of repetition levels, definition levels, and values to the
-  /// column.
-  ///
-  /// In comparision to WriteBatch the length of repetition and definition levels
-  /// is the same as of the number of values read for max_definition_level == 1.
-  /// In the case of max_definition_level > 1, the repetition and definition
-  /// levels are larger than the values but the values include the null entries
-  /// with definition_level == (max_definition_level - 1). Thus we have to differentiate
-  /// in the parameters of this function if the input has the length of num_values or the
-  /// _number of rows in the lowest nesting level_.
-  ///
-  /// In the case that the most inner node in the Parquet is required, the _number of rows
-  /// in the lowest nesting level_ is equal to the number of non-null values. If the
-  /// inner-most schema node is optional, the _number of rows in the lowest nesting level_
-  /// also includes all values with definition_level == (max_definition_level - 1).
-  ///
-  /// @param num_values number of levels to write.
-  /// @param def_levels The Parquet definiton levels, length is num_values
-  /// @param rep_levels The Parquet repetition levels, length is num_values
-  /// @param valid_bits Bitmap that indicates if the row is null on the lowest nesting
-  ///   level. The length is number of rows in the lowest nesting level.
-  /// @param valid_bits_offset The offset in bits of the valid_bits where the
-  ///   first relevant bit resides.
-  /// @param values The values in the lowest nested level including
-  ///   spacing for nulls on the lowest levels; input has the length
-  ///   of the number of rows on the lowest nesting level.
-  void WriteBatchSpaced(int64_t num_values, const int16_t* def_levels,
-      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-      const T* values);
-
- protected:
-  std::shared_ptr<Buffer> GetValuesBuffer() override {
-    return current_encoder_->FlushValues();
-  }
-  void WriteDictionaryPage() override;
-  void CheckDictionarySizeLimit() override;
-  EncodedStatistics GetPageStatistics() override;
-  EncodedStatistics GetChunkStatistics() override;
-  void ResetPageStatistics() override;
-
- private:
-  int64_t WriteMiniBatch(int64_t num_values, const int16_t* def_levels,
-      const int16_t* rep_levels, const T* values);
-
-  int64_t WriteMiniBatchSpaced(int64_t num_values, const int16_t* def_levels,
-      const int16_t* rep_levels, const uint8_t* valid_bits, int64_t valid_bits_offset,
-      const T* values, int64_t* num_spaced_written);
-
-  typedef Encoder<DType> EncoderType;
-
-  // Write values to a temporary buffer before they are encoded into pages
-  void WriteValues(int64_t num_values, const T* values);
-  void WriteValuesSpaced(int64_t num_values, const uint8_t* valid_bits,
-      int64_t valid_bits_offset, const T* values);
-  std::unique_ptr<EncoderType> current_encoder_;
-
-  typedef TypedRowGroupStatistics<DType> TypedStats;
-  std::unique_ptr<TypedStats> page_statistics_;
-  std::unique_ptr<TypedStats> chunk_statistics_;
-};
-
-typedef TypedColumnWriter<BooleanType> BoolWriter;
-typedef TypedColumnWriter<Int32Type> Int32Writer;
-typedef TypedColumnWriter<Int64Type> Int64Writer;
-typedef TypedColumnWriter<Int96Type> Int96Writer;
-typedef TypedColumnWriter<FloatType> FloatWriter;
-typedef TypedColumnWriter<DoubleType> DoubleWriter;
-typedef TypedColumnWriter<ByteArrayType> ByteArrayWriter;
-typedef TypedColumnWriter<FLBAType> FixedLenByteArrayWriter;
-
-extern template class PARQUET_EXPORT TypedColumnWriter<BooleanType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<Int32Type>;
-extern template class PARQUET_EXPORT TypedColumnWriter<Int64Type>;
-extern template class PARQUET_EXPORT TypedColumnWriter<Int96Type>;
-extern template class PARQUET_EXPORT TypedColumnWriter<FloatType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<DoubleType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<ByteArrayType>;
-extern template class PARQUET_EXPORT TypedColumnWriter<FLBAType>;
-
-}  // namespace parquet
-
-#endif  // PARQUET_COLUMN_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_page.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_page.h b/src/parquet/column_page.h
new file mode 100644
index 0000000..7840612
--- /dev/null
+++ b/src/parquet/column_page.h
@@ -0,0 +1,201 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// This module defines an abstract interface for iterating through pages in a
+// Parquet column chunk within a row group. It could be extended in the future
+// to iterate through all data pages in all chunks in a file.
+
+#ifndef PARQUET_COLUMN_PAGE_H
+#define PARQUET_COLUMN_PAGE_H
+
+#include <cstdint>
+#include <memory>
+#include <string>
+
+#include "parquet/statistics.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+
+namespace parquet {
+
+// TODO: Parallel processing is not yet safe because of memory-ownership
+// semantics (the PageReader may or may not own the memory referenced by a
+// page)
+//
+// TODO(wesm): In the future Parquet implementations may store the crc code
+// in format::PageHeader. parquet-mr currently does not, so we also skip it
+// here, both on the read and write path
+class Page {
+ public:
+  Page(const std::shared_ptr<Buffer>& buffer, PageType::type type)
+      : buffer_(buffer), type_(type) {}
+
+  PageType::type type() const { return type_; }
+
+  std::shared_ptr<Buffer> buffer() const { return buffer_; }
+
+  // @returns: a pointer to the page's data
+  const uint8_t* data() const { return buffer_->data(); }
+
+  // @returns: the total size in bytes of the page's data buffer
+  int32_t size() const { return static_cast<int32_t>(buffer_->size()); }
+
+ private:
+  std::shared_ptr<Buffer> buffer_;
+  PageType::type type_;
+};
+
+class DataPage : public Page {
+ public:
+  DataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, Encoding::type definition_level_encoding,
+      Encoding::type repetition_level_encoding,
+      const EncodedStatistics& statistics = EncodedStatistics())
+      : Page(buffer, PageType::DATA_PAGE),
+        num_values_(num_values),
+        encoding_(encoding),
+        definition_level_encoding_(definition_level_encoding),
+        repetition_level_encoding_(repetition_level_encoding),
+        statistics_(statistics) {}
+
+  int32_t num_values() const { return num_values_; }
+
+  Encoding::type encoding() const { return encoding_; }
+
+  Encoding::type repetition_level_encoding() const { return repetition_level_encoding_; }
+
+  Encoding::type definition_level_encoding() const { return definition_level_encoding_; }
+
+  const EncodedStatistics& statistics() const { return statistics_; }
+
+ private:
+  int32_t num_values_;
+  Encoding::type encoding_;
+  Encoding::type definition_level_encoding_;
+  Encoding::type repetition_level_encoding_;
+  EncodedStatistics statistics_;
+};
+
+class CompressedDataPage : public DataPage {
+ public:
+  CompressedDataPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, Encoding::type definition_level_encoding,
+      Encoding::type repetition_level_encoding, int64_t uncompressed_size,
+      const EncodedStatistics& statistics = EncodedStatistics())
+      : DataPage(buffer, num_values, encoding, definition_level_encoding,
+            repetition_level_encoding, statistics),
+        uncompressed_size_(uncompressed_size) {}
+
+  int64_t uncompressed_size() const { return uncompressed_size_; }
+
+ private:
+  int64_t uncompressed_size_;
+};
+
+class DataPageV2 : public Page {
+ public:
+  DataPageV2(const std::shared_ptr<Buffer>& buffer, int32_t num_values, int32_t num_nulls,
+      int32_t num_rows, Encoding::type encoding, int32_t definition_levels_byte_length,
+      int32_t repetition_levels_byte_length, bool is_compressed = false)
+      : Page(buffer, PageType::DATA_PAGE_V2),
+        num_values_(num_values),
+        num_nulls_(num_nulls),
+        num_rows_(num_rows),
+        encoding_(encoding),
+        definition_levels_byte_length_(definition_levels_byte_length),
+        repetition_levels_byte_length_(repetition_levels_byte_length),
+        is_compressed_(is_compressed) {}
+
+  int32_t num_values() const { return num_values_; }
+
+  int32_t num_nulls() const { return num_nulls_; }
+
+  int32_t num_rows() const { return num_rows_; }
+
+  Encoding::type encoding() const { return encoding_; }
+
+  int32_t definition_levels_byte_length() const { return definition_levels_byte_length_; }
+
+  int32_t repetition_levels_byte_length() const { return repetition_levels_byte_length_; }
+
+  bool is_compressed() const { return is_compressed_; }
+
+ private:
+  int32_t num_values_;
+  int32_t num_nulls_;
+  int32_t num_rows_;
+  Encoding::type encoding_;
+  int32_t definition_levels_byte_length_;
+  int32_t repetition_levels_byte_length_;
+  bool is_compressed_;
+
+  // TODO(wesm): format::DataPageHeaderV2.statistics
+};
+
+class DictionaryPage : public Page {
+ public:
+  DictionaryPage(const std::shared_ptr<Buffer>& buffer, int32_t num_values,
+      Encoding::type encoding, bool is_sorted = false)
+      : Page(buffer, PageType::DICTIONARY_PAGE),
+        num_values_(num_values),
+        encoding_(encoding),
+        is_sorted_(is_sorted) {}
+
+  int32_t num_values() const { return num_values_; }
+
+  Encoding::type encoding() const { return encoding_; }
+
+  bool is_sorted() const { return is_sorted_; }
+
+ private:
+  int32_t num_values_;
+  Encoding::type encoding_;
+  bool is_sorted_;
+};
+
+// Abstract page iterator interface. This way, we can feed column pages to the
+// ColumnReader through whatever mechanism we choose
+class PageReader {
+ public:
+  virtual ~PageReader() {}
+
+  // @returns: shared_ptr<Page>(nullptr) on EOS, std::shared_ptr<Page>
+  // containing new Page otherwise
+  virtual std::shared_ptr<Page> NextPage() = 0;
+};
+
+class PageWriter {
+ public:
+  virtual ~PageWriter() {}
+
+  // The Column Writer decides whether dictionary encoding is used (has_dictionary)
+  // and whether the dictionary encoding has fallen back to the default encoding
+  // on reaching the dictionary page limit (fallback)
+  virtual void Close(bool has_dictionary, bool fallback) = 0;
+
+  virtual int64_t WriteDataPage(const CompressedDataPage& page) = 0;
+
+  virtual int64_t WriteDictionaryPage(const DictionaryPage& page) = 0;
+
+  virtual bool has_compressor() = 0;
+
+  virtual void Compress(const Buffer& src_buffer, ResizableBuffer* dest_buffer) = 0;
+};
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_PAGE_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader-test.cc b/src/parquet/column_reader-test.cc
new file mode 100644
index 0000000..84d1e37
--- /dev/null
+++ b/src/parquet/column_reader-test.cc
@@ -0,0 +1,371 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <limits>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "parquet/column_page.h"
+#include "parquet/column_reader.h"
+#include "parquet/schema.h"
+#include "parquet/test-util.h"
+#include "parquet/types.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet {
+
+using schema::NodePtr;
+
+namespace test {
+
+// Checks dense `left` against spaced `right`; def_levels tell which slots hold values.
+template <typename T>
+static inline bool vector_equal_with_def_levels(const vector<T>& left,
+    const vector<int16_t>& def_levels, int16_t max_def_levels, int16_t max_rep_levels,
+    const vector<T>& right) {
+  size_t i_left = 0;
+  size_t i_right = 0;
+  for (size_t i = 0; i < def_levels.size(); i++) {
+    if (def_levels[i] == max_def_levels) {
+      // Value present: left[i_left] must equal right[i_right]; report those same slots
+      if (left[i_left] != right[i_right]) {
+        std::cerr << "index " << i << " left was " << left[i_left] << " right was "
+                  << right[i_right] << std::endl;
+        return false;
+      }
+      i_left++;
+      i_right++;
+    } else if (def_levels[i] == (max_def_levels - 1)) {
+      // Null entry on the lowest nested level
+      i_right++;
+    } else if (def_levels[i] < (max_def_levels - 1)) {
+      // Null entry on a higher nesting level, only supported for non-repeating data
+      if (max_rep_levels == 0) { i_right++; }
+    }
+  }
+  return true;
+}
+
+class TestPrimitiveReader : public ::testing::Test {
+ public:
+  void InitReader(const ColumnDescriptor* d) {
+    std::unique_ptr<PageReader> pager_;
+    pager_.reset(new test::MockPageReader(pages_));
+    reader_ = ColumnReader::Make(d, std::move(pager_));
+  }
+
+  void CheckResults() {
+    vector<int32_t> vresult(num_values_, -1);
+    vector<int16_t> dresult(num_levels_, -1);
+    vector<int16_t> rresult(num_levels_, -1);
+    int64_t values_read = 0;
+    int total_values_read = 0;
+    int batch_actual = 0;
+
+    Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+    int32_t batch_size = 8;
+    int batch = 0;
+    // This will cover both the cases
+    // 1) batch_size < page_size (multiple ReadBatch from a single page)
+    // 2) batch_size > page_size (ReadBatch limits to a single page)
+    do {
+      batch = static_cast<int>(reader->ReadBatch(batch_size, &dresult[0] + batch_actual,
+          &rresult[0] + batch_actual, &vresult[0] + total_values_read, &values_read));
+      total_values_read += static_cast<int>(values_read);
+      batch_actual += batch;
+      batch_size = std::max(batch_size * 2, 4096);
+    } while (batch > 0);
+
+    ASSERT_EQ(num_levels_, batch_actual);
+    ASSERT_EQ(num_values_, total_values_read);
+    ASSERT_TRUE(vector_equal(values_, vresult));
+    if (max_def_level_ > 0) { ASSERT_TRUE(vector_equal(def_levels_, dresult)); }
+    if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+    // catch improper writes at EOS
+    batch_actual =
+        static_cast<int>(reader->ReadBatch(5, nullptr, nullptr, nullptr, &values_read));
+    ASSERT_EQ(0, batch_actual);
+    ASSERT_EQ(0, values_read);
+  }
+
+  void CheckResultsSpaced() {
+    vector<int32_t> vresult(num_levels_, -1);
+    vector<int16_t> dresult(num_levels_, -1);
+    vector<int16_t> rresult(num_levels_, -1);
+    vector<uint8_t> valid_bits(num_levels_, 255);
+    int total_values_read = 0;
+    int batch_actual = 0;
+    int levels_actual = 0;
+    int64_t null_count = -1;
+    int64_t levels_read = 0;
+    int64_t values_read;
+
+    Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+    int32_t batch_size = 8;
+    int batch = 0;
+    // This will cover both the cases
+    // 1) batch_size < page_size (multiple ReadBatchSpaced from a single page)
+    // 2) batch_size > page_size (ReadBatchSpaced limits to a single page)
+    do {
+      batch = static_cast<int>(reader->ReadBatchSpaced(batch_size,
+          dresult.data() + levels_actual, rresult.data() + levels_actual,
+          vresult.data() + batch_actual, valid_bits.data() + batch_actual, 0,
+          &levels_read, &values_read, &null_count));
+      total_values_read += batch - static_cast<int>(null_count);
+      batch_actual += batch;
+      levels_actual += static_cast<int>(levels_read);
+      batch_size = std::max(batch_size * 2, 4096);
+    } while ((batch > 0) || (levels_read > 0));
+
+    ASSERT_EQ(num_levels_, levels_actual);
+    ASSERT_EQ(num_values_, total_values_read);
+    if (max_def_level_ > 0) {
+      ASSERT_TRUE(vector_equal(def_levels_, dresult));
+      ASSERT_TRUE(vector_equal_with_def_levels(
+          values_, dresult, max_def_level_, max_rep_level_, vresult));
+    } else {
+      ASSERT_TRUE(vector_equal(values_, vresult));
+    }
+    if (max_rep_level_ > 0) { ASSERT_TRUE(vector_equal(rep_levels_, rresult)); }
+    // catch improper writes at EOS
+    batch_actual = static_cast<int>(reader->ReadBatchSpaced(5, nullptr, nullptr, nullptr,
+        valid_bits.data(), 0, &levels_read, &values_read, &null_count));
+    ASSERT_EQ(0, batch_actual);
+    ASSERT_EQ(0, null_count);
+  }
+
+  void Clear() {
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+    pages_.clear();
+    reader_.reset();
+  }
+
+  void ExecutePlain(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResults();
+    Clear();
+
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::PLAIN);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResultsSpaced();
+    Clear();
+  }
+
+  void ExecuteDict(int num_pages, int levels_per_page, const ColumnDescriptor* d) {
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResults();
+    Clear();
+
+    num_values_ = MakePages<Int32Type>(d, num_pages, levels_per_page, def_levels_,
+        rep_levels_, values_, data_buffer_, pages_, Encoding::RLE_DICTIONARY);
+    num_levels_ = num_pages * levels_per_page;
+    InitReader(d);
+    CheckResultsSpaced();
+    Clear();
+  }
+
+ protected:
+  int num_levels_;
+  int num_values_;
+  int16_t max_def_level_;
+  int16_t max_rep_level_;
+  vector<shared_ptr<Page>> pages_;
+  std::shared_ptr<ColumnReader> reader_;
+  vector<int32_t> values_;
+  vector<int16_t> def_levels_;
+  vector<int16_t> rep_levels_;
+  vector<uint8_t> data_buffer_;  // For BA and FLBA
+};
+
+TEST_F(TestPrimitiveReader, TestInt32FlatRequired) {
+  int levels_per_page = 100;
+  int num_pages = 50;
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("a", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
+}
+
+TEST_F(TestPrimitiveReader, TestInt32FlatOptional) {
+  int levels_per_page = 100;
+  int num_pages = 50;
+  max_def_level_ = 4;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("b", Repetition::OPTIONAL);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
+}
+
+TEST_F(TestPrimitiveReader, TestInt32FlatRepeated) {
+  int levels_per_page = 100;
+  int num_pages = 50;
+  max_def_level_ = 4;
+  max_rep_level_ = 2;
+  NodePtr type = schema::Int32("c", Repetition::REPEATED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  ExecutePlain(num_pages, levels_per_page, &descr);
+  ExecuteDict(num_pages, levels_per_page, &descr);
+}
+
+TEST_F(TestPrimitiveReader, TestInt32FlatRequiredSkip) {
+  int levels_per_page = 100;
+  int num_pages = 5;
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("b", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  MakePages<Int32Type>(&descr, num_pages, levels_per_page, def_levels_, rep_levels_,
+      values_, data_buffer_, pages_, Encoding::PLAIN);
+  InitReader(&descr);
+  vector<int32_t> vresult(levels_per_page / 2, -1);
+  vector<int16_t> dresult(levels_per_page / 2, -1);
+  vector<int16_t> rresult(levels_per_page / 2, -1);
+
+  Int32Reader* reader = static_cast<Int32Reader*>(reader_.get());
+  int64_t values_read = 0;
+
+  // 1) skip_size > page_size (multiple pages skipped)
+  // Skip first 2 pages
+  int64_t levels_skipped = reader->Skip(2 * levels_per_page);
+  ASSERT_EQ(2 * levels_per_page, levels_skipped);
+  // Read half a page
+  reader->ReadBatch(
+      levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  vector<int32_t> sub_values(values_.begin() + 2 * levels_per_page,
+      values_.begin() + static_cast<int>(2.5 * static_cast<double>(levels_per_page)));
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  // 2) skip_size == page_size (skip across two pages)
+  levels_skipped = reader->Skip(levels_per_page);
+  ASSERT_EQ(levels_per_page, levels_skipped);
+  // Read half a page
+  reader->ReadBatch(
+      levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  sub_values.clear();
+  sub_values.insert(sub_values.end(),
+      values_.begin() + static_cast<int>(3.5 * static_cast<double>(levels_per_page)),
+      values_.begin() + 4 * levels_per_page);
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  // 3) skip_size < page_size (skip limited to a single page)
+  // Skip half a page
+  levels_skipped = reader->Skip(levels_per_page / 2);
+  ASSERT_EQ(0.5 * levels_per_page, levels_skipped);
+  // Read half a page
+  reader->ReadBatch(
+      levels_per_page / 2, dresult.data(), rresult.data(), vresult.data(), &values_read);
+  sub_values.clear();
+  sub_values.insert(sub_values.end(),
+      values_.begin() + static_cast<int>(4.5 * static_cast<double>(levels_per_page)),
+      values_.end());
+  ASSERT_TRUE(vector_equal(sub_values, vresult));
+
+  values_.clear();
+  def_levels_.clear();
+  rep_levels_.clear();
+  pages_.clear();
+  reader_.reset();
+}
+
+TEST_F(TestPrimitiveReader, TestDictionaryEncodedPages) {
+  max_def_level_ = 0;
+  max_rep_level_ = 0;
+  NodePtr type = schema::Int32("a", Repetition::REQUIRED);
+  const ColumnDescriptor descr(type, max_def_level_, max_rep_level_);
+  shared_ptr<PoolBuffer> dummy = std::make_shared<PoolBuffer>();
+
+  shared_ptr<DictionaryPage> dict_page =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
+  shared_ptr<DataPage> data_page = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(dict_page);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // Tests Dict : PLAIN, Data : RLE_DICTIONARY
+  ASSERT_NO_THROW(reader_->HasNext());
+  pages_.clear();
+
+  dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
+  data_page = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::PLAIN_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(dict_page);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // Tests Dict : PLAIN_DICTIONARY, Data : PLAIN_DICTIONARY
+  ASSERT_NO_THROW(reader_->HasNext());
+  pages_.clear();
+
+  data_page = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::RLE_DICTIONARY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // Tests dictionary page must occur before data page
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+
+  dict_page = std::make_shared<DictionaryPage>(dummy, 0, Encoding::DELTA_BYTE_ARRAY);
+  pages_.push_back(dict_page);
+  InitReader(&descr);
+  // Tests only RLE_DICTIONARY is supported
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+
+  shared_ptr<DictionaryPage> dict_page1 =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN_DICTIONARY);
+  shared_ptr<DictionaryPage> dict_page2 =
+      std::make_shared<DictionaryPage>(dummy, 0, Encoding::PLAIN);
+  pages_.push_back(dict_page1);
+  pages_.push_back(dict_page2);
+  InitReader(&descr);
+  // Column cannot have more than one dictionary
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+
+  data_page = MakeDataPage<Int32Type>(
+      &descr, {}, 0, Encoding::DELTA_BYTE_ARRAY, {}, 0, {}, 0, {}, 0);
+  pages_.push_back(data_page);
+  InitReader(&descr);
+  // unsupported encoding
+  ASSERT_THROW(reader_->HasNext(), ParquetException);
+  pages_.clear();
+}
+
+}  // namespace test
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
new file mode 100644
index 0000000..f63f6f1
--- /dev/null
+++ b/src/parquet/column_reader.cc
@@ -0,0 +1,289 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column_reader.h"
+
+#include <algorithm>
+#include <cstdint>
+#include <memory>
+
+#include "parquet/column_page.h"
+#include "parquet/encoding-internal.h"
+#include "parquet/properties.h"
+#include "parquet/util/rle-encoding.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+
+LevelDecoder::LevelDecoder() : num_values_remaining_(0) {}  // no levels until SetData()
+
+LevelDecoder::~LevelDecoder() {}  // the header only forward-declares the decoders
+
+// Binds the decoder for `encoding` to `data`; returns the number of level bytes used.
+int LevelDecoder::SetData(Encoding::type encoding, int16_t max_level,
+    int num_buffered_values, const uint8_t* data) {
+  int32_t num_bytes = 0;
+  encoding_ = encoding;
+  num_values_remaining_ = num_buffered_values;
+  bit_width_ = BitUtil::Log2(max_level + 1);
+  switch (encoding) {
+    case Encoding::RLE: {
+      std::memcpy(&num_bytes, data, sizeof(int32_t));  // prefix may be unaligned
+      const uint8_t* decoder_data = data + sizeof(int32_t);
+      if (!rle_decoder_) {
+        rle_decoder_.reset(new RleDecoder(decoder_data, num_bytes, bit_width_));
+      } else {
+        rle_decoder_->Reset(decoder_data, num_bytes, bit_width_);
+      }
+      return static_cast<int>(sizeof(int32_t)) + num_bytes;
+    }
+    case Encoding::BIT_PACKED: {
+      num_bytes =
+          static_cast<int32_t>(BitUtil::Ceil(num_buffered_values * bit_width_, 8));
+      if (!bit_packed_decoder_) {
+        bit_packed_decoder_.reset(new BitReader(data, num_bytes));
+      } else {
+        bit_packed_decoder_->Reset(data, num_bytes);
+      }
+      return num_bytes;
+    }
+    default:
+      throw ParquetException("Unknown encoding type for levels.");
+  }
+}
+
+// Decodes up to batch_size of the remaining levels into `levels`; returns the count.
+int LevelDecoder::Decode(int batch_size, int16_t* levels) {
+  // Never request more levels than SetData() announced for this page.
+  const int to_decode = std::min(num_values_remaining_, batch_size);
+
+  // Anything that is not RLE is handled by the bit-packed decoder.
+  const int decoded = (encoding_ == Encoding::RLE)
+      ? rle_decoder_->GetBatch(levels, to_decode)
+      : bit_packed_decoder_->GetBatch(bit_width_, levels, to_decode);
+  num_values_remaining_ -= decoded;
+  return decoded;
+}
+
+ReaderProperties default_reader_properties() {
+  static ReaderProperties shared_defaults;  // constructed once, on first call
+  return shared_defaults;  // by value: every caller gets its own copy
+}
+
+ColumnReader::ColumnReader(
+    const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool)
+    : descr_(descr),
+      pager_(std::move(pager)),  // takes ownership of the page source
+      num_buffered_values_(0),   // stays zero until a data page has been read
+      num_decoded_values_(0),
+      pool_(pool) {}
+
+ColumnReader::~ColumnReader() {}
+
+// Turns a dictionary page into a dictionary decoder, registers it in decoders_ and
+// makes it the current decoder.
+template <typename DType>
+void TypedColumnReader<DType>::ConfigureDictionary(const DictionaryPage* page) {
+  // PLAIN and PLAIN_DICTIONARY both denote a plain-encoded dictionary. Its decoder
+  // is stored under the RLE_DICTIONARY key, the one ReadNewPage looks up for data
+  // pages that use a dictionary index encoding.
+  const auto enc = page->encoding();
+  const bool plain_dict = enc == Encoding::PLAIN_DICTIONARY || enc == Encoding::PLAIN;
+  const int key = static_cast<int>(plain_dict ? Encoding::RLE_DICTIONARY : enc);
+
+  if (decoders_.find(key) != decoders_.end()) {
+    throw ParquetException("Column cannot have more than one dictionary.");
+  }
+  // Any other dictionary encoding is rejected; NYI() throws.
+  if (!plain_dict) {
+    ParquetException::NYI("only plain dictionary encoding has been implemented");
+  }
+
+  PlainDecoder<DType> dictionary(descr_);
+  dictionary.SetData(page->num_values(), page->data(), page->size());
+
+  // The dictionary is fully decoded during DictionaryDecoder::Init, so the
+  // DictionaryPage buffer is no longer required after this step
+  //
+  // TODO(wesm): investigate whether this all-or-nothing decoding of the
+  // dictionary makes sense and whether performance can be improved
+
+  auto decoder = std::make_shared<DictionaryDecoder<DType>>(descr_, pool_);
+  decoder->SetDict(&dictionary);
+  decoders_[key] = decoder;
+  current_decoder_ = decoder.get();
+}
+
+// Data pages hold dictionary indices when encoded as RLE_DICTIONARY, or as
+// PLAIN_DICTIONARY, which is deprecated but used to serve the same purpose.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+  return e == Encoding::PLAIN_DICTIONARY || e == Encoding::RLE_DICTIONARY;
+}
+
+template <typename DType>
+bool TypedColumnReader<DType>::ReadNewPage() {
+  // Loop until we find the next data page; returns false once the pager runs dry.
+  const uint8_t* buffer;
+
+  while (true) {
+    current_page_ = pager_->NextPage();
+    if (!current_page_) {
+      // EOS: NextPage() returned no page
+      return false;
+    }
+
+    if (current_page_->type() == PageType::DICTIONARY_PAGE) {
+      ConfigureDictionary(static_cast<const DictionaryPage*>(current_page_.get()));
+      continue;
+    } else if (current_page_->type() == PageType::DATA_PAGE) {
+      const DataPage* page = static_cast<const DataPage*>(current_page_.get());
+
+      // Read a data page.
+      num_buffered_values_ = page->num_values();
+
+      // Have not decoded any values from the data page yet
+      num_decoded_values_ = 0;
+
+      buffer = page->data();
+
+      // If the data page includes repetition and definition levels, we
+      // initialize the level decoder and subtract the encoded level bytes from
+      // the page size to determine the number of bytes in the encoded data.
+      int64_t data_size = page->size();
+
+      // Data page Layout: Repetition Levels - Definition Levels - encoded values.
+      // Levels are encoded as rle or bit-packed.
+      // Init repetition levels
+      if (descr_->max_repetition_level() > 0) {
+        int64_t rep_levels_bytes = repetition_level_decoder_.SetData(
+            page->repetition_level_encoding(), descr_->max_repetition_level(),
+            static_cast<int>(num_buffered_values_), buffer);
+        buffer += rep_levels_bytes;
+        data_size -= rep_levels_bytes;
+      }
+      // TODO figure a way to set max_definition_level_ to 0
+      // if the initial value is invalid
+
+      // Init definition levels
+      if (descr_->max_definition_level() > 0) {
+        int64_t def_levels_bytes = definition_level_decoder_.SetData(
+            page->definition_level_encoding(), descr_->max_definition_level(),
+            static_cast<int>(num_buffered_values_), buffer);
+        buffer += def_levels_bytes;
+        data_size -= def_levels_bytes;
+      }
+
+      // Get a decoder object for this page or create a new decoder if this is the
+      // first page with this encoding. Both dictionary index encodings share one key.
+      Encoding::type encoding = page->encoding();
+
+      if (IsDictionaryIndexEncoding(encoding)) { encoding = Encoding::RLE_DICTIONARY; }
+
+      auto it = decoders_.find(static_cast<int>(encoding));
+      if (it != decoders_.end()) {
+        if (encoding == Encoding::RLE_DICTIONARY) {
+          DCHECK(current_decoder_->encoding() == Encoding::RLE_DICTIONARY);
+        }
+        current_decoder_ = it->second.get();
+      } else {
+        switch (encoding) {
+          case Encoding::PLAIN: {
+            std::shared_ptr<DecoderType> decoder(new PlainDecoder<DType>(descr_));
+            decoders_[static_cast<int>(encoding)] = decoder;
+            current_decoder_ = decoder.get();
+            break;
+          }
+          case Encoding::RLE_DICTIONARY:  // no decoder registered by a dictionary page
+            throw ParquetException("Dictionary page must be before data page.");
+
+          case Encoding::DELTA_BINARY_PACKED:
+          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+          case Encoding::DELTA_BYTE_ARRAY:
+            ParquetException::NYI("Unsupported encoding");  // throws
+
+          default:
+            throw ParquetException("Unknown encoding type.");
+        }
+      }
+      current_decoder_->SetData(
+          static_cast<int>(num_buffered_values_), buffer, static_cast<int>(data_size));
+      return true;
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data
+      // pages.
+      continue;
+    }
+  }
+  return true;  // not reached: the loop above only exits via return or throw
+}
+
+// ----------------------------------------------------------------------
+// Batch read APIs
+
+int64_t ColumnReader::ReadDefinitionLevels(int64_t batch_size, int16_t* levels) {
+  if (descr_->max_definition_level() == 0) { return 0; }  // no def levels to decode
+  return definition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+}
+
+int64_t ColumnReader::ReadRepetitionLevels(int64_t batch_size, int16_t* levels) {
+  if (descr_->max_repetition_level() == 0) { return 0; }  // no rep levels to decode
+  return repetition_level_decoder_.Decode(static_cast<int>(batch_size), levels);
+}
+
+// ----------------------------------------------------------------------
+// Dynamic column reader constructor: picks the typed reader for the physical type
+
+std::shared_ptr<ColumnReader> ColumnReader::Make(
+    const ColumnDescriptor* descr, std::unique_ptr<PageReader> pager, MemoryPool* pool) {
+  switch (descr->physical_type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolReader>(descr, std::move(pager), pool);
+    case Type::INT32:
+      return std::make_shared<Int32Reader>(descr, std::move(pager), pool);
+    case Type::INT64:
+      return std::make_shared<Int64Reader>(descr, std::move(pager), pool);
+    case Type::INT96:
+      return std::make_shared<Int96Reader>(descr, std::move(pager), pool);
+    case Type::FLOAT:
+      return std::make_shared<FloatReader>(descr, std::move(pager), pool);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleReader>(descr, std::move(pager), pool);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayReader>(descr, std::move(pager), pool);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayReader>(descr, std::move(pager), pool);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Unreachable code, but suppresses a missing-return compiler warning
+  return std::shared_ptr<ColumnReader>(nullptr);
+}
+
+// ----------------------------------------------------------------------
+// Explicit instantiations for the extern template declarations in column_reader.h
+
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<BooleanType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int32Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int64Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<Int96Type>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FloatType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<DoubleType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<ByteArrayType>;
+template class PARQUET_TEMPLATE_EXPORT TypedColumnReader<FLBAType>;
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
new file mode 100644
index 0000000..f4b8b02
--- /dev/null
+++ b/src/parquet/column_reader.h
@@ -0,0 +1,475 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_READER_H
+#define PARQUET_COLUMN_READER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <iostream>
+#include <memory>
+#include <unordered_map>
+#include <vector>
+
+#include <arrow/util/bit-util.h>
+
+#include "parquet/column_page.h"
+#include "parquet/encoding.h"
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+class BitReader;
+class RleDecoder;
+
+class PARQUET_EXPORT LevelDecoder {  // decodes RLE or bit-packed level data
+ public:
+  LevelDecoder();
+  ~LevelDecoder();
+
+  // Initialize the LevelDecoder state with new data
+  // and return the number of bytes consumed
+  int SetData(Encoding::type encoding, int16_t max_level, int num_buffered_values,
+      const uint8_t* data);
+
+  // Decodes a batch of levels into an array and returns the number of levels decoded
+  int Decode(int batch_size, int16_t* levels);
+
+ private:
+  int bit_width_;             // bits per level, derived from max_level in SetData
+  int num_values_remaining_;  // levels announced by SetData and not yet decoded
+  Encoding::type encoding_;   // encoding passed to the most recent SetData call
+  std::unique_ptr<RleDecoder> rle_decoder_;        // created on first RLE SetData
+  std::unique_ptr<BitReader> bit_packed_decoder_;  // created on first BIT_PACKED use
+};
+
+class PARQUET_EXPORT ColumnReader {
+ public:
+  ColumnReader(const ColumnDescriptor*, std::unique_ptr<PageReader>,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+  virtual ~ColumnReader();
+
+  static std::shared_ptr<ColumnReader> Make(const ColumnDescriptor* descr,
+      std::unique_ptr<PageReader> pager,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+
+  // Returns true if values remain in this column; may load the next data page.
+  bool HasNext() {
+    // Either there is no data page available yet, or the data page has been
+    // exhausted
+    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+      if (!ReadNewPage() || num_buffered_values_ == 0) { return false; }
+    }
+    return true;
+  }
+
+  Type::type type() const { return descr_->physical_type(); }
+
+  const ColumnDescriptor* descr() const { return descr_; }
+
+ protected:
+  virtual bool ReadNewPage() = 0;  // loads the next data page; false at end of stream
+
+  // Read multiple definition levels into preallocated memory
+  //
+  // Returns the number of decoded definition levels
+  int64_t ReadDefinitionLevels(int64_t batch_size, int16_t* levels);
+
+  // Read multiple repetition levels into preallocated memory
+  // Returns the number of decoded repetition levels
+  int64_t ReadRepetitionLevels(int64_t batch_size, int16_t* levels);
+
+  const ColumnDescriptor* descr_;
+
+  std::unique_ptr<PageReader> pager_;   // source of pages, owned by this reader
+  std::shared_ptr<Page> current_page_;  // page most recently returned by pager_
+
+  // Not set if full schema for this field has no optional or repeated elements
+  LevelDecoder definition_level_decoder_;
+
+  // Not set for flat schemas.
+  LevelDecoder repetition_level_decoder_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int64_t num_buffered_values_;
+
+  // The number of values from the current data page that have been decoded
+  // into memory
+  int64_t num_decoded_values_;
+
+  ::arrow::MemoryPool* pool_;  // used for dictionary decoders and Skip() buffers
+};
+
+// API to read values from a single column. This is the main client facing API.
+template <typename DType>
+class PARQUET_EXPORT TypedColumnReader : public ColumnReader {
+ public:
+  typedef typename DType::c_type T;
+
+  TypedColumnReader(const ColumnDescriptor* schema, std::unique_ptr<PageReader> pager,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : ColumnReader(schema, std::move(pager), pool), current_decoder_(nullptr) {}
+  virtual ~TypedColumnReader() {}
+
+  // Read a batch of repetition levels, definition levels, and values from the
+  // column.
+  //
+  // Since null values are not stored in the values, the number of values read
+  // may be less than the number of repetition and definition levels. With
+  // nested data this is almost certainly true.
+  //
+  // Set def_levels or rep_levels to nullptr if you want to skip reading them.
+  // This is only safe if you know through some other source that there are no
+  // undefined values.
+  //
+  // To fully exhaust a row group, you must read batches until the number of
+  // values read reaches the number of stored values according to the metadata.
+  //
+  // This API is the same for both V1 and V2 of the DataPage
+  //
+  // @returns: actual number of levels read (see values_read for number of values read)
+  int64_t ReadBatch(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+      T* values, int64_t* values_read);
+
+  /// Read a batch of repetition levels, definition levels, and values from the
+  /// column and leave spaces for null entries on the lowest level in the values
+  /// buffer.
+  ///
+  /// In comparison to ReadBatch the length of repetition and definition levels
+  /// is the same as of the number of values read for max_definition_level == 1.
+  /// In the case of max_definition_level > 1, the repetition and definition
+  /// levels are larger than the values but the values include the null entries
+  /// with definition_level == (max_definition_level - 1).
+  ///
+  /// To fully exhaust a row group, you must read batches until the number of
+  /// values read reaches the number of stored values according to the metadata.
+  ///
+  /// @param batch_size the number of levels to read
+  /// @param[out] def_levels The Parquet definition levels, output has
+  ///   the length levels_read.
+  /// @param[out] rep_levels The Parquet repetition levels, output has
+  ///   the length levels_read.
+  /// @param[out] values The values in the lowest nested level including
+  ///   spacing for nulls on the lowest levels; output has the length
+  ///   values_read.
+  /// @param[out] valid_bits Memory allocated for a bitmap that indicates if
+  ///   the row is null or on the maximum definition level. For performance
+  ///   reasons the underlying buffer should be able to store 1 bit more than
+  ///   required. If this requires an additional byte, this byte is only read
+  ///   but never written to.
+  /// @param valid_bits_offset The offset in bits of the valid_bits where the
+  ///   first relevant bit resides.
+  /// @param[out] levels_read The number of repetition/definition levels that were read.
+  /// @param[out] values_read The number of values read, this includes all
+  ///   non-null entries as well as all null-entries on the lowest level
+  ///   (i.e. definition_level == max_definition_level - 1)
+  /// @param[out] null_count The number of nulls on the lowest levels.
+  ///   (i.e. (values_read - null_count) is total number of non-null entries)
+  int64_t ReadBatchSpaced(int64_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+      T* values, uint8_t* valid_bits, int64_t valid_bits_offset, int64_t* levels_read,
+      int64_t* values_read, int64_t* null_count);
+
+  // Skip reading levels
+  // Returns the number of levels skipped
+  int64_t Skip(int64_t num_rows_to_skip);
+
+ private:
+  typedef Decoder<DType> DecoderType;
+
+  // Advance to the next data page; returns false when the pager has no more pages
+  bool ReadNewPage() override;
+
+  // Read up to batch_size values from the current data page into the
+  // pre-allocated memory T*
+  //
+  // @returns: the number of values read into the out buffer
+  int64_t ReadValues(int64_t batch_size, T* out);
+
+  // Read up to batch_size values from the current data page into the
+  // pre-allocated memory T*, leaving spaces for null entries according
+  // to the def_levels.
+  //
+  // @returns: the number of values read into the out buffer
+  int64_t ReadValuesSpaced(int64_t batch_size, T* out, int null_count,
+      uint8_t* valid_bits, int64_t valid_bits_offset);
+
+  // Map of encoding type to the respective decoder object. For example, a
+  // column chunk's data pages may include both dictionary-encoded and
+  // plain-encoded data.
+  std::unordered_map<int, std::shared_ptr<DecoderType>> decoders_;
+
+  void ConfigureDictionary(const DictionaryPage* page);
+
+  DecoderType* current_decoder_;  // non-owning; points at an entry of decoders_
+};
+
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadValues(int64_t batch_size, T* out) {
+  // The decoder is handed an int count, so the 64-bit batch size is narrowed here.
+  return current_decoder_->Decode(out, static_cast<int>(batch_size));
+}
+
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadValuesSpaced(int64_t batch_size, T* out,
+    int null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
+  return current_decoder_->DecodeSpaced(  // forwards to the current page's decoder
+      out, static_cast<int>(batch_size), null_count, valid_bits, valid_bits_offset);
+}
+
+// Reads up to batch_size levels from the current data page together with the values
+// they cover. Returns max(definition levels read, values read); see *values_read.
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadBatch(int64_t batch_size,
+    int16_t* def_levels, int16_t* rep_levels, T* values, int64_t* values_read) {
+  // HasNext() loads the next data page once the current one is exhausted.
+  if (!HasNext()) {
+    *values_read = 0;
+    return 0;
+  }
+
+  // TODO(wesm): keep reading data pages until batch_size is reached, or the
+  // row group is finished
+  const int64_t left_in_page = num_buffered_values_ - num_decoded_values_;
+  batch_size = std::min(batch_size, left_in_page);
+
+  const int16_t max_def_level = descr_->max_definition_level();
+  int64_t def_count = 0;
+  // Without definition levels (required field, or caller passed nullptr) every
+  // level in the batch is assumed to carry a value.
+  int64_t value_count = batch_size;
+
+  if (max_def_level > 0 && def_levels) {
+    def_count = ReadDefinitionLevels(batch_size, def_levels);
+    // Only entries at the maximum definition level have a stored value.
+    // TODO(wesm): this tallying of values-to-decode can be performed with better
+    // cache-efficiency if fused with the level decoding.
+    value_count = std::count(def_levels, def_levels + def_count, max_def_level);
+  }
+
+  // Repetition levels are not present for non-repeated fields.
+  if (descr_->max_repetition_level() > 0 && rep_levels) {
+    const int64_t rep_count = ReadRepetitionLevels(batch_size, rep_levels);
+    if (def_levels && def_count != rep_count) {
+      throw ParquetException("Number of decoded rep / def levels did not match");
+    }
+  }
+
+  *values_read = ReadValues(value_count, values);
+  // The page position advances by levels when there are any, otherwise by values.
+  const int64_t consumed = std::max(def_count, *values_read);
+  num_decoded_values_ += consumed;
+
+  return consumed;
+}
+
+// Converts definition levels into validity bits in valid_bits (from valid_bits_offset
+// on); *values_read receives the slots written, *null_count grows once per null slot.
+inline void DefinitionLevelsToBitmap(const int16_t* def_levels, int64_t num_def_levels,
+    int16_t max_definition_level, int16_t max_repetition_level, int64_t* values_read,
+    int64_t* null_count, uint8_t* valid_bits, int64_t valid_bits_offset) {
+  // Positions stay 64-bit: narrowing the offset or the level count to int truncates.
+  int64_t byte_offset = valid_bits_offset / 8;
+  int bit_offset = static_cast<int>(valid_bits_offset % 8);
+  uint8_t bitset = valid_bits[byte_offset];
+
+  // TODO(itaiin): As an interim solution we are splitting the code path here
+  // between repeated+flat column reads, and non-repeated+nested reads.
+  // Those paths need to be merged in the future
+  for (int64_t i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level) {
+      bitset |= (1 << bit_offset);
+    } else if (max_repetition_level > 0) {
+      // repetition+flat case: only levels one below the maximum occupy a (null)
+      // slot; anything lower produces no entry in the bitmap at all.
+      if (def_levels[i] != (max_definition_level - 1)) { continue; }
+      bitset &= ~(1 << bit_offset);
+      *null_count += 1;
+    } else {
+      // non-repeated+nested case
+      if (def_levels[i] < max_definition_level) {
+        bitset &= ~(1 << bit_offset);
+        *null_count += 1;
+      } else {
+        throw ParquetException("definition level exceeds maximum");
+      }
+    }
+
+    if (++bit_offset == 8) {
+      bit_offset = 0;
+      valid_bits[byte_offset] = bitset;
+      byte_offset++;
+      // TODO: Except for the last byte, this shouldn't be needed
+      bitset = valid_bits[byte_offset];
+    }
+  }
+  if (bit_offset != 0) { valid_bits[byte_offset] = bitset; }
+  *values_read = (bit_offset + byte_offset * 8 - valid_bits_offset);
+}
+
+// Like ReadBatch, but leaves slots in `values` for nulls on the lowest level and
+// records validity in `valid_bits`. See the declaration for the parameter contract.
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::ReadBatchSpaced(int64_t batch_size,
+    int16_t* def_levels, int16_t* rep_levels, T* values, uint8_t* valid_bits,
+    int64_t valid_bits_offset, int64_t* levels_read, int64_t* values_read,
+    int64_t* null_count_out) {
+  // HasNext invokes ReadNewPage
+  if (!HasNext()) {
+    *levels_read = 0;
+    *values_read = 0;
+    *null_count_out = 0;
+    return 0;
+  }
+
+  int64_t total_values;
+  // TODO(wesm): keep reading data pages until batch_size is reached, or the
+  // row group is finished
+  batch_size = std::min(batch_size, num_buffered_values_ - num_decoded_values_);
+
+  // If the field is required and non-repeated, there are no definition levels
+  if (descr_->max_definition_level() > 0) {
+    int64_t num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
+
+    // Not present for non-repeated fields
+    if (descr_->max_repetition_level() > 0) {
+      int64_t num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
+      if (num_def_levels != num_rep_levels) {
+        throw ParquetException("Number of decoded rep / def levels did not match");
+      }
+    }
+
+    // TODO(itaiin): another code path split to merge when the general case is done
+    bool has_spaced_values = false;
+    if (descr_->max_repetition_level() > 0) {
+      // repeated+flat case
+      has_spaced_values = !descr_->schema_node()->is_required();
+    } else {
+      // non-repeated+nested case
+      // Find if a node forces nulls in the lowest level along the hierarchy
+      const schema::Node* node = descr_->schema_node().get();
+      for (; node != nullptr; node = node->parent()) {
+        if (node->is_optional()) {
+          has_spaced_values = true;
+          break;
+        }
+      }
+    }
+
+    int64_t null_count = 0;
+    if (!has_spaced_values) {
+      int64_t values_to_read = 0;
+      for (int64_t i = 0; i < num_def_levels; ++i) {
+        if (def_levels[i] == descr_->max_definition_level()) { ++values_to_read; }
+      }
+      total_values = ReadValues(values_to_read, values);
+      for (int64_t i = 0; i < total_values; i++) {
+        ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
+      }
+      *values_read = total_values;
+    } else {
+      int16_t max_definition_level = descr_->max_definition_level();
+      int16_t max_repetition_level = descr_->max_repetition_level();
+      DefinitionLevelsToBitmap(def_levels, num_def_levels, max_definition_level,
+          max_repetition_level, values_read, &null_count, valid_bits, valid_bits_offset);
+      total_values = ReadValuesSpaced(*values_read, values, static_cast<int>(null_count),
+          valid_bits, valid_bits_offset);
+    }
+    *levels_read = num_def_levels;
+    *null_count_out = null_count;
+  } else {
+    // Required field, read all values
+    total_values = ReadValues(batch_size, values);
+    for (int64_t i = 0; i < total_values; i++) {
+      ::arrow::BitUtil::SetBit(valid_bits, valid_bits_offset + i);
+    }
+    *null_count_out = 0;
+    // values_read is an out-parameter on every path; without this it stays unset here.
+    *values_read = total_values;
+    *levels_read = total_values;
+  }
+
+  num_decoded_values_ += *levels_read;
+  return total_values;
+}
+
+// Skips up to num_rows_to_skip levels across pages; returns how many were skipped.
+template <typename DType>
+inline int64_t TypedColumnReader<DType>::Skip(int64_t num_rows_to_skip) {
+  int64_t remaining = num_rows_to_skip;
+  while (HasNext() && remaining > 0) {
+    const int64_t left_in_page = num_buffered_values_ - num_decoded_values_;
+    if (remaining > left_in_page) {
+      // The whole rest of the page is skipped: mark it consumed without decoding.
+      remaining -= left_in_page;
+      num_decoded_values_ = num_buffered_values_;
+      continue;
+    }
+
+    // The target lies inside this page, so decode into scratch buffers and discard.
+    int64_t chunk = 1024;  // ReadBatch with a smaller memory footprint
+    std::shared_ptr<PoolBuffer> scratch_values = AllocateBuffer(
+        this->pool_, chunk * type_traits<DType::type_num>::value_byte_size);
+    std::shared_ptr<PoolBuffer> scratch_def =
+        AllocateBuffer(this->pool_, chunk * sizeof(int16_t));
+    std::shared_ptr<PoolBuffer> scratch_rep =
+        AllocateBuffer(this->pool_, chunk * sizeof(int16_t));
+
+    int64_t levels_read = 0;
+    int64_t values_decoded = 0;  // required by ReadBatch, otherwise unused
+    do {
+      chunk = std::min(chunk, remaining);
+      levels_read = ReadBatch(chunk,
+          reinterpret_cast<int16_t*>(scratch_def->mutable_data()),
+          reinterpret_cast<int16_t*>(scratch_rep->mutable_data()),
+          reinterpret_cast<T*>(scratch_values->mutable_data()), &values_decoded);
+      remaining -= levels_read;
+    } while (levels_read > 0 && remaining > 0);
+  }
+  return num_rows_to_skip - remaining;
+}
+
+using BoolReader = TypedColumnReader<BooleanType>;
+using Int32Reader = TypedColumnReader<Int32Type>;
+using Int64Reader = TypedColumnReader<Int64Type>;
+using Int96Reader = TypedColumnReader<Int96Type>;
+using FloatReader = TypedColumnReader<FloatType>;
+using DoubleReader = TypedColumnReader<DoubleType>;
+using ByteArrayReader = TypedColumnReader<ByteArrayType>;
+using FixedLenByteArrayReader = TypedColumnReader<FLBAType>;
+
+extern template class PARQUET_EXPORT TypedColumnReader<BooleanType>;  // see .cc
+extern template class PARQUET_EXPORT TypedColumnReader<Int32Type>;
+extern template class PARQUET_EXPORT TypedColumnReader<Int64Type>;
+extern template class PARQUET_EXPORT TypedColumnReader<Int96Type>;
+extern template class PARQUET_EXPORT TypedColumnReader<FloatType>;
+extern template class PARQUET_EXPORT TypedColumnReader<DoubleType>;
+extern template class PARQUET_EXPORT TypedColumnReader<ByteArrayType>;
+extern template class PARQUET_EXPORT TypedColumnReader<FLBAType>;
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_scanner-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner-test.cc b/src/parquet/column_scanner-test.cc
new file mode 100644
index 0000000..086722b
--- /dev/null
+++ b/src/parquet/column_scanner-test.cc
@@ -0,0 +1,232 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <gtest/gtest.h>
+
+#include <algorithm>
+#include <cstdint>
+#include <cstdlib>
+#include <memory>
+#include <string>
+#include <vector>
+
+#include "parquet/column_page.h"
+#include "parquet/column_scanner.h"
+#include "parquet/schema.h"
+#include "parquet/test-specialization.h"
+#include "parquet/test-util.h"
+#include "parquet/types.h"
+#include "parquet/util/test-common.h"
+
+using std::string;
+using std::vector;
+using std::shared_ptr;
+
+namespace parquet {
+
+using schema::NodePtr;
+
+namespace test {
+
+template <>  // Specialization for bool; the primary template is declared outside this file.
+void InitDictValues<bool>(
+    int num_values, int dict_per_page, vector<bool>& values, vector<uint8_t>& buffer) {
+  // Intentionally empty: nothing is initialized for bool and all parameters are unused.
+}
+
+template <typename Type>  // Type must expose c_type and type_num (both are used below).
+class TestFlatScanner : public ::testing::Test {  // Builds pages, scans them back, checks them.
+ public:
+  typedef typename Type::c_type T;
+  // Wraps pages_ in a MockPageReader and stores the resulting Scanner in scanner_.
+  void InitScanner(const ColumnDescriptor* d) {
+    std::unique_ptr<PageReader> pager(new test::MockPageReader(pages_));
+    scanner_ = Scanner::Make(ColumnReader::Make(d, std::move(pager)));
+  }
+  // Pulls num_levels_ entries from scanner_ and compares them with values_ and the level vectors.
+  void CheckResults(int batch_size, const ColumnDescriptor* d) {
+    TypedScanner<Type>* scanner = reinterpret_cast<TypedScanner<Type>*>(scanner_.get());
+    T val;  // NOTE(review): the downcast above uses reinterpret_cast; static_cast is the usual tool.
+    bool is_null = false;
+    int16_t def_level;
+    int16_t rep_level;
+    int j = 0;  // index of the next expected non-null value in values_
+    scanner->SetBatchSize(batch_size);
+    for (int i = 0; i < num_levels_; i++) {  // i indexes levels, j indexes values
+      ASSERT_TRUE(scanner->Next(&val, &def_level, &rep_level, &is_null)) << i << j;
+      if (!is_null) {
+        ASSERT_EQ(values_[j], val) << i << "V" << j;
+        j++;
+      }
+      if (d->max_definition_level() > 0) {  // compared only when the max level is non-zero
+        ASSERT_EQ(def_levels_[i], def_level) << i << "D" << j;
+      }
+      if (d->max_repetition_level() > 0) {
+        ASSERT_EQ(rep_levels_[i], rep_level) << i << "R" << j;
+      }
+    }
+    ASSERT_EQ(num_values_, j);  // every expected non-null value was consumed
+    ASSERT_FALSE(scanner->Next(&val, &def_level, &rep_level, &is_null));  // scanner is exhausted
+  }
+  // Empties the generated pages, values and level vectors.
+  void Clear() {
+    pages_.clear();
+    values_.clear();
+    def_levels_.clear();
+    rep_levels_.clear();
+  }
+  // Generates pages for d, scans and checks them with the given batch size, then clears state.
+  void Execute(int num_pages, int levels_per_page, int batch_size,
+      const ColumnDescriptor* d, Encoding::type encoding) {
+    num_values_ = MakePages<Type>(d, num_pages, levels_per_page, def_levels_, rep_levels_,
+        values_, data_buffer_, pages_, encoding);
+    num_levels_ = num_pages * levels_per_page;
+    InitScanner(d);
+    CheckResults(batch_size, d);
+    Clear();
+  }
+  // Fills d1/d2/d3 with REQUIRED, OPTIONAL and REPEATED column descriptors of the given length.
+  void InitDescriptors(std::shared_ptr<ColumnDescriptor>& d1,
+      std::shared_ptr<ColumnDescriptor>& d2, std::shared_ptr<ColumnDescriptor>& d3,
+      int length) {
+    NodePtr type;
+    type = schema::PrimitiveNode::Make(
+        "c1", Repetition::REQUIRED, Type::type_num, LogicalType::NONE, length);
+    d1.reset(new ColumnDescriptor(type, 0, 0));  // presumably (max def level, max rep level)
+    type = schema::PrimitiveNode::Make(
+        "c2", Repetition::OPTIONAL, Type::type_num, LogicalType::NONE, length);
+    d2.reset(new ColumnDescriptor(type, 4, 0));
+    type = schema::PrimitiveNode::Make(
+        "c3", Repetition::REPEATED, Type::type_num, LogicalType::NONE, length);
+    d3.reset(new ColumnDescriptor(type, 4, 2));
+  }
+  // Runs Execute once each for the REQUIRED, OPTIONAL and REPEATED descriptors.
+  void ExecuteAll(int num_pages, int num_levels, int batch_size, int type_length,
+      Encoding::type encoding = Encoding::PLAIN) {
+    std::shared_ptr<ColumnDescriptor> d1;
+    std::shared_ptr<ColumnDescriptor> d2;
+    std::shared_ptr<ColumnDescriptor> d3;
+    InitDescriptors(d1, d2, d3, type_length);
+    // evaluate REQUIRED pages
+    Execute(num_pages, num_levels, batch_size, d1.get(), encoding);
+    // evaluate OPTIONAL pages
+    Execute(num_pages, num_levels, batch_size, d2.get(), encoding);
+    // evaluate REPEATED pages
+    Execute(num_pages, num_levels, batch_size, d3.get(), encoding);
+  }
+
+ protected:
+  int num_levels_;  // number of level entries to scan (num_pages * levels_per_page)
+  int num_values_;  // count returned by MakePages; compared with the non-null values seen
+  vector<shared_ptr<Page>> pages_;
+  std::shared_ptr<Scanner> scanner_;
+  vector<T> values_;
+  vector<int16_t> def_levels_;
+  vector<int16_t> rep_levels_;
+  vector<uint8_t> data_buffer_;  // For BA and FLBA
+};
+
+static int num_levels_per_page = 100;  // Defaults shared by the tests below.
+static int num_pages = 20;
+static int batch_size = 32;
+// Types covered by the TYPED_TESTs; BooleanType and FLBAType use the dedicated aliases below.
+typedef ::testing::Types<Int32Type, Int64Type, Int96Type, FloatType, DoubleType,
+    ByteArrayType>
+    TestTypes;
+// Non-typed fixtures used with TEST_F.
+using TestBooleanFlatScanner = TestFlatScanner<BooleanType>;
+using TestFLBAFlatScanner = TestFlatScanner<FLBAType>;
+
+TYPED_TEST_CASE(TestFlatScanner, TestTypes);
+
+TYPED_TEST(TestFlatScanner, TestPlainScanner) {  // PLAIN encoding, type_length argument 0
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0, Encoding::PLAIN);
+}
+
+TYPED_TEST(TestFlatScanner, TestDictScanner) {  // same run with RLE_DICTIONARY encoding
+  this->ExecuteAll(
+      num_pages, num_levels_per_page, batch_size, 0, Encoding::RLE_DICTIONARY);
+}
+
+TEST_F(TestBooleanFlatScanner, TestPlainScanner) {  // encoding defaults to Encoding::PLAIN
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, 0);
+}
+
+TEST_F(TestFLBAFlatScanner, TestPlainScanner) {  // FLBA_LENGTH as type length, default PLAIN
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH);
+}
+
+TEST_F(TestFLBAFlatScanner, TestDictScanner) {  // FLBA columns with RLE_DICTIONARY encoding
+  this->ExecuteAll(
+      num_pages, num_levels_per_page, batch_size, FLBA_LENGTH, Encoding::RLE_DICTIONARY);
+}
+
+TEST_F(TestFLBAFlatScanner, TestPlainDictScanner) {  // FLBA columns with PLAIN_DICTIONARY
+  this->ExecuteAll(num_pages, num_levels_per_page, batch_size, FLBA_LENGTH,
+      Encoding::PLAIN_DICTIONARY);
+}
+
+// PARQUET-502: scan one page of 100 levels from a REQUIRED FLBA column with a batch size of 1.
+TEST_F(TestFLBAFlatScanner, TestSmallBatch) {
+  NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::REQUIRED,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+  const ColumnDescriptor d(type, 0, 0);
+  num_values_ = MakePages<FLBAType>(
+      &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+  num_levels_ = 1 * 100;  // one page of 100 levels, matching the MakePages call above
+  InitScanner(&d);
+  CheckResults(1, &d);  // batch size 1
+}
+
+TEST_F(TestFLBAFlatScanner, TestDescriptorAPI) {  // descr() must expose the node's settings
+  NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+  const ColumnDescriptor d(type, 4, 0);
+  num_values_ = MakePages<FLBAType>(
+      &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+  num_levels_ = 1 * 100;
+  InitScanner(&d);
+  TypedScanner<FLBAType>* scanner =
+      reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
+  ASSERT_EQ(10, scanner->descr()->type_precision());  // the 10 passed to Make above
+  ASSERT_EQ(2, scanner->descr()->type_scale());  // the 2 passed to Make above
+  ASSERT_EQ(FLBA_LENGTH, scanner->descr()->type_length());
+}
+
+TEST_F(TestFLBAFlatScanner, TestFLBAPrinterNext) {  // PrintNext output width and exhaustion
+  NodePtr type = schema::PrimitiveNode::Make("c1", Repetition::OPTIONAL,
+      Type::FIXED_LEN_BYTE_ARRAY, LogicalType::DECIMAL, FLBA_LENGTH, 10, 2);
+  const ColumnDescriptor d(type, 4, 0);
+  num_values_ = MakePages<FLBAType>(
+      &d, 1, 100, def_levels_, rep_levels_, values_, data_buffer_, pages_);
+  num_levels_ = 1 * 100;
+  InitScanner(&d);
+  TypedScanner<FLBAType>* scanner =
+      reinterpret_cast<TypedScanner<FLBAType>*>(scanner_.get());
+  scanner->SetBatchSize(batch_size);
+  std::stringstream ss_fail;  // NOTE(review): <sstream> is not among this file's own includes.
+  for (int i = 0; i < num_levels_; i++) {
+    std::stringstream ss;
+    scanner->PrintNext(ss, 17);
+    std::string result = ss.str();
+    ASSERT_LE(17, result.size()) << i;  // each printed field is at least 17 characters long
+  }
+  ASSERT_THROW(scanner->PrintNext(ss_fail, 17), ParquetException);  // nothing left to print
+}
+
+}  // namespace test
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner.cc b/src/parquet/column_scanner.cc
new file mode 100644
index 0000000..a67af71
--- /dev/null
+++ b/src/parquet/column_scanner.cc
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column_scanner.h"
+
+#include <cstdint>
+#include <memory>
+
+#include "parquet/column_reader.h"
+#include "parquet/util/memory.h"
+
+using arrow::MemoryPool;
+
+namespace parquet {
+
+std::shared_ptr<Scanner> Scanner::Make(  // Factory: picks the scanner for col_reader->type().
+    std::shared_ptr<ColumnReader> col_reader, int64_t batch_size, MemoryPool* pool) {
+  switch (col_reader->type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolScanner>(col_reader, batch_size, pool);
+    case Type::INT32:
+      return std::make_shared<Int32Scanner>(col_reader, batch_size, pool);
+    case Type::INT64:
+      return std::make_shared<Int64Scanner>(col_reader, batch_size, pool);
+    case Type::INT96:
+      return std::make_shared<Int96Scanner>(col_reader, batch_size, pool);
+    case Type::FLOAT:
+      return std::make_shared<FloatScanner>(col_reader, batch_size, pool);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleScanner>(col_reader, batch_size, pool);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayScanner>(col_reader, batch_size, pool);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size, pool);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Reached only if NYI() returns (presumably it throws); also suppresses a compiler warning
+  return std::shared_ptr<Scanner>(nullptr);
+}
+
+int64_t ScanAllValues(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+    uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
+  switch (reader->type()) {  // forward unchanged to the ScanAll instantiation for this type
+    case parquet::Type::BOOLEAN:
+      return ScanAll<parquet::BoolReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::INT32:
+      return ScanAll<parquet::Int32Reader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::INT64:
+      return ScanAll<parquet::Int64Reader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::INT96:
+      return ScanAll<parquet::Int96Reader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::FLOAT:
+      return ScanAll<parquet::FloatReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::DOUBLE:
+      return ScanAll<parquet::DoubleReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::BYTE_ARRAY:
+      return ScanAll<parquet::ByteArrayReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    case parquet::Type::FIXED_LEN_BYTE_ARRAY:
+      return ScanAll<parquet::FixedLenByteArrayReader>(
+          batch_size, def_levels, rep_levels, values, values_buffered, reader);
+    default:
+      parquet::ParquetException::NYI("type reader not implemented");
+  }
+  // Reached only if NYI() returns (presumably it throws); also suppresses a compiler warning
+  return 0;
+}
+
+}  // namespace parquet

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/84db929e/src/parquet/column_scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_scanner.h b/src/parquet/column_scanner.h
new file mode 100644
index 0000000..4be0b7f
--- /dev/null
+++ b/src/parquet/column_scanner.h
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_SCANNER_H
+#define PARQUET_COLUMN_SCANNER_H
+
+#include <cstdint>
+#include <memory>
+#include <ostream>
+#include <stdio.h>
+#include <string>
+#include <vector>
+
+#include "parquet/column_reader.h"
+#include "parquet/exception.h"
+#include "parquet/schema.h"
+#include "parquet/types.h"
+#include "parquet/util/memory.h"
+#include "parquet/util/visibility.h"
+
+namespace parquet {
+
+static constexpr int64_t DEFAULT_SCANNER_BATCH_SIZE = 128;
+
+class PARQUET_EXPORT Scanner {  // Type-independent scanner state wrapped around a ColumnReader.
+ public:
+  explicit Scanner(std::shared_ptr<ColumnReader> reader,
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : batch_size_(batch_size),
+        level_offset_(0),
+        levels_buffered_(0),
+        value_buffer_(std::make_shared<PoolBuffer>(pool)),
+        value_offset_(0),
+        values_buffered_(0),
+        reader_(reader) {
+    def_levels_.resize(descr()->max_definition_level() > 0 ? batch_size_ : 0);  // left empty if 0
+    rep_levels_.resize(descr()->max_repetition_level() > 0 ? batch_size_ : 0);
+  }
+
+  virtual ~Scanner() {}
+  // Creates the TypedScanner subclass that matches col_reader->type().
+  static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool());
+  // Writes the next entry to out; width is the field width requested from the formatter.
+  virtual void PrintNext(std::ostream& out, int width) = 0;
+  // True while buffered levels remain unread or the reader reports more data.
+  bool HasNext() { return level_offset_ < levels_buffered_ || reader_->HasNext(); }
+
+  const ColumnDescriptor* descr() const { return reader_->descr(); }
+
+  int64_t batch_size() const { return batch_size_; }
+  // NOTE(review): only the count changes; buffers sized in the constructors are not resized.
+  void SetBatchSize(int64_t batch_size) { batch_size_ = batch_size; }
+
+ protected:
+  int64_t batch_size_;  // number of levels requested from the reader per ReadBatch call
+
+  std::vector<int16_t> def_levels_;
+  std::vector<int16_t> rep_levels_;
+  int level_offset_;  // index of the next unread entry in the level buffers
+  int levels_buffered_;  // number of levels returned by the most recent ReadBatch
+
+  std::shared_ptr<PoolBuffer> value_buffer_;
+  int value_offset_;  // index of the next unread value in the value buffer
+  int64_t values_buffered_;  // value count reported by the most recent ReadBatch
+
+ private:
+  std::shared_ptr<ColumnReader> reader_;
+};
+
+template <typename DType>  // Scanner for one DType, reading via TypedColumnReader<DType>.
+class PARQUET_EXPORT TypedScanner : public Scanner {
+ public:
+  typedef typename DType::c_type T;
+  // Sizes the value buffer for batch_size_ values and caches typed views of reader and buffer.
+  explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
+      int64_t batch_size = DEFAULT_SCANNER_BATCH_SIZE,
+      ::arrow::MemoryPool* pool = ::arrow::default_memory_pool())
+      : Scanner(reader, batch_size, pool) {
+    typed_reader_ = static_cast<TypedColumnReader<DType>*>(reader.get());
+    int value_byte_size = type_traits<DType::type_num>::value_byte_size;
+    PARQUET_THROW_NOT_OK(value_buffer_->Resize(batch_size_ * value_byte_size));
+    values_ = reinterpret_cast<T*>(value_buffer_->mutable_data());
+  }
+
+  virtual ~TypedScanner() {}
+  // Yields the next (def, rep) level pair, refilling via ReadBatch when the buffer is used up.
+  bool NextLevels(int16_t* def_level, int16_t* rep_level) {
+    if (level_offset_ == levels_buffered_) {
+      levels_buffered_ =
+          static_cast<int>(typed_reader_->ReadBatch(static_cast<int>(batch_size_),
+              def_levels_.data(), rep_levels_.data(), values_, &values_buffered_));
+      // A fresh batch restarts both read positions.
+      value_offset_ = 0;
+      level_offset_ = 0;
+      if (!levels_buffered_) { return false; }  // ReadBatch produced no levels
+    }
+    *def_level = descr()->max_definition_level() > 0 ? def_levels_[level_offset_] : 0;
+    *rep_level = descr()->max_repetition_level() > 0 ? rep_levels_[level_offset_] : 0;
+    level_offset_++;
+    return true;
+  }
+  // Fetches the next entry; *val is written only when it is non-null. False when out of data.
+  bool Next(T* val, int16_t* def_level, int16_t* rep_level, bool* is_null) {
+    if (level_offset_ == levels_buffered_) {
+      if (!HasNext()) {
+        // Out of data pages
+        return false;
+      }
+    }
+    // NOTE(review): the result is ignored; on false, *def_level is left as the caller passed it.
+    NextLevels(def_level, rep_level);
+    *is_null = *def_level < descr()->max_definition_level();
+
+    if (*is_null) { return true; }
+
+    if (value_offset_ == values_buffered_) {
+      throw ParquetException("Value was non-null, but has not been buffered");
+    }
+    *val = values_[value_offset_++];
+    return true;
+  }
+
+  // Like Next() but without reporting levels. Returns true if there is a next value
+  bool NextValue(T* val, bool* is_null) {
+    if (level_offset_ == levels_buffered_) {
+      if (!HasNext()) {
+        // Out of data pages
+        return false;
+      }
+    }
+
+    // Levels go into locals; NOTE(review): if NextLevels fails, def_level stays -1 (reads as null).
+    int16_t def_level = -1;
+    int16_t rep_level = -1;
+    NextLevels(&def_level, &rep_level);
+    *is_null = def_level < descr()->max_definition_level();
+
+    if (*is_null) { return true; }
+
+    if (value_offset_ == values_buffered_) {
+      throw ParquetException("Value was non-null, but has not been buffered");
+    }
+    *val = values_[value_offset_++];
+    return true;
+  }
+  // Formats the next entry (or "NULL") into a fixed buffer and streams it; throws when exhausted.
+  virtual void PrintNext(std::ostream& out, int width) {
+    T val;
+    bool is_null = false;
+    char buffer[25];  // snprintf below truncates to 24 characters plus the terminator
+
+    if (!NextValue(&val, &is_null)) { throw ParquetException("No more values buffered"); }
+
+    if (is_null) {
+      std::string null_fmt = format_fwf<ByteArrayType>(width);  // presumably a string format
+      snprintf(buffer, sizeof(buffer), null_fmt.c_str(), "NULL");
+    } else {
+      FormatValue(&val, buffer, sizeof(buffer), width);
+    }
+    out << buffer;
+  }
+
+ private:
+  // The ownership of this object is expressed through the reader_ variable in the base
+  TypedColumnReader<DType>* typed_reader_;
+  // Writes *val into buffer using the fixed-width format for DType; specialized further below.
+  inline void FormatValue(void* val, char* buffer, int bufsize, int width);
+
+  T* values_;  // typed view of value_buffer_'s storage, set in the constructor
+};
+
+template <typename DType>  // Generic case: the value itself is handed to snprintf.
+inline void TypedScanner<DType>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  std::string fmt = format_fwf<DType>(width);  // format string built for this type and width
+  snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));  // val points at a T
+}
+
+template <>  // Int96: converted with Int96ToString first, then printed as a C string.
+inline void TypedScanner<Int96Type>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  std::string fmt = format_fwf<Int96Type>(width);
+  std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
+  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
+}
+
+template <>  // ByteArray: converted with ByteArrayToString first, then printed as a C string.
+inline void TypedScanner<ByteArrayType>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  std::string fmt = format_fwf<ByteArrayType>(width);
+  std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
+  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
+}
+
+template <>  // FLBA: the string conversion also needs the column's type_length().
+inline void TypedScanner<FLBAType>::FormatValue(
+    void* val, char* buffer, int bufsize, int width) {
+  std::string fmt = format_fwf<FLBAType>(width);
+  std::string result = FixedLenByteArrayToString(
+      *reinterpret_cast<FixedLenByteArray*>(val), descr()->type_length());
+  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
+}
+
+typedef TypedScanner<BooleanType> BoolScanner;  // Short aliases, one per data type tag.
+typedef TypedScanner<Int32Type> Int32Scanner;
+typedef TypedScanner<Int64Type> Int64Scanner;
+typedef TypedScanner<Int96Type> Int96Scanner;
+typedef TypedScanner<FloatType> FloatScanner;
+typedef TypedScanner<DoubleType> DoubleScanner;
+typedef TypedScanner<ByteArrayType> ByteArrayScanner;
+typedef TypedScanner<FLBAType> FixedLenByteArrayScanner;
+
+template <typename RType>  // RType: a typed reader class exposing a value type T and ReadBatch.
+int64_t ScanAll(int32_t batch_size, int16_t* def_levels, int16_t* rep_levels,
+    uint8_t* values, int64_t* values_buffered, parquet::ColumnReader* reader) {
+  typedef typename RType::T Type;
+  auto typed_reader = static_cast<RType*>(reader);  // caller must pass a reader of type RType
+  auto vals = reinterpret_cast<Type*>(&values[0]);  // the byte buffer is viewed as an array of T
+  return typed_reader->ReadBatch(  // result of the single ReadBatch call is returned as-is
+      batch_size, def_levels, rep_levels, vals, values_buffered);
+}
+
+int64_t PARQUET_EXPORT ScanAllValues(int32_t batch_size, int16_t* def_levels,  // type dispatch
+    int16_t* rep_levels, uint8_t* values, int64_t* values_buffered,  // around ScanAll; it is
+    parquet::ColumnReader* reader);  // defined in column_scanner.cc
+
+}  // namespace parquet
+
+#endif  // PARQUET_COLUMN_SCANNER_H