You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by ju...@apache.org on 2016/01/30 22:15:44 UTC

parquet-cpp git commit: PARQUET-435: Change column reader methods to be array-oriented rather than scalar

Repository: parquet-cpp
Updated Branches:
  refs/heads/master 5e938171b -> dbf1cf1cc


PARQUET-435: Change column reader methods to be array-oriented rather than scalar

Column scanning and record reconstruction is independent of the Parquet file format and depends, among other things, on the data structures where the reconstructed data will end up. This is a work-in progress, but the basic idea is:

- APIs for reading a batch of repetition `ReadRepetitionLevels` or definition levels `ReadDefinitionLevels` into a preallocated `int16_t*`
- APIs for reading arrays of decoded values into preallocated memory (`ReadValues`)

These methods are only able to read data within a particular data page. Once you exhaust the data available in the data page (`ReadValues` returns 0), you must call `ReadNewPage`, which returns `true` is there is more data available.

Separately, I added a simple `Scanner` class that emulates the scalar value iteration functionality that existed previously. I used this to reimplement the `DebugPrint` method in `parquet_scanner.cc`. This obviously only works currently for flat data.

I would like to keep the `ColumnReader` low level and primitive, concerned only with providing access to the raw data in a Parquet file as fast as possible. We can devise separate algorithms for inferring nested record structure by examining the arrays of decoded values and repetition/definition levels. The major benefit of separating raw data access from structure inference is that this can be pipelined with threads: one thread decompresses and decodes values and levels, and another thread can turn batches into a nested record- or column-oriented structure.

Author: Wes McKinney <we...@cloudera.com>

Closes #26 from wesm/PARQUET-435 and squashes the following commits:

4bf5cd4 [Wes McKinney] Fix cpplint
852f4ec [Wes McKinney] Address review comments, also be sure to use Scanner::HasNext
7ea261e [Wes McKinney] Add TODO comment
4999719 [Wes McKinney] Make ColumnReader::ReadNewPage private and call HasNext() in ReadBatch
0d2e111 [Wes McKinney] Fix function description. Change #define to constexpr
111ef13 [Wes McKinney] Incorporate review comments and add some better comments
e16f7fd [Wes McKinney] Typo
ef52404 [Wes McKinney] Fix function doc
5e95cda [Wes McKinney] Configurable scanner batch size. Do not use printf in DebugPrint
1b4eca0 [Wes McKinney] New batch read API which reads levels and values in one shot
de4d6b6 [Wes McKinney] Move column_* files into parquet/column folder
aad4a86 [Wes McKinney] Finish refactoring scanner API with shared pointers
4506748 [Wes McKinney] Refactoring, do not have shared_from_this working yet
6489b15 [Wes McKinney] Batch level/value read interface on ColumnReader. Add Scanner class for flat columns. Add a couple smoke unit tests


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/dbf1cf1c
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/dbf1cf1c
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/dbf1cf1c

Branch: refs/heads/master
Commit: dbf1cf1cc0a856e796121d4309ad6f5a1769a42e
Parents: 5e93817
Author: Wes McKinney <we...@cloudera.com>
Authored: Sat Jan 30 13:15:39 2016 -0800
Committer: Julien Le Dem <ju...@dremio.com>
Committed: Sat Jan 30 13:15:39 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                    |   3 +-
 src/parquet/CMakeLists.txt        |   1 -
 src/parquet/column/CMakeLists.txt |  22 +++
 src/parquet/column/reader.cc      | 235 ++++++++++++++++++++++++++++++
 src/parquet/column/reader.h       | 256 +++++++++++++++++++++++++++++++++
 src/parquet/column/scanner.cc     |  57 ++++++++
 src/parquet/column/scanner.h      | 215 +++++++++++++++++++++++++++
 src/parquet/column_reader.cc      | 194 -------------------------
 src/parquet/column_reader.h       | 182 -----------------------
 src/parquet/parquet.h             |   2 +-
 src/parquet/reader-test.cc        |  59 ++++++++
 src/parquet/reader.cc             | 126 ++++------------
 src/parquet/reader.h              |   6 +-
 src/parquet/types.h               |  21 ++-
 14 files changed, 896 insertions(+), 483 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index b66e296..94e73dd 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -213,7 +213,8 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS})
 # Library config
 
 set(LIBPARQUET_SRCS
-  src/parquet/column_reader.cc
+  src/parquet/column/reader.cc
+  src/parquet/column/scanner.cc
   src/parquet/reader.cc
 )
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index 1809ea1..2d69ba0 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -18,7 +18,6 @@
 # Headers: top level
 install(FILES
   parquet.h
-  column_reader.h
   reader.h
   exception.h
   types.h

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/column/CMakeLists.txt b/src/parquet/column/CMakeLists.txt
new file mode 100644
index 0000000..20f6167
--- /dev/null
+++ b/src/parquet/column/CMakeLists.txt
@@ -0,0 +1,22 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Headers: top level
+install(FILES
+  reader.h
+  scanner.h
+  DESTINATION include/parquet/column)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.cc b/src/parquet/column/reader.cc
new file mode 100644
index 0000000..bc19464
--- /dev/null
+++ b/src/parquet/column/reader.cc
@@ -0,0 +1,235 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column/reader.h"
+
+#include <algorithm>
+#include <string>
+#include <string.h>
+
+#include "parquet/compression/codec.h"
+#include "parquet/encodings/encodings.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input_stream.h"
+
+const int DATA_PAGE_SIZE = 64 * 1024;
+
+namespace parquet_cpp {
+
+using parquet::CompressionCodec;
+using parquet::Encoding;
+using parquet::FieldRepetitionType;
+using parquet::PageType;
+using parquet::Type;
+
+ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
+    const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream)
+  : metadata_(metadata),
+    schema_(schema),
+    stream_(std::move(stream)),
+    num_buffered_values_(0),
+    num_decoded_values_(0) {
+
+  switch (metadata->codec) {
+    case CompressionCodec::UNCOMPRESSED:
+      break;
+    case CompressionCodec::SNAPPY:
+      decompressor_.reset(new SnappyCodec());
+      break;
+    default:
+      ParquetException::NYI("Reading compressed data");
+  }
+
+  config_ = Config::DefaultConfig();
+}
+
+
+// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
+// encoding.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+template <int TYPE>
+bool TypedColumnReader<TYPE>::ReadNewPage() {
+  // Loop until we find the next data page.
+
+  while (true) {
+    int bytes_read = 0;
+    const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
+    if (bytes_read == 0) return false;
+    uint32_t header_size = bytes_read;
+    DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+    stream_->Read(header_size, &bytes_read);
+
+    int compressed_len = current_page_header_.compressed_page_size;
+    int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+    // Read the compressed data page.
+    buffer = stream_->Read(compressed_len, &bytes_read);
+    if (bytes_read != compressed_len) ParquetException::EofException();
+
+    // Uncompress it if we need to
+    if (decompressor_ != NULL) {
+      // Grow the uncompressed buffer if we need to.
+      if (uncompressed_len > decompression_buffer_.size()) {
+        decompression_buffer_.resize(uncompressed_len);
+      }
+      decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+          &decompression_buffer_[0]);
+      buffer = &decompression_buffer_[0];
+    }
+
+    if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
+      auto it = decoders_.find(Encoding::RLE_DICTIONARY);
+      if (it != decoders_.end()) {
+        throw ParquetException("Column cannot have more than one dictionary.");
+      }
+
+      PlainDecoder<TYPE> dictionary(schema_);
+      dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
+          buffer, uncompressed_len);
+      std::shared_ptr<DecoderType> decoder(new DictionaryDecoder<TYPE>(schema_, &dictionary));
+
+      decoders_[Encoding::RLE_DICTIONARY] = decoder;
+      current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
+      continue;
+    } else if (current_page_header_.type == PageType::DATA_PAGE) {
+      // Read a data page.
+      num_buffered_values_ = current_page_header_.data_page_header.num_values;
+
+      // Have not decoded any values from the data page yet
+      num_decoded_values_ = 0;
+
+      // Read definition levels.
+      if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
+        int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
+
+        // Temporary hack until schema resolution
+        max_definition_level_ = 1;
+
+        buffer += sizeof(uint32_t);
+        definition_level_decoder_.reset(
+            new RleDecoder(buffer, num_definition_bytes, 1));
+        buffer += num_definition_bytes;
+        uncompressed_len -= sizeof(uint32_t);
+        uncompressed_len -= num_definition_bytes;
+      } else {
+        // REQUIRED field
+        max_definition_level_ = 0;
+      }
+
+      // TODO: repetition levels
+
+      // Get a decoder object for this page or create a new decoder if this is the
+      // first page with this encoding.
+      Encoding::type encoding = current_page_header_.data_page_header.encoding;
+      if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
+
+      auto it = decoders_.find(encoding);
+      if (it != decoders_.end()) {
+        current_decoder_ = it->second.get();
+      } else {
+        switch (encoding) {
+          case Encoding::PLAIN: {
+            std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_));
+            decoders_[encoding] = decoder;
+            current_decoder_ = decoder.get();
+            break;
+          }
+          case Encoding::RLE_DICTIONARY:
+            throw ParquetException("Dictionary page must be before data page.");
+
+          case Encoding::DELTA_BINARY_PACKED:
+          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+          case Encoding::DELTA_BYTE_ARRAY:
+            ParquetException::NYI("Unsupported encoding");
+
+          default:
+            throw ParquetException("Unknown encoding type.");
+        }
+      }
+      current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
+      return true;
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data pages.
+      continue;
+    }
+  }
+  return true;
+}
+
+// ----------------------------------------------------------------------
+// Batch read APIs
+
+static size_t DecodeMany(RleDecoder* decoder, int16_t* levels, size_t batch_size) {
+  size_t num_decoded = 0;
+
+  // TODO(wesm): Push this decoding down into RleDecoder itself
+  for (size_t i = 0; i < batch_size; ++i) {
+    if (!decoder->Get(levels + i)) {
+      break;
+    }
+    ++num_decoded;
+  }
+  return num_decoded;
+}
+
+size_t ColumnReader::ReadDefinitionLevels(size_t batch_size, int16_t* levels) {
+  if (!definition_level_decoder_) {
+    return 0;
+  }
+  return DecodeMany(definition_level_decoder_.get(), levels, batch_size);
+}
+
+size_t ColumnReader::ReadRepetitionLevels(size_t batch_size, int16_t* levels) {
+  if (!repetition_level_decoder_) {
+    return 0;
+  }
+  return DecodeMany(repetition_level_decoder_.get(), levels, batch_size);
+}
+
+// ----------------------------------------------------------------------
+// Dynamic column reader constructor
+
+std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
+    const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) {
+  switch (metadata->type) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolReader>(metadata, element, std::move(stream));
+    case Type::INT32:
+      return std::make_shared<Int32Reader>(metadata, element, std::move(stream));
+    case Type::INT64:
+      return std::make_shared<Int64Reader>(metadata, element, std::move(stream));
+    case Type::INT96:
+      return std::make_shared<Int96Reader>(metadata, element, std::move(stream));
+    case Type::FLOAT:
+      return std::make_shared<FloatReader>(metadata, element, std::move(stream));
+    case Type::DOUBLE:
+      return std::make_shared<DoubleReader>(metadata, element, std::move(stream));
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayReader>(metadata, element, std::move(stream));
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayReader>(metadata, element, std::move(stream));
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Unreachable code, but supress compiler warning
+  return std::shared_ptr<ColumnReader>(nullptr);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/reader.h b/src/parquet/column/reader.h
new file mode 100644
index 0000000..0976836
--- /dev/null
+++ b/src/parquet/column/reader.h
@@ -0,0 +1,256 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_READER_H
+#define PARQUET_COLUMN_READER_H
+
+#include <algorithm>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+#include "parquet/thrift/parquet_constants.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/input_stream.h"
+#include "parquet/encodings/encodings.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace std {
+
+template <>
+struct hash<parquet::Encoding::type> {
+  std::size_t operator()(const parquet::Encoding::type& k) const {
+    return hash<int>()(static_cast<int>(k));
+  }
+};
+
+} // namespace std
+
+namespace parquet_cpp {
+
+class Codec;
+class Scanner;
+
+class ColumnReader {
+ public:
+
+  struct Config {
+    int batch_size;
+
+    static Config DefaultConfig() {
+      Config config;
+      config.batch_size = 128;
+      return config;
+    }
+  };
+
+  ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*,
+      std::unique_ptr<InputStream> stream);
+
+  static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
+      const parquet::SchemaElement*, std::unique_ptr<InputStream> stream);
+
+  // Returns true if there are still values in this column.
+  bool HasNext() {
+    // Either there is no data page available yet, or the data page has been
+    // exhausted
+    if (num_buffered_values_ == 0 || num_decoded_values_ == num_buffered_values_) {
+      if (!ReadNewPage() || num_buffered_values_ == 0) {
+        return false;
+      }
+    }
+    return true;
+  }
+
+  parquet::Type::type type() const {
+    return metadata_->type;
+  }
+
+  const parquet::ColumnMetaData* metadata() const {
+    return metadata_;
+  }
+
+  const parquet::SchemaElement* schema() const {
+    return schema_;
+  }
+
+ protected:
+  virtual bool ReadNewPage() = 0;
+
+  // Read multiple definition levels into preallocated memory
+  //
+  // Returns the number of decoded definition levels
+  size_t ReadDefinitionLevels(size_t batch_size, int16_t* levels);
+
+  // Read multiple repetition levels into preallocated memory
+  //
+  // Returns the number of decoded repetition levels
+  size_t ReadRepetitionLevels(size_t batch_size, int16_t* levels);
+
+  Config config_;
+
+  const parquet::ColumnMetaData* metadata_;
+  const parquet::SchemaElement* schema_;
+  std::unique_ptr<InputStream> stream_;
+
+  // Compression codec to use.
+  std::unique_ptr<Codec> decompressor_;
+  std::vector<uint8_t> decompression_buffer_;
+
+  parquet::PageHeader current_page_header_;
+
+  // Not set if full schema for this field has no optional or repeated elements
+  std::unique_ptr<RleDecoder> definition_level_decoder_;
+
+  // Not set for flat schemas.
+  std::unique_ptr<RleDecoder> repetition_level_decoder_;
+
+  // Temporarily storing this to assist with batch reading
+  int16_t max_definition_level_;
+
+  // The total number of values stored in the data page. This is the maximum of
+  // the number of encoded definition levels or encoded values. For
+  // non-repeated, required columns, this is equal to the number of encoded
+  // values. For repeated or optional values, there may be fewer data values
+  // than levels, and this tells you how many encoded levels there are in that
+  // case.
+  int num_buffered_values_;
+
+  // The number of values from the current data page that have been decoded
+  // into memory
+  int num_decoded_values_;
+};
+
+// API to read values from a single column. This is the main client facing API.
+template <int TYPE>
+class TypedColumnReader : public ColumnReader {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  TypedColumnReader(const parquet::ColumnMetaData* metadata,
+      const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) :
+      ColumnReader(metadata, schema, std::move(stream)),
+      current_decoder_(NULL) {
+    size_t value_byte_size = type_traits<TYPE>::value_byte_size;
+    values_buffer_.resize(config_.batch_size * value_byte_size);
+  }
+
+  // Read a batch of repetition levels, definition levels, and values from the
+  // column.
+  //
+  // Since null values are not stored in the values, the number of values read
+  // may be less than the number of repetition and definition levels. With
+  // nested data this is almost certainly true.
+  //
+  // To fully exhaust a row group, you must read batches until the number of
+  // values read reaches the number of stored values according to the metadata.
+  //
+  // This API is the same for both V1 and V2 of the DataPage
+  //
+  // @returns: actual number of levels read (see values_read for number of values read)
+  size_t ReadBatch(int batch_size, int16_t* def_levels, int16_t* rep_levels,
+      T* values, size_t* values_read);
+
+ private:
+  typedef Decoder<TYPE> DecoderType;
+
+  // Advance to the next data page
+  virtual bool ReadNewPage();
+
+  // Read up to batch_size values from the current data page into the
+  // pre-allocated memory T*
+  //
+  // @returns: the number of values read into the out buffer
+  size_t ReadValues(size_t batch_size, T* out);
+
+  // Map of compression type to decompressor object.
+  std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
+
+  DecoderType* current_decoder_;
+  std::vector<uint8_t> values_buffer_;
+};
+
+
+template <int TYPE>
+inline size_t TypedColumnReader<TYPE>::ReadValues(size_t batch_size, T* out) {
+  size_t num_decoded = current_decoder_->Decode(out, batch_size);
+  num_decoded_values_ += num_decoded;
+  return num_decoded;
+}
+
+template <int TYPE>
+inline size_t TypedColumnReader<TYPE>::ReadBatch(int batch_size, int16_t* def_levels,
+    int16_t* rep_levels, T* values, size_t* values_read) {
+  // HasNext invokes ReadNewPage
+  if (!HasNext()) {
+    *values_read = 0;
+    return 0;
+  }
+
+  // TODO(wesm): keep reading data pages until batch_size is reached, or the
+  // row group is finished
+  batch_size = std::min(batch_size, num_buffered_values_);
+
+  size_t num_def_levels = 0;
+  size_t num_rep_levels = 0;
+
+  // If the field is required and non-repeated, there are no definition levels
+  if (definition_level_decoder_) {
+    num_def_levels = ReadDefinitionLevels(batch_size, def_levels);
+  }
+
+  // Not present for non-repeated fields
+  if (repetition_level_decoder_) {
+    num_rep_levels = ReadRepetitionLevels(batch_size, rep_levels);
+
+    if (num_def_levels != num_rep_levels) {
+      throw ParquetException("Number of decoded rep / def levels did not match");
+    }
+  }
+
+  // TODO(wesm): this tallying of values-to-decode can be performed with better
+  // cache-efficiency if fused with the level decoding.
+  size_t values_to_read = 0;
+  for (size_t i = 0; i < num_def_levels; ++i) {
+    if (def_levels[i] == max_definition_level_) {
+      ++values_to_read;
+    }
+  }
+
+  *values_read = ReadValues(values_to_read, values);
+
+  return num_def_levels;
+}
+
+
+typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
+typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
+typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
+typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
+typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
+typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
+typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
+typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column/scanner.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.cc b/src/parquet/column/scanner.cc
new file mode 100644
index 0000000..158430b
--- /dev/null
+++ b/src/parquet/column/scanner.cc
@@ -0,0 +1,57 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "parquet/column/scanner.h"
+
+#include <memory>
+
+#include "parquet/column/reader.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/thrift/util.h"
+
+using parquet::Type;
+
+namespace parquet_cpp {
+
+std::shared_ptr<Scanner> Scanner::Make(std::shared_ptr<ColumnReader> col_reader,
+    size_t batch_size) {
+  switch (col_reader->type()) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolScanner>(col_reader, batch_size);
+    case Type::INT32:
+      return std::make_shared<Int32Scanner>(col_reader, batch_size);
+    case Type::INT64:
+      return std::make_shared<Int64Scanner>(col_reader, batch_size);
+    case Type::INT96:
+      return std::make_shared<Int96Scanner>(col_reader, batch_size);
+    case Type::FLOAT:
+      return std::make_shared<FloatScanner>(col_reader, batch_size);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleScanner>(col_reader, batch_size);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayScanner>(col_reader, batch_size);
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return std::make_shared<FixedLenByteArrayScanner>(col_reader, batch_size);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Unreachable code, but supress compiler warning
+  return std::shared_ptr<Scanner>(nullptr);
+
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column/scanner.h
----------------------------------------------------------------------
diff --git a/src/parquet/column/scanner.h b/src/parquet/column/scanner.h
new file mode 100644
index 0000000..64de021
--- /dev/null
+++ b/src/parquet/column/scanner.h
@@ -0,0 +1,215 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_SCANNER_H
+#define PARQUET_COLUMN_SCANNER_H
+
+#include <memory>
+#include <ostream>
+#include <string>
+#include <vector>
+
+#include "parquet/column/reader.h"
+#include "parquet/thrift/parquet_types.h"
+
+namespace parquet_cpp {
+
+static constexpr size_t DEFAULT_SCANNER_BATCH_SIZE = 128;
+
+class Scanner {
+ public:
+  explicit Scanner(std::shared_ptr<ColumnReader> reader,
+      size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
+      batch_size_(batch_size),
+      level_offset_(0),
+      levels_buffered_(0),
+      value_offset_(0),
+      values_buffered_(0),
+      reader_(reader) {
+    // TODO: don't allocate for required fields
+    def_levels_.resize(batch_size_);
+    rep_levels_.resize(batch_size_);
+  }
+
+  virtual ~Scanner() {}
+
+  static std::shared_ptr<Scanner> Make(std::shared_ptr<ColumnReader> col_reader,
+      size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE);
+
+  virtual void PrintNext(std::ostream& out, int width) = 0;
+
+  bool HasNext() {
+    return value_offset_ < values_buffered_ || reader_->HasNext();
+  }
+
+  const parquet::SchemaElement* schema() const {
+    return reader_->schema();
+  }
+
+  size_t batch_size() const { return batch_size_;}
+
+  void SetBatchSize(size_t batch_size) {
+    batch_size_ = batch_size;
+  }
+
+ protected:
+  size_t batch_size_;
+
+  std::vector<int16_t> def_levels_;
+  std::vector<int16_t> rep_levels_;
+  size_t level_offset_;
+  size_t levels_buffered_;
+
+  std::vector<uint8_t> value_buffer_;
+  size_t value_offset_;
+  size_t values_buffered_;
+
+ private:
+  std::shared_ptr<ColumnReader> reader_;
+};
+
+
+template <int TYPE>
+class TypedScanner : public Scanner {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  explicit TypedScanner(std::shared_ptr<ColumnReader> reader,
+      size_t batch_size = DEFAULT_SCANNER_BATCH_SIZE) :
+      Scanner(reader, batch_size) {
+    typed_reader_ = static_cast<TypedColumnReader<TYPE>*>(reader.get());
+    size_t value_byte_size = type_traits<TYPE>::value_byte_size;
+    value_buffer_.resize(batch_size_ * value_byte_size);
+    values_ = reinterpret_cast<T*>(&value_buffer_[0]);
+  }
+
+  virtual ~TypedScanner() {}
+
+  bool NextLevels(int16_t* def_level, int16_t* rep_level) {
+    if (level_offset_ == levels_buffered_) {
+      levels_buffered_ = typed_reader_->ReadBatch(batch_size_, &def_levels_[0], &rep_levels_[0],
+          values_, &values_buffered_);
+
+      // TODO: repetition levels
+
+      level_offset_ = 0;
+      if (!levels_buffered_) {
+        return false;
+      }
+    }
+    *def_level = def_levels_[level_offset_++];
+    *rep_level = 1;
+    return true;
+  }
+
+  // Returns true if there is a next value
+  bool NextValue(T* val, bool* is_null) {
+    if (value_offset_ == values_buffered_) {
+      if (!HasNext()) {
+        // Out of data pages
+        return false;
+      }
+    }
+
+    // Out of values
+    int16_t def_level;
+    int16_t rep_level;
+    NextLevels(&def_level, &rep_level);
+    *is_null = def_level < rep_level;
+
+    if (*is_null) {
+      return true;
+    }
+
+    if (value_offset_ == values_buffered_) {
+      throw ParquetException("Value was non-null, but has not been buffered");
+    }
+    *val = values_[value_offset_++];
+    return true;
+  }
+
+  virtual void PrintNext(std::ostream& out, int width) {
+    T val;
+    bool is_null;
+
+    char buffer[25];
+    NextValue(&val, &is_null);
+
+    if (is_null) {
+      std::string null_fmt = format_fwf<parquet::Type::BYTE_ARRAY>(width);
+      snprintf(buffer, 25, null_fmt.c_str(), "NULL");
+    } else {
+      FormatValue(&val, buffer, 25, width);
+    }
+    out << buffer;
+  }
+
+ private:
+  // The ownership of this object is expressed through the reader_ variable in the base
+  TypedColumnReader<TYPE>* typed_reader_;
+
+  inline void FormatValue(void* val, char* buffer, size_t bufsize, size_t width);
+
+  T* values_;
+};
+
+
+template <int TYPE>
+inline void TypedScanner<TYPE>::FormatValue(void* val, char* buffer,
+    size_t bufsize, size_t width) {
+  std::string fmt = format_fwf<TYPE>(width);
+  snprintf(buffer, bufsize, fmt.c_str(), *reinterpret_cast<T*>(val));
+}
+
+template <>
+inline void TypedScanner<parquet::Type::INT96>::FormatValue(
+    void* val, char* buffer, size_t bufsize, size_t width) {
+  std::string fmt = format_fwf<parquet::Type::INT96>(width);
+  std::string result = Int96ToString(*reinterpret_cast<Int96*>(val));
+  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
+}
+
+template <>
+inline void TypedScanner<parquet::Type::BYTE_ARRAY>::FormatValue(
+    void* val, char* buffer, size_t bufsize, size_t width) {
+  std::string fmt = format_fwf<parquet::Type::BYTE_ARRAY>(width);
+  std::string result = ByteArrayToString(*reinterpret_cast<ByteArray*>(val));
+  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
+}
+
+template <>
+inline void TypedScanner<parquet::Type::FIXED_LEN_BYTE_ARRAY>::FormatValue(
+    void* val, char* buffer, size_t bufsize, size_t width) {
+  std::string fmt = format_fwf<parquet::Type::FIXED_LEN_BYTE_ARRAY>(width);
+  std::string result = FixedLenByteArrayToString(
+      *reinterpret_cast<FixedLenByteArray*>(val),
+      schema()->type_length);
+  snprintf(buffer, bufsize, fmt.c_str(), result.c_str());
+}
+
+typedef TypedScanner<parquet::Type::BOOLEAN> BoolScanner;
+typedef TypedScanner<parquet::Type::INT32> Int32Scanner;
+typedef TypedScanner<parquet::Type::INT64> Int64Scanner;
+typedef TypedScanner<parquet::Type::INT96> Int96Scanner;
+typedef TypedScanner<parquet::Type::FLOAT> FloatScanner;
+typedef TypedScanner<parquet::Type::DOUBLE> DoubleScanner;
+typedef TypedScanner<parquet::Type::BYTE_ARRAY> ByteArrayScanner;
+typedef TypedScanner<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayScanner;
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_SCANNER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
deleted file mode 100644
index 7c88a32..0000000
--- a/src/parquet/column_reader.cc
+++ /dev/null
@@ -1,194 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#include "parquet/column_reader.h"
-
-#include <algorithm>
-#include <string>
-#include <string.h>
-
-#include "parquet/encodings/encodings.h"
-#include "parquet/compression/codec.h"
-#include "parquet/thrift/util.h"
-#include "parquet/util/input_stream.h"
-
-const int DATA_PAGE_SIZE = 64 * 1024;
-
-namespace parquet_cpp {
-
-using parquet::CompressionCodec;
-using parquet::Encoding;
-using parquet::FieldRepetitionType;
-using parquet::PageType;
-using parquet::Type;
-
-ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
-    const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream)
-  : metadata_(metadata),
-    schema_(schema),
-    stream_(std::move(stream)),
-    num_buffered_values_(0),
-    num_decoded_values_(0),
-    buffered_values_offset_(0) {
-
-  switch (metadata->codec) {
-    case CompressionCodec::UNCOMPRESSED:
-      break;
-    case CompressionCodec::SNAPPY:
-      decompressor_.reset(new SnappyCodec());
-      break;
-    default:
-      ParquetException::NYI("Reading compressed data");
-  }
-
-  config_ = Config::DefaultConfig();
-}
-
-
-// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
-// encoding.
-static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
-  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
-}
-
-template <int TYPE>
-bool TypedColumnReader<TYPE>::ReadNewPage() {
-  // Loop until we find the next data page.
-
-
-  while (true) {
-    int bytes_read = 0;
-    const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
-    if (bytes_read == 0) return false;
-    uint32_t header_size = bytes_read;
-    DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
-    stream_->Read(header_size, &bytes_read);
-
-    int compressed_len = current_page_header_.compressed_page_size;
-    int uncompressed_len = current_page_header_.uncompressed_page_size;
-
-    // Read the compressed data page.
-    buffer = stream_->Read(compressed_len, &bytes_read);
-    if (bytes_read != compressed_len) ParquetException::EofException();
-
-    // Uncompress it if we need to
-    if (decompressor_ != NULL) {
-      // Grow the uncompressed buffer if we need to.
-      if (uncompressed_len > decompression_buffer_.size()) {
-        decompression_buffer_.resize(uncompressed_len);
-      }
-      decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
-          &decompression_buffer_[0]);
-      buffer = &decompression_buffer_[0];
-    }
-
-    if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
-      auto it = decoders_.find(Encoding::RLE_DICTIONARY);
-      if (it != decoders_.end()) {
-        throw ParquetException("Column cannot have more than one dictionary.");
-      }
-
-      PlainDecoder<TYPE> dictionary(schema_);
-      dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
-          buffer, uncompressed_len);
-      std::shared_ptr<DecoderType> decoder(new DictionaryDecoder<TYPE>(schema_, &dictionary));
-
-      decoders_[Encoding::RLE_DICTIONARY] = decoder;
-      current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
-      continue;
-    } else if (current_page_header_.type == PageType::DATA_PAGE) {
-      // Read a data page.
-      num_buffered_values_ = current_page_header_.data_page_header.num_values;
-
-      // Read definition levels.
-      if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
-        int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
-        buffer += sizeof(uint32_t);
-        definition_level_decoder_.reset(
-            new RleDecoder(buffer, num_definition_bytes, 1));
-        buffer += num_definition_bytes;
-        uncompressed_len -= sizeof(uint32_t);
-        uncompressed_len -= num_definition_bytes;
-      }
-
-      // TODO: repetition levels
-
-      // Get a decoder object for this page or create a new decoder if this is the
-      // first page with this encoding.
-      Encoding::type encoding = current_page_header_.data_page_header.encoding;
-      if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
-
-      auto it = decoders_.find(encoding);
-      if (it != decoders_.end()) {
-        current_decoder_ = it->second.get();
-      } else {
-        switch (encoding) {
-          case Encoding::PLAIN: {
-            std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_));
-            decoders_[encoding] = decoder;
-            current_decoder_ = decoder.get();
-            break;
-          }
-          case Encoding::RLE_DICTIONARY:
-            throw ParquetException("Dictionary page must be before data page.");
-
-          case Encoding::DELTA_BINARY_PACKED:
-          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
-          case Encoding::DELTA_BYTE_ARRAY:
-            ParquetException::NYI("Unsupported encoding");
-
-          default:
-            throw ParquetException("Unknown encoding type.");
-        }
-      }
-      current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
-      return true;
-    } else {
-      // We don't know what this page type is. We're allowed to skip non-data pages.
-      continue;
-    }
-  }
-  return true;
-}
-
-std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
-    const parquet::SchemaElement* element, std::unique_ptr<InputStream> stream) {
-  switch (metadata->type) {
-    case Type::BOOLEAN:
-      return std::make_shared<BoolReader>(metadata, element, std::move(stream));
-    case Type::INT32:
-      return std::make_shared<Int32Reader>(metadata, element, std::move(stream));
-    case Type::INT64:
-      return std::make_shared<Int64Reader>(metadata, element, std::move(stream));
-    case Type::INT96:
-      return std::make_shared<Int96Reader>(metadata, element, std::move(stream));
-    case Type::FLOAT:
-      return std::make_shared<FloatReader>(metadata, element, std::move(stream));
-    case Type::DOUBLE:
-      return std::make_shared<DoubleReader>(metadata, element, std::move(stream));
-    case Type::BYTE_ARRAY:
-      return std::make_shared<ByteArrayReader>(metadata, element, std::move(stream));
-    case Type::FIXED_LEN_BYTE_ARRAY:
-      return std::make_shared<FixedLenByteArrayReader>(metadata, element, std::move(stream));
-    default:
-      ParquetException::NYI("type reader not implemented");
-  }
-  // Unreachable code, but supress compiler warning
-  return std::shared_ptr<ColumnReader>(nullptr);
-}
-
-} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
deleted file mode 100644
index 3d27549..0000000
--- a/src/parquet/column_reader.h
+++ /dev/null
@@ -1,182 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one
-// or more contributor license agreements.  See the NOTICE file
-// distributed with this work for additional information
-// regarding copyright ownership.  The ASF licenses this file
-// to you under the Apache License, Version 2.0 (the
-// "License"); you may not use this file except in compliance
-// with the License.  You may obtain a copy of the License at
-//
-//   http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing,
-// software distributed under the License is distributed on an
-// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-// KIND, either express or implied.  See the License for the
-// specific language governing permissions and limitations
-// under the License.
-
-#ifndef PARQUET_COLUMN_READER_H
-#define PARQUET_COLUMN_READER_H
-
-#include <exception>
-#include <cstdint>
-#include <cstring>
-#include <memory>
-#include <string>
-#include <unordered_map>
-#include <vector>
-
-#include "parquet/exception.h"
-#include "parquet/types.h"
-#include "parquet/thrift/parquet_constants.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/util/input_stream.h"
-#include "parquet/encodings/encodings.h"
-#include "parquet/util/rle-encoding.h"
-
-namespace std {
-
-template <>
-struct hash<parquet::Encoding::type> {
-  std::size_t operator()(const parquet::Encoding::type& k) const {
-    return hash<int>()(static_cast<int>(k));
-  }
-};
-
-} // namespace std
-
-namespace parquet_cpp {
-
-class Codec;
-
-class ColumnReader {
- public:
-
-  struct Config {
-    int batch_size;
-
-    static Config DefaultConfig() {
-      Config config;
-      config.batch_size = 128;
-      return config;
-    }
-  };
-
-  ColumnReader(const parquet::ColumnMetaData*, const parquet::SchemaElement*,
-      std::unique_ptr<InputStream> stream);
-
-  static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
-      const parquet::SchemaElement*, std::unique_ptr<InputStream> stream);
-
-  virtual bool ReadNewPage() = 0;
-
-  // Returns true if there are still values in this column.
-  bool HasNext() {
-    if (num_buffered_values_ == 0) {
-      ReadNewPage();
-      if (num_buffered_values_ == 0) return false;
-    }
-    return true;
-  }
-
-  parquet::Type::type type() const {
-    return metadata_->type;
-  }
-
-  const parquet::ColumnMetaData* metadata() const {
-    return metadata_;
-  }
-
- protected:
-  // Reads the next definition and repetition level. Returns true if the value is NULL.
-  bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);
-
-  Config config_;
-
-  const parquet::ColumnMetaData* metadata_;
-  const parquet::SchemaElement* schema_;
-  std::unique_ptr<InputStream> stream_;
-
-  // Compression codec to use.
-  std::unique_ptr<Codec> decompressor_;
-  std::vector<uint8_t> decompression_buffer_;
-
-  parquet::PageHeader current_page_header_;
-
-  // Not set if field is required.
-  std::unique_ptr<RleDecoder> definition_level_decoder_;
-  // Not set for flat schemas.
-  std::unique_ptr<RleDecoder> repetition_level_decoder_;
-  int num_buffered_values_;
-
-  int num_decoded_values_;
-  int buffered_values_offset_;
-};
-
-
-// API to read values from a single column. This is the main client facing API.
-template <int TYPE>
-class TypedColumnReader : public ColumnReader {
- public:
-  typedef typename type_traits<TYPE>::value_type T;
-
-  TypedColumnReader(const parquet::ColumnMetaData* metadata,
-      const parquet::SchemaElement* schema, std::unique_ptr<InputStream> stream) :
-      ColumnReader(metadata, schema, std::move(stream)),
-      current_decoder_(NULL) {
-    size_t value_byte_size = type_traits<TYPE>::value_byte_size;
-    values_buffer_.resize(config_.batch_size * value_byte_size);
-  }
-
-  // Returns the next value of this type.
-  // TODO: batchify this interface.
-  T NextValue(int* def_level, int* rep_level) {
-    if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T();
-    if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-    return reinterpret_cast<T*>(&values_buffer_[0])[buffered_values_offset_++];
-  }
-
- private:
-  void BatchDecode();
-
-  virtual bool ReadNewPage();
-
-  typedef Decoder<TYPE> DecoderType;
-
-  // Map of compression type to decompressor object.
-  std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
-
-  DecoderType* current_decoder_;
-  std::vector<uint8_t> values_buffer_;
-};
-
-typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
-typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
-typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
-typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
-typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
-typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
-typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
-typedef TypedColumnReader<parquet::Type::FIXED_LEN_BYTE_ARRAY> FixedLenByteArrayReader;
-
-
-template <int TYPE>
-void TypedColumnReader<TYPE>::BatchDecode() {
-  buffered_values_offset_ = 0;
-  T* buf = reinterpret_cast<T*>(&values_buffer_[0]);
-  int batch_size = config_.batch_size;
-  num_decoded_values_ = current_decoder_->Decode(buf, batch_size);
-}
-
-inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
-  *rep_level = 1;
-  if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
-    ParquetException::EofException();
-  }
-  --num_buffered_values_;
-  return *def_level == 0;
-}
-
-} // namespace parquet_cpp
-
-#endif // PARQUET_COLUMN_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 0fd3e97..84a32f3 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -28,7 +28,7 @@
 
 #include "parquet/exception.h"
 #include "parquet/reader.h"
-#include "parquet/column_reader.h"
+#include "parquet/column/reader.h"
 #include "parquet/util/input_stream.h"
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 1459afc..9952c62 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -23,6 +23,8 @@
 #include <gtest/gtest.h>
 
 #include "parquet/reader.h"
+#include "parquet/column/reader.h"
+#include "parquet/column/scanner.h"
 
 using std::string;
 
@@ -54,6 +56,63 @@ TEST_F(TestAllTypesPlain, ParseMetaData) {
   reader_.ParseMetaData();
 }
 
+TEST_F(TestAllTypesPlain, TestBatchRead) {
+  RowGroupReader* group = reader_.RowGroup(0);
+
+  // column 0, id
+  std::shared_ptr<Int32Reader> col = std::dynamic_pointer_cast<Int32Reader>(group->Column(0));
+
+  int16_t def_levels[4];
+  int16_t rep_levels[4];
+  int32_t values[4];
+
+  // This file only has 8 rows
+
+  ASSERT_TRUE(col->HasNext());
+  size_t values_read;
+  size_t levels_read = col->ReadBatch(4, def_levels, rep_levels, values, &values_read);
+  ASSERT_EQ(4, levels_read);
+  ASSERT_EQ(4, values_read);
+
+  // Now read past the end of the file
+  ASSERT_TRUE(col->HasNext());
+  levels_read = col->ReadBatch(5, def_levels, rep_levels, values, &values_read);
+  ASSERT_EQ(4, levels_read);
+  ASSERT_EQ(4, values_read);
+
+  ASSERT_FALSE(col->HasNext());
+}
+
+TEST_F(TestAllTypesPlain, TestFlatScannerInt32) {
+  RowGroupReader* group = reader_.RowGroup(0);
+
+  // column 0, id
+  std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+
+  int32_t val;
+  bool is_null;
+  for (size_t i = 0; i < 8; ++i) {
+    ASSERT_TRUE(scanner->HasNext());
+    ASSERT_TRUE(scanner->NextValue(&val, &is_null));
+    ASSERT_FALSE(is_null);
+  }
+  ASSERT_FALSE(scanner->HasNext());
+  ASSERT_FALSE(scanner->NextValue(&val, &is_null));
+}
+
+
+TEST_F(TestAllTypesPlain, TestSetScannerBatchSize) {
+  RowGroupReader* group = reader_.RowGroup(0);
+
+  // column 0, id
+  std::shared_ptr<Int32Scanner> scanner(new Int32Scanner(group->Column(0)));
+
+  ASSERT_EQ(128, scanner->batch_size());
+  scanner->SetBatchSize(1024);
+  ASSERT_EQ(1024, scanner->batch_size());
+}
+
+
 TEST_F(TestAllTypesPlain, DebugPrintWorks) {
   std::stringstream ss;
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index d467b75..823549d 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -24,11 +24,10 @@
 #include <string>
 #include <vector>
 
-#include "parquet/column_reader.h"
+#include "parquet/column/reader.h"
+#include "parquet/column/scanner.h"
 #include "parquet/exception.h"
-
 #include "parquet/thrift/util.h"
-
 #include "parquet/util/input_stream.h"
 
 using std::string;
@@ -82,12 +81,12 @@ size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) {
 // ----------------------------------------------------------------------
 // RowGroupReader
 
-ColumnReader* RowGroupReader::Column(size_t i) {
+std::shared_ptr<ColumnReader> RowGroupReader::Column(size_t i) {
   // TODO: boundschecking
   auto it = column_readers_.find(i);
   if (it !=  column_readers_.end()) {
     // Already have constructed the ColumnReader
-    return it->second.get();
+    return it->second;
   }
 
   const parquet::ColumnChunk& col = row_group_->columns[i];
@@ -119,7 +118,7 @@ ColumnReader* RowGroupReader::Column(size_t i) {
       &this->parent_->metadata_.schema[i + 1], std::move(input));
   column_readers_[i] = reader;
 
-  return reader.get();
+  return reader;
 }
 
 // ----------------------------------------------------------------------
@@ -144,6 +143,10 @@ void ParquetFileReader::Close() {
 }
 
 RowGroupReader* ParquetFileReader::RowGroup(size_t i) {
+  if (!parsed_metadata_) {
+    ParseMetaData();
+  }
+
   if (i >= num_row_groups()) {
     std::stringstream ss;
     ss << "The file only has " << num_row_groups()
@@ -241,6 +244,7 @@ static string parquet_type_to_string(Type::type t) {
 // the fixed initial size is just for an example
 #define COL_WIDTH "20"
 
+
 void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
   if (!parsed_metadata_) {
     ParseMetaData();
@@ -278,116 +282,40 @@ void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
       continue;
     }
 
+    static constexpr size_t bufsize = 25;
+    char buffer[bufsize];
+
     // Create readers for all columns and print contents
-    vector<ColumnReader*> readers(nColumns, NULL);
+    vector<std::shared_ptr<Scanner> > scanners(nColumns, NULL);
     for (int c = 0; c < nColumns; ++c) {
-      ColumnReader* col_reader = group_reader->Column(c);
-
+      std::shared_ptr<ColumnReader> col_reader = group_reader->Column(c);
       Type::type col_type = col_reader->type();
 
-      printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str());
+      std::stringstream ss;
+      ss << "%-" << COL_WIDTH << "s";
+      std::string fmt = ss.str();
+
+      snprintf(buffer, bufsize, fmt.c_str(), metadata_.schema[c+1].name.c_str());
+      stream << buffer;
 
-      // This is OK in this method as long as the RowGroupReader does not get deleted
-      readers[c] = col_reader;
+      // This is OK in this method as long as the RowGroupReader does not get
+      // deleted
+      scanners[c] = Scanner::Make(col_reader);
     }
     stream << "\n";
 
-    vector<int> def_level(nColumns, 0);
-    vector<int> rep_level(nColumns, 0);
-
-    static constexpr size_t bufsize = 25;
-    char buffer[bufsize];
-
     bool hasRow;
     do {
       hasRow = false;
       for (int c = 0; c < nColumns; ++c) {
-        if (readers[c] == NULL) {
+        if (scanners[c] == NULL) {
           snprintf(buffer, bufsize, "%-" COL_WIDTH"s", " ");
           stream << buffer;
           continue;
         }
-        if (readers[c]->HasNext()) {
+        if (scanners[c]->HasNext()) {
           hasRow = true;
-          switch (readers[c]->type()) {
-            case Type::BOOLEAN: {
-              bool val = reinterpret_cast<BoolReader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val);
-                stream << buffer;
-              }
-              break;
-            }
-            case Type::INT32: {
-              int32_t val = reinterpret_cast<Int32Reader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val);
-                stream << buffer;
-              }
-              break;
-            }
-            case Type::INT64: {
-              int64_t val = reinterpret_cast<Int64Reader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"ld",val);
-                stream << buffer;
-              }
-              break;
-            }
-            case Type::INT96: {
-              Int96 val = reinterpret_cast<Int96Reader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                string result = Int96ToString(val);
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
-                stream << buffer;
-              }
-              break;
-            }
-            case Type::FLOAT: {
-              float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"f",val);
-                stream << buffer;
-              }
-              break;
-            }
-            case Type::DOUBLE: {
-              double val = reinterpret_cast<DoubleReader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"lf",val);
-                stream << buffer;
-              }
-              break;
-            }
-            case Type::BYTE_ARRAY: {
-              ByteArray val = reinterpret_cast<ByteArrayReader*>(readers[c])->NextValue(
-                  &def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                string result = ByteArrayToString(val);
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
-                stream << buffer;
-              }
-              break;
-            }
-             case Type::FIXED_LEN_BYTE_ARRAY: {
-              FixedLenByteArray val = reinterpret_cast<FixedLenByteArrayReader*>(
-                  readers[c])->NextValue(&def_level[c], &rep_level[c]);
-              if (def_level[c] >= rep_level[c]) {
-                string result = FixedLenByteArrayToString(val, metadata_.schema[c+1].type_length);
-                snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
-                stream << buffer;
-              }
-              break;
-            }
-           default:
-              continue;
-          }
+          scanners[c]->PrintNext(stream, 17);
         }
       }
       stream << "\n";

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/reader.h b/src/parquet/reader.h
index e8a6806..4c92119 100644
--- a/src/parquet/reader.h
+++ b/src/parquet/reader.h
@@ -79,9 +79,9 @@ class RowGroupReader {
       parent_(parent),
       row_group_(group) {}
 
-  // Construct a ColumnReader for the indicated row group-relative column. The
-  // returned object is owned by the RowGroupReader
-  ColumnReader* Column(size_t i);
+  // Construct a ColumnReader for the indicated row group-relative
+  // column. Ownership is shared with the RowGroupReader.
+  std::shared_ptr<ColumnReader> Column(size_t i);
 
   size_t num_columns() const {
     return row_group_->columns.size();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/dbf1cf1c/src/parquet/types.h
----------------------------------------------------------------------
diff --git a/src/parquet/types.h b/src/parquet/types.h
index 409e335..f39e3a2 100644
--- a/src/parquet/types.h
+++ b/src/parquet/types.h
@@ -21,8 +21,8 @@
 #include <algorithm>
 #include <cstdint>
 #include <cstring>
-#include <string>
 #include <sstream>
+#include <string>
 
 #include "parquet/thrift/parquet_types.h"
 #include "parquet/util/compiler-util.h"
@@ -38,6 +38,8 @@ struct FixedLenByteArray {
   const uint8_t* ptr;
 };
 
+typedef FixedLenByteArray FLBA;
+
 MANUALLY_ALIGNED_STRUCT(1) Int96 {
   uint32_t value[3];
 };
@@ -81,8 +83,9 @@ template <>
 struct type_traits<parquet::Type::BOOLEAN> {
   typedef bool value_type;
   static constexpr parquet::Type::type parquet_type = parquet::Type::BOOLEAN;
-
   static constexpr size_t value_byte_size = 1;
+
+  static constexpr const char* printf_code = "d";
 };
 
 template <>
@@ -91,6 +94,7 @@ struct type_traits<parquet::Type::INT32> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::INT32;
 
   static constexpr size_t value_byte_size = 4;
+  static constexpr const char* printf_code = "d";
 };
 
 template <>
@@ -99,6 +103,7 @@ struct type_traits<parquet::Type::INT64> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::INT64;
 
   static constexpr size_t value_byte_size = 8;
+  static constexpr const char* printf_code = "ld";
 };
 
 template <>
@@ -107,6 +112,7 @@ struct type_traits<parquet::Type::INT96> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::INT96;
 
   static constexpr size_t value_byte_size = 12;
+  static constexpr const char* printf_code = "s";
 };
 
 template <>
@@ -115,6 +121,7 @@ struct type_traits<parquet::Type::FLOAT> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::FLOAT;
 
   static constexpr size_t value_byte_size = 4;
+  static constexpr const char* printf_code = "f";
 };
 
 template <>
@@ -123,6 +130,7 @@ struct type_traits<parquet::Type::DOUBLE> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::DOUBLE;
 
   static constexpr size_t value_byte_size = 8;
+  static constexpr const char* printf_code = "lf";
 };
 
 template <>
@@ -131,6 +139,7 @@ struct type_traits<parquet::Type::BYTE_ARRAY> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::BYTE_ARRAY;
 
   static constexpr size_t value_byte_size = sizeof(ByteArray);
+  static constexpr const char* printf_code = "s";
 };
 
 template <>
@@ -139,8 +148,16 @@ struct type_traits<parquet::Type::FIXED_LEN_BYTE_ARRAY> {
   static constexpr parquet::Type::type parquet_type = parquet::Type::FIXED_LEN_BYTE_ARRAY;
 
   static constexpr size_t value_byte_size = sizeof(FixedLenByteArray);
+  static constexpr const char* printf_code = "s";
 };
 
+template <int TYPE>
+inline std::string format_fwf(int width) {
+  std::stringstream ss;
+  ss << "%-" << width << type_traits<TYPE>::printf_code;
+  return ss.str();
+}
+
 } // namespace parquet_cpp
 
 #endif // PARQUET_TYPES_H