You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@parquet.apache.org by no...@apache.org on 2016/01/28 06:13:21 UTC

[2/2] parquet-cpp git commit: PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint

PARQUET-451: Add RowGroupReader helper class and refactor parquet_reader.cc into DebugPrint

This also addresses PARQUET-433 and PARQUET-453.

Author: Wes McKinney <we...@cloudera.com>

Closes #23 from wesm/PARQUET-451 and squashes the following commits:

748ee0c [Wes McKinney] Turn MakeColumnReader into static ColumnReader::Make
6528497 [Wes McKinney] Incorporate code review comments
4b5575d [Wes McKinney] [PARQUET-451/453]: Implement RowGroupReader class and refactor parquet_reader.cc into ParquetFileReader::DebugPrint
2985e2e [Wes McKinney] [PARQUET-433]: Templatize decoders and column readers and remove most switch-on-type statements. Add parquet::SchemaElement* member to Decoder<T>, for FLBA metadata.


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/1f24e765
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/1f24e765
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/1f24e765

Branch: refs/heads/master
Commit: 1f24e7658b9e9d41f95e6ce3a0d7a2fe3ace1abf
Parents: 8fc24f8
Author: Wes McKinney <we...@cloudera.com>
Authored: Wed Jan 27 21:13:15 2016 -0800
Committer: Nong Li <no...@gmail.com>
Committed: Wed Jan 27 21:13:15 2016 -0800

----------------------------------------------------------------------
 CMakeLists.txt                                  |   3 +-
 example/CMakeLists.txt                          |   8 +-
 example/decode_benchmark.cc                     |  19 +-
 example/example_util.cc                         |  84 ------
 example/example_util.h                          |  44 ---
 example/parquet_reader.cc                       | 256 ++---------------
 setup_build_env.sh                              |   3 +-
 src/parquet.cc                                  | 272 ------------------
 src/parquet/CMakeLists.txt                      |   2 +
 src/parquet/column_reader.cc                    | 194 +++++++++++++
 src/parquet/column_reader.h                     | 183 ++++++++++++
 src/parquet/compression/codec.h                 |   6 +-
 src/parquet/encodings/CMakeLists.txt            |   1 -
 src/parquet/encodings/bool-encoding.h           |  48 ----
 src/parquet/encodings/delta-bit-pack-encoding.h |  20 +-
 .../encodings/delta-byte-array-encoding.h       |  20 +-
 .../delta-length-byte-array-encoding.h          |  16 +-
 src/parquet/encodings/dictionary-encoding.h     | 131 ++++-----
 src/parquet/encodings/encodings.h               |  34 +--
 src/parquet/encodings/plain-encoding.h          |  82 +++---
 src/parquet/parquet.h                           | 197 +------------
 src/parquet/reader-test.cc                      |  14 +-
 src/parquet/reader.cc                           | 283 ++++++++++++++++++-
 src/parquet/reader.h                            |  65 ++++-
 src/parquet/types.h                             | 112 ++++++++
 src/parquet/util/CMakeLists.txt                 |   5 +
 src/parquet/util/input_stream.cc                |  63 +++++
 src/parquet/util/input_stream.h                 |  80 ++++++
 28 files changed, 1171 insertions(+), 1074 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/CMakeLists.txt b/CMakeLists.txt
index a2f7e6a..b66e296 100644
--- a/CMakeLists.txt
+++ b/CMakeLists.txt
@@ -213,13 +213,14 @@ set(PARQUET_TEST_LINK_LIBS ${PARQUET_MIN_TEST_LIBS})
 # Library config
 
 set(LIBPARQUET_SRCS
-  src/parquet.cc
+  src/parquet/column_reader.cc
   src/parquet/reader.cc
 )
 
 set(LIBPARQUET_LINK_LIBS
   parquet_compression
   parquet_thrift
+  parquet_util
   thriftstatic
 )
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/example/CMakeLists.txt b/example/CMakeLists.txt
index a020184..05c541a 100644
--- a/example/CMakeLists.txt
+++ b/example/CMakeLists.txt
@@ -12,16 +12,10 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-
-add_library(Example STATIC
-  example_util.cc
-)
-
 SET(LINK_LIBS
   parquet
   snappystatic
-  thriftstatic
-  Example)
+  thriftstatic)
 
 add_executable(decode_benchmark decode_benchmark.cc)
 target_link_libraries(decode_benchmark ${LINK_LIBS})

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/decode_benchmark.cc
----------------------------------------------------------------------
diff --git a/example/decode_benchmark.cc b/example/decode_benchmark.cc
index ed4077a..f33232d 100644
--- a/example/decode_benchmark.cc
+++ b/example/decode_benchmark.cc
@@ -16,7 +16,6 @@
 #include <iostream>
 #include <stdio.h>
 
-#include "example_util.h"
 #include "parquet/compression/codec.h"
 #include "parquet/encodings/encodings.h"
 #include "parquet/util/stopwatch.h"
@@ -198,11 +197,11 @@ class DeltaByteArrayEncoder {
 
 uint64_t TestPlainIntEncoding(const uint8_t* data, int num_values, int batch_size) {
   uint64_t result = 0;
-  PlainDecoder decoder(Type::INT64);
+  PlainDecoder<Type::INT64> decoder(nullptr);
   decoder.SetData(num_values, data, num_values * sizeof(int64_t));
   int64_t values[batch_size];
   for (int i = 0; i < num_values;) {
-    int n = decoder.GetInt64(values, batch_size);
+    int n = decoder.Decode(values, batch_size);
     for (int j = 0; j < n; ++j) {
       result += values[j];
     }
@@ -221,7 +220,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value
   } else {
     mini_block_size = 32;
   }
-  DeltaBitPackDecoder decoder(Type::INT64);
+  DeltaBitPackDecoder<Type::INT64> decoder(nullptr);
   DeltaBitPackEncoder encoder(mini_block_size);
   for (int i = 0; i < values.size(); ++i) {
     encoder.Add(values[i]);
@@ -238,7 +237,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value
     decoder.SetData(encoder.num_values(), buffer, len);
     for (int i = 0; i < encoder.num_values(); ++i) {
       int64_t x = 0;
-      decoder.GetInt64(&x, 1);
+      decoder.Decode(&x, 1);
       if (values[i] != x) {
         cerr << "Bad: " << i << endl;
         cerr << "  " << x << " != " << values[i] << endl;
@@ -258,7 +257,7 @@ uint64_t TestBinaryPackedEncoding(const char* name, const vector<int64_t>& value
     for (int k = 0; k < benchmark_iters; ++k) {
       decoder.SetData(encoder.num_values(), buffer, len);
       for (int i = 0; i < values.size();) {
-        int n = decoder.GetInt64(buf, benchmark_batch_size);
+        int n = decoder.Decode(buf, benchmark_batch_size);
         for (int j = 0; j < n; ++j) {
           result += buf[j];
         }
@@ -349,7 +348,7 @@ void TestBinaryPacking() {
 }
 
 void TestDeltaLengthByteArray() {
-  DeltaLengthByteArrayDecoder decoder;
+  DeltaLengthByteArrayDecoder decoder(nullptr);
   DeltaLengthByteArrayEncoder encoder;
 
   vector<string> values;
@@ -369,7 +368,7 @@ void TestDeltaLengthByteArray() {
   decoder.SetData(encoder.num_values(), buffer, len);
   for (int i = 0; i < encoder.num_values(); ++i) {
     ByteArray v;
-    decoder.GetByteArray(&v, 1);
+    decoder.Decode(&v, 1);
     string r = string((char*)v.ptr, v.len);
     if (r != values[i]) {
       cout << "Bad " << r << " != " << values[i] << endl;
@@ -378,7 +377,7 @@ void TestDeltaLengthByteArray() {
 }
 
 void TestDeltaByteArray() {
-  DeltaByteArrayDecoder decoder;
+  DeltaByteArrayDecoder decoder(nullptr);
   DeltaByteArrayEncoder encoder;
 
   vector<string> values;
@@ -407,7 +406,7 @@ void TestDeltaByteArray() {
   decoder.SetData(encoder.num_values(), buffer, len);
   for (int i = 0; i < encoder.num_values(); ++i) {
     ByteArray v;
-    decoder.GetByteArray(&v, 1);
+    decoder.Decode(&v, 1);
     string r = string((char*)v.ptr, v.len);
     if (r != values[i]) {
       cout << "Bad " << r << " != " << values[i] << endl;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/example_util.cc
----------------------------------------------------------------------
diff --git a/example/example_util.cc b/example/example_util.cc
deleted file mode 100644
index 07d8129..0000000
--- a/example/example_util.cc
+++ /dev/null
@@ -1,84 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "example_util.h"
-#include <iostream>
-#include <stdlib.h>
-#include <stdio.h>
-#include <string.h>
-
-#include "parquet/thrift/util.h"
-
-using namespace parquet;
-using namespace parquet_cpp;
-using namespace std;
-
-// 4 byte constant + 4 byte metadata len
-const uint32_t FOOTER_SIZE = 8;
-const uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
-
-struct ScopedFile {
- public:
-  ScopedFile(FILE* f) : file_(f) { }
-  ~ScopedFile() { fclose(file_); }
-
- private:
-  FILE* file_;
-};
-
-bool GetFileMetadata(const string& path, FileMetaData* metadata) {
-  FILE* file = fopen(path.c_str(), "r");
-  if (!file) {
-    cerr << "Could not open file: " << path << endl;
-    return false;
-  }
-  ScopedFile cleanup(file);
-  fseek(file, 0L, SEEK_END);
-  size_t file_len = ftell(file);
-  if (file_len < FOOTER_SIZE) {
-    cerr << "Invalid parquet file. Corrupt footer." << endl;
-    return false;
-  }
-
-  uint8_t footer_buffer[FOOTER_SIZE];
-  fseek(file, file_len - FOOTER_SIZE, SEEK_SET);
-  size_t bytes_read = fread(footer_buffer, 1, FOOTER_SIZE, file);
-  if (bytes_read != FOOTER_SIZE) {
-    cerr << "Invalid parquet file. Corrupt footer." << endl;
-    return false;
-  }
-  if (memcmp(footer_buffer + 4, PARQUET_MAGIC, 4) != 0) {
-    cerr << "Invalid parquet file. Corrupt footer." << endl;
-    return false;
-  }
-
-  uint32_t metadata_len = *reinterpret_cast<uint32_t*>(footer_buffer);
-  size_t metadata_start = file_len - FOOTER_SIZE - metadata_len;
-  if (metadata_start < 0) {
-    cerr << "Invalid parquet file. File is less than file metadata size." << endl;
-    return false;
-  }
-
-  fseek(file, metadata_start, SEEK_SET);
-  uint8_t metadata_buffer[metadata_len];
-  bytes_read = fread(metadata_buffer, 1, metadata_len, file);
-  if (bytes_read != metadata_len) {
-    cerr << "Invalid parquet file. Could not read metadata bytes." << endl;
-    return false;
-  }
-
-  DeserializeThriftMsg(metadata_buffer, &metadata_len, metadata);
-  return true;
-
-}

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/example_util.h
----------------------------------------------------------------------
diff --git a/example/example_util.h b/example/example_util.h
deleted file mode 100644
index a8b58fc..0000000
--- a/example/example_util.h
+++ /dev/null
@@ -1,44 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef PARQUET_EXAMPLE_UTIL_H
-#define PARQUET_EXAMPLE_UTIL_H
-
-#include <string>
-#include <parquet/parquet.h>
-#include <stdio.h>
-
-bool GetFileMetadata(const std::string& path, parquet::FileMetaData* metadata);
-
-class InputFile {
-private:
-  FILE* file;
-  std::string filename;
-
-public:
-  InputFile(const std::string& _filename): filename(_filename) {
-    file = fopen(_filename.c_str(), "r");
-  }
-  ~InputFile() {
-    if (file != NULL) {
-      fclose(file);
-    }
-  }
-
-  FILE* getFileHandle() { return file; }
-  bool isOpen() { return file != NULL; }
-  std::string getFilename()  { return filename; }
-};
-
-#endif  // PARQUET_EXAMPLE_UTIL_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/example/parquet_reader.cc
----------------------------------------------------------------------
diff --git a/example/parquet_reader.cc b/example/parquet_reader.cc
index 5379c5e..7b476b5 100644
--- a/example/parquet_reader.cc
+++ b/example/parquet_reader.cc
@@ -13,255 +13,49 @@
 // limitations under the License.
 
 #include <parquet/parquet.h>
-#include "example_util.h"
 
 #include <iostream>
 
-// the fixed initial size is just for an example
-#define  COL_WIDTH "17"
-
-using namespace parquet;
-using namespace parquet_cpp;
-using namespace std;
-
-struct AnyType {
-  union {
-    bool bool_val;
-    int32_t int32_val;
-    int64_t int64_val;
-    float float_val;
-    double double_val;
-    ByteArray byte_array_val;
-  };
-};
-
-static string ByteArrayToString(const ByteArray& a) {
-  return string(reinterpret_cast<const char*>(a.ptr), a.len);
-}
-
-int ByteCompare(const ByteArray& x1, const ByteArray& x2) {
-  int len = ::min(x1.len, x2.len);
-  int cmp = memcmp(x1.ptr, x2.ptr, len);
-  if (cmp != 0) return cmp;
-  if (len < x1.len) return 1;
-  if (len < x2.len) return -1;
-  return 0;
-}
-
-string type2String(Type::type t) {
-  switch(t) {
-    case Type::BOOLEAN:
-      return "BOOLEAN";
-      break;
-    case Type::INT32:
-      return "INT32";
-      break;
-    case Type::INT64:
-      return "INT64";
-      break;
-    case Type::FLOAT:
-      return "FLOAT";
-      break;
-    case Type::DOUBLE:
-      return "DOUBLE";
-      break;
-    case Type::BYTE_ARRAY:
-      return "BYTE_ARRAY";
-      break;
-    case Type::INT96:
-      return "INT96";
-      break;
-    case Type::FIXED_LEN_BYTE_ARRAY:
-      return "FIXED_LEN_BYTE_ARRAY";
-      break;
-    default:
-      return "UNKNOWN";
-      break;
-  }
-}
-
-void readParquet(const string& filename, const bool printValues) {
-  InputFile file(filename);
-  if (!file.isOpen()) {
-    cerr << "Could not open file " << file.getFilename() << endl;
-    return;
-  }
-
-  FileMetaData metadata;
-  if (!GetFileMetadata(file.getFilename().c_str(), &metadata)) {
-    cerr << "Could not read metadata from file " << file.getFilename() << endl;
-    return;
-  }
-
-  cout << "File statistics:\n" ;
-  cout << "Total rows: " << metadata.num_rows << "\n";
-  for (int c = 1; c < metadata.schema.size(); ++c) {
-    cout << "Column " << c-1 << ": " << metadata.schema[c].name << " ("
-         << type2String(metadata.schema[c].type);
-    if (metadata.schema[c].type == Type::INT96 ||
-        metadata.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
-      cout << " - not supported";
-    }
-    cout << ")\n";
-  }
-
-  for (int i = 0; i < metadata.row_groups.size(); ++i) {
-    cout << "--- Row Group " << i << " ---\n";
-
-    // Print column metadata
-    const RowGroup& row_group = metadata.row_groups[i];
-    size_t nColumns = row_group.columns.size();
-
-    for (int c = 0; c < nColumns; ++c) {
-      const ColumnMetaData& meta_data = row_group.columns[c].meta_data;
-      cout << "Column " << c
-           << ": " << meta_data.num_values << " rows, "
-           << meta_data.statistics.null_count << " null values, "
-           << meta_data.statistics.distinct_count << " distinct values, "
-           << "min value: " << (meta_data.statistics.min.length()>0 ?
-                                meta_data.statistics.min : "N/A")
-           << ", max value: " << (meta_data.statistics.max.length()>0 ?
-                                  meta_data.statistics.max : "N/A") << ".\n";
-    }
-
-    if (!printValues) {
-      continue;
-    }
-
-    // Create readers for all columns and print contents
-    vector<ColumnReader*> readers(nColumns, NULL);
-    try {
-      for (int c = 0; c < nColumns; ++c) {
-        const ColumnChunk& col = row_group.columns[c];
-        printf("%-" COL_WIDTH"s", metadata.schema[c+1].name.c_str());
-
-        if (col.meta_data.type == Type::INT96 ||
-            col.meta_data.type == Type::FIXED_LEN_BYTE_ARRAY) {
-          continue;
-        }
-
-        size_t col_start = col.meta_data.data_page_offset;
-        if (col.meta_data.__isset.dictionary_page_offset &&
-            col_start > col.meta_data.dictionary_page_offset) {
-          col_start = col.meta_data.dictionary_page_offset;
-        }
-
-        std::unique_ptr<ScopedInMemoryInputStream> input(
-             new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
-         fseek(file.getFileHandle(), col_start, SEEK_SET);
-         size_t num_read = fread(input->data(),
-                                 1,
-                                 input->size(),
-                                 file.getFileHandle());
-         if (num_read != input->size()) {
-           cerr << "Could not read column data." << endl;
-           continue;
-         }
-
-        readers[c] = new ColumnReader(&col.meta_data,
-                                      &metadata.schema[c+1],
-                                      input.release());
-      }
-      cout << "\n";
-
-      vector<int> def_level(nColumns, 0);
-      vector<int> rep_level(nColumns, 0);
-
-      bool hasRow;
-      do {
-        hasRow = false;
-        for (int c = 0; c < nColumns; ++c) {
-          if (readers[c] == NULL) {
-            printf("%-" COL_WIDTH"s", " ");
-            continue;
-          }
-          const ColumnChunk& col = row_group.columns[c];
-          if (readers[c]->HasNext()) {
-            hasRow = true;
-            switch (col.meta_data.type) {
-              case Type::BOOLEAN: {
-                bool val = readers[c]->GetBool(&def_level[c], &rep_level[c]);
-                if (def_level[c] >= rep_level[c]) {
-                  printf("%-" COL_WIDTH"d",val);
-                }
-                break;
-             }
-              case Type::INT32: {
-                int32_t val = readers[c]->GetInt32(&def_level[c], &rep_level[c]);
-                if (def_level[c] >= rep_level[c]) {
-                  printf("%-" COL_WIDTH"d",val);
-                }
-                break;
-              }
-              case Type::INT64: {
-                int64_t val = readers[c]->GetInt64(&def_level[c], &rep_level[c]);
-                if (def_level[c] >= rep_level[c]) {
-                  printf("%-" COL_WIDTH"ld",val);
-                }
-                break;
-              }
-              case Type::FLOAT: {
-                float val = readers[c]->GetFloat(&def_level[c], &rep_level[c]);
-                if (def_level[c] >= rep_level[c]) {
-                  printf("%-" COL_WIDTH"f",val);
-                }
-                break;
-              }
-              case Type::DOUBLE: {
-                double val = readers[c]->GetDouble(&def_level[c], &rep_level[c]);
-                if (def_level[c] >= rep_level[c]) {
-                  printf("%-" COL_WIDTH"lf",val);
-                }
-                break;
-              }
-              case Type::BYTE_ARRAY: {
-                ByteArray val = readers[c]->GetByteArray(&def_level[c], &rep_level[c]);
-                if (def_level[c] >= rep_level[c]) {
-                  string result = ByteArrayToString(val);
-                  printf("%-" COL_WIDTH"s", result.c_str());
-                }
-                break;
-              }
-              default:
-                continue;
-            }
-          }
-        }
-        cout << "\n";
-      } while (hasRow);
-    } catch (exception& e) {
-      cout << "Caught an exception: " << e.what() << "\n";
-    } catch (...) {
-      cout << "Caught an exception.\n";
-    }
-
-    for(vector<ColumnReader*>::iterator it = readers.begin(); it != readers.end(); it++) {
-      delete *it;
-    }
-  }
-}
-
 int main(int argc, char** argv) {
   if (argc > 3) {
-    cerr << "Usage: parquet_reader [--only-stats] <file>" << endl;
+    std::cerr << "Usage: parquet_reader [--only-stats] <file>"
+              << std::endl;
     return -1;
   }
 
-  string filename;
-  bool printContents = true;
+  std::string filename;
+  bool print_values = true;
 
   // Read command-line options
   char *param, *value;
   for (int i = 1; i < argc; i++) {
     if ( (param = std::strstr(argv[i], "--only-stats")) ) {
-      printContents = false;
+      print_values = false;
     } else {
       filename = argv[i];
     }
   }
 
-  readParquet(filename, printContents);
+  parquet_cpp::ParquetFileReader reader;
+  parquet_cpp::LocalFile file;
+
+  file.Open(filename);
+  if (!file.is_open()) {
+    std::cerr << "Could not open file " << file.path()
+              << std::endl;
+    return -1;
+  }
+
+  try {
+    reader.Open(&file);
+    reader.ParseMetaData();
+    reader.DebugPrint(std::cout, print_values);
+  } catch (const std::exception& e) {
+    std::cerr << "Parquet error: "
+              << e.what()
+              << std::endl;
+    return -1;
+  }
 
   return 0;
 }
-

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/setup_build_env.sh
----------------------------------------------------------------------
diff --git a/setup_build_env.sh b/setup_build_env.sh
index 1cd7bb2..c95b889 100755
--- a/setup_build_env.sh
+++ b/setup_build_env.sh
@@ -17,9 +17,8 @@ if [ "$(uname)" != "Darwin" ]; then
   export THRIFT_HOME=$BUILD_DIR/thirdparty/installed
 fi
 
-export GTEST_HOME=$BUILD_DIR/thirdparty/$GTEST_BASEDIR
-
 export PARQUET_TEST_DATA=$SOURCE_DIR/data
+export GTEST_HOME=$BUILD_DIR/thirdparty/$GTEST_BASEDIR
 
 cmake $SOURCE_DIR
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet.cc
----------------------------------------------------------------------
diff --git a/src/parquet.cc b/src/parquet.cc
deleted file mode 100644
index 6b6adaa..0000000
--- a/src/parquet.cc
+++ /dev/null
@@ -1,272 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#include "parquet/parquet.h"
-
-#include <algorithm>
-#include <string>
-#include <string.h>
-
-#include <thrift/protocol/TDebugProtocol.h>
-
-#include "parquet/encodings/encodings.h"
-#include "parquet/compression/codec.h"
-#include "parquet/thrift/util.h"
-
-const int DATA_PAGE_SIZE = 64 * 1024;
-
-namespace parquet_cpp {
-
-using parquet::CompressionCodec;
-using parquet::Encoding;
-using parquet::FieldRepetitionType;
-using parquet::PageType;
-using parquet::SchemaElement;
-using parquet::Type;
-
-InMemoryInputStream::InMemoryInputStream(const uint8_t* buffer, int64_t len) :
-   buffer_(buffer), len_(len), offset_(0) {}
-
-const uint8_t* InMemoryInputStream::Peek(int num_to_peek, int* num_bytes) {
-  *num_bytes = std::min(static_cast<int64_t>(num_to_peek), len_ - offset_);
-  return buffer_ + offset_;
-}
-
-const uint8_t* InMemoryInputStream::Read(int num_to_read, int* num_bytes) {
-  const uint8_t* result = Peek(num_to_read, num_bytes);
-  offset_ += *num_bytes;
-  return result;
-}
-
-ScopedInMemoryInputStream::ScopedInMemoryInputStream(int64_t len) {
-  buffer_.resize(len);
-  stream_.reset(new InMemoryInputStream(buffer_.data(), buffer_.size()));
-}
-
-uint8_t* ScopedInMemoryInputStream::data() {
-  return buffer_.data();
-}
-
-int64_t ScopedInMemoryInputStream::size() {
-  return buffer_.size();
-}
-
-const uint8_t* ScopedInMemoryInputStream::Peek(int num_to_peek,
-                                               int* num_bytes) {
-  return stream_->Peek(num_to_peek, num_bytes);
-}
-
-const uint8_t* ScopedInMemoryInputStream::Read(int num_to_read,
-                                               int* num_bytes) {
-  return stream_->Read(num_to_read, num_bytes);
-}
-
-
-ColumnReader::~ColumnReader() {
-  delete stream_;
-}
-
-ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
-    const SchemaElement* schema, InputStream* stream)
-  : metadata_(metadata),
-    schema_(schema),
-    stream_(stream),
-    current_decoder_(NULL),
-    num_buffered_values_(0),
-    num_decoded_values_(0),
-    buffered_values_offset_(0) {
-  int value_byte_size;
-  switch (metadata->type) {
-    case parquet::Type::BOOLEAN:
-      value_byte_size = 1;
-      break;
-    case parquet::Type::INT32:
-      value_byte_size = sizeof(int32_t);
-      break;
-    case parquet::Type::INT64:
-      value_byte_size = sizeof(int64_t);
-      break;
-    case parquet::Type::FLOAT:
-      value_byte_size = sizeof(float);
-      break;
-    case parquet::Type::DOUBLE:
-      value_byte_size = sizeof(double);
-      break;
-    case parquet::Type::BYTE_ARRAY:
-      value_byte_size = sizeof(ByteArray);
-      break;
-    default:
-      ParquetException::NYI("Unsupported type");
-  }
-
-  switch (metadata->codec) {
-    case CompressionCodec::UNCOMPRESSED:
-      break;
-    case CompressionCodec::SNAPPY:
-      decompressor_.reset(new SnappyCodec());
-      break;
-    default:
-      ParquetException::NYI("Reading compressed data");
-  }
-
-  config_ = Config::DefaultConfig();
-  values_buffer_.resize(config_.batch_size * value_byte_size);
-}
-
-void ColumnReader::BatchDecode() {
-  buffered_values_offset_ = 0;
-  uint8_t* buf = &values_buffer_[0];
-  int batch_size = config_.batch_size;
-  switch (metadata_->type) {
-    case parquet::Type::BOOLEAN:
-      num_decoded_values_ =
-          current_decoder_->GetBool(reinterpret_cast<bool*>(buf), batch_size);
-      break;
-    case parquet::Type::INT32:
-      num_decoded_values_ =
-          current_decoder_->GetInt32(reinterpret_cast<int32_t*>(buf), batch_size);
-      break;
-    case parquet::Type::INT64:
-      num_decoded_values_ =
-          current_decoder_->GetInt64(reinterpret_cast<int64_t*>(buf), batch_size);
-      break;
-    case parquet::Type::FLOAT:
-      num_decoded_values_ =
-          current_decoder_->GetFloat(reinterpret_cast<float*>(buf), batch_size);
-      break;
-    case parquet::Type::DOUBLE:
-      num_decoded_values_ =
-          current_decoder_->GetDouble(reinterpret_cast<double*>(buf), batch_size);
-      break;
-    case parquet::Type::BYTE_ARRAY:
-      num_decoded_values_ =
-          current_decoder_->GetByteArray(reinterpret_cast<ByteArray*>(buf), batch_size);
-      break;
-    default:
-      ParquetException::NYI("Unsupported type.");
-  }
-}
-
-// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
-// encoding.
-static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
-  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
-}
-
-bool ColumnReader::ReadNewPage() {
-  // Loop until we find the next data page.
-
-  while (true) {
-    int bytes_read = 0;
-    const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
-    if (bytes_read == 0) return false;
-    uint32_t header_size = bytes_read;
-    DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
-    stream_->Read(header_size, &bytes_read);
-
-    int compressed_len = current_page_header_.compressed_page_size;
-    int uncompressed_len = current_page_header_.uncompressed_page_size;
-
-    // Read the compressed data page.
-    buffer = stream_->Read(compressed_len, &bytes_read);
-    if (bytes_read != compressed_len) ParquetException::EofException();
-
-    // Uncompress it if we need to
-    if (decompressor_ != NULL) {
-      // Grow the uncompressed buffer if we need to.
-      if (uncompressed_len > decompression_buffer_.size()) {
-        decompression_buffer_.resize(uncompressed_len);
-      }
-      decompressor_->Decompress(
-          compressed_len, buffer, uncompressed_len, &decompression_buffer_[0]);
-      buffer = &decompression_buffer_[0];
-    }
-
-    if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
-      std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it =
-          decoders_.find(Encoding::RLE_DICTIONARY);
-      if (it != decoders_.end()) {
-        throw ParquetException("Column cannot have more than one dictionary.");
-      }
-
-      PlainDecoder dictionary(schema_->type);
-      dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
-          buffer, uncompressed_len);
-      std::shared_ptr<Decoder> decoder(
-          new DictionaryDecoder(schema_->type, &dictionary));
-      decoders_[Encoding::RLE_DICTIONARY] = decoder;
-      current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
-      continue;
-    } else if (current_page_header_.type == PageType::DATA_PAGE) {
-      // Read a data page.
-      num_buffered_values_ = current_page_header_.data_page_header.num_values;
-
-      // Read definition levels.
-      if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
-        int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
-        buffer += sizeof(uint32_t);
-        definition_level_decoder_.reset(
-            new RleDecoder(buffer, num_definition_bytes, 1));
-        buffer += num_definition_bytes;
-        uncompressed_len -= sizeof(uint32_t);
-        uncompressed_len -= num_definition_bytes;
-      }
-
-      // TODO: repetition levels
-
-      // Get a decoder object for this page or create a new decoder if this is the
-      // first page with this encoding.
-      Encoding::type encoding = current_page_header_.data_page_header.encoding;
-      if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
-
-      std::unordered_map<Encoding::type, std::shared_ptr<Decoder> >::iterator it =
-          decoders_.find(encoding);
-      if (it != decoders_.end()) {
-        current_decoder_ = it->second.get();
-      } else {
-        switch (encoding) {
-          case Encoding::PLAIN: {
-            std::shared_ptr<Decoder> decoder;
-            if (schema_->type == Type::BOOLEAN) {
-              decoder.reset(new BoolDecoder());
-            } else {
-              decoder.reset(new PlainDecoder(schema_->type));
-            }
-            decoders_[encoding] = decoder;
-            current_decoder_ = decoder.get();
-            break;
-          }
-          case Encoding::RLE_DICTIONARY:
-            throw ParquetException("Dictionary page must be before data page.");
-
-          case Encoding::DELTA_BINARY_PACKED:
-          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
-          case Encoding::DELTA_BYTE_ARRAY:
-            ParquetException::NYI("Unsupported encoding");
-
-          default:
-            throw ParquetException("Unknown encoding type.");
-        }
-      }
-      current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
-      return true;
-    } else {
-      // We don't know what this page type is. We're allowed to skip non-data pages.
-      continue;
-    }
-  }
-  return true;
-}
-
-} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/CMakeLists.txt b/src/parquet/CMakeLists.txt
index f08901e..1809ea1 100644
--- a/src/parquet/CMakeLists.txt
+++ b/src/parquet/CMakeLists.txt
@@ -18,8 +18,10 @@
 # Headers: top level
 install(FILES
   parquet.h
+  column_reader.h
   reader.h
   exception.h
+  types.h
   DESTINATION include/parquet)
 
 ADD_PARQUET_TEST(reader-test)

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/column_reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.cc b/src/parquet/column_reader.cc
new file mode 100644
index 0000000..b7ececb
--- /dev/null
+++ b/src/parquet/column_reader.cc
@@ -0,0 +1,194 @@
+// Copyright 2012 Cloudera Inc.
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+#include "parquet/column_reader.h"
+
+#include <algorithm>
+#include <string>
+#include <string.h>
+
+#include "parquet/encodings/encodings.h"
+#include "parquet/compression/codec.h"
+#include "parquet/thrift/util.h"
+#include "parquet/util/input_stream.h"
+
+const int DATA_PAGE_SIZE = 64 * 1024;
+
+namespace parquet_cpp {
+
+using parquet::CompressionCodec;
+using parquet::Encoding;
+using parquet::FieldRepetitionType;
+using parquet::PageType;
+using parquet::Type;
+
+
+ColumnReader::~ColumnReader() {
+  delete stream_;
+}
+
+ColumnReader::ColumnReader(const parquet::ColumnMetaData* metadata,
+    const parquet::SchemaElement* schema, InputStream* stream)
+  : metadata_(metadata),
+    schema_(schema),
+    stream_(stream),
+    num_buffered_values_(0),
+    num_decoded_values_(0),
+    buffered_values_offset_(0) {
+
+  switch (metadata->codec) {
+    case CompressionCodec::UNCOMPRESSED:
+      break;
+    case CompressionCodec::SNAPPY:
+      decompressor_.reset(new SnappyCodec());
+      break;
+    default:
+      ParquetException::NYI("Reading compressed data");
+  }
+
+  config_ = Config::DefaultConfig();
+}
+
+
+// PLAIN_DICTIONARY is deprecated but used to be used as a dictionary index
+// encoding.
+static bool IsDictionaryIndexEncoding(const Encoding::type& e) {
+  return e == Encoding::RLE_DICTIONARY || e == Encoding::PLAIN_DICTIONARY;
+}
+
+template <int TYPE>
+bool TypedColumnReader<TYPE>::ReadNewPage() {
+  // Loop until we find the next data page.
+
+
+  while (true) {
+    int bytes_read = 0;
+    const uint8_t* buffer = stream_->Peek(DATA_PAGE_SIZE, &bytes_read);
+    if (bytes_read == 0) return false;
+    uint32_t header_size = bytes_read;
+    DeserializeThriftMsg(buffer, &header_size, &current_page_header_);
+    stream_->Read(header_size, &bytes_read);
+
+    int compressed_len = current_page_header_.compressed_page_size;
+    int uncompressed_len = current_page_header_.uncompressed_page_size;
+
+    // Read the compressed data page.
+    buffer = stream_->Read(compressed_len, &bytes_read);
+    if (bytes_read != compressed_len) ParquetException::EofException();
+
+    // Uncompress it if we need to
+    if (decompressor_ != NULL) {
+      // Grow the uncompressed buffer if we need to.
+      if (uncompressed_len > decompression_buffer_.size()) {
+        decompression_buffer_.resize(uncompressed_len);
+      }
+      decompressor_->Decompress(compressed_len, buffer, uncompressed_len,
+          &decompression_buffer_[0]);
+      buffer = &decompression_buffer_[0];
+    }
+
+    if (current_page_header_.type == PageType::DICTIONARY_PAGE) {
+      auto it = decoders_.find(Encoding::RLE_DICTIONARY);
+      if (it != decoders_.end()) {
+        throw ParquetException("Column cannot have more than one dictionary.");
+      }
+
+      PlainDecoder<TYPE> dictionary(schema_);
+      dictionary.SetData(current_page_header_.dictionary_page_header.num_values,
+          buffer, uncompressed_len);
+      std::shared_ptr<DecoderType> decoder(new DictionaryDecoder<TYPE>(schema_, &dictionary));
+
+      decoders_[Encoding::RLE_DICTIONARY] = decoder;
+      current_decoder_ = decoders_[Encoding::RLE_DICTIONARY].get();
+      continue;
+    } else if (current_page_header_.type == PageType::DATA_PAGE) {
+      // Read a data page.
+      num_buffered_values_ = current_page_header_.data_page_header.num_values;
+
+      // Read definition levels.
+      if (schema_->repetition_type != FieldRepetitionType::REQUIRED) {
+        int num_definition_bytes = *reinterpret_cast<const uint32_t*>(buffer);
+        buffer += sizeof(uint32_t);
+        definition_level_decoder_.reset(
+            new RleDecoder(buffer, num_definition_bytes, 1));
+        buffer += num_definition_bytes;
+        uncompressed_len -= sizeof(uint32_t);
+        uncompressed_len -= num_definition_bytes;
+      }
+
+      // TODO: repetition levels
+
+      // Get a decoder object for this page or create a new decoder if this is the
+      // first page with this encoding.
+      Encoding::type encoding = current_page_header_.data_page_header.encoding;
+      if (IsDictionaryIndexEncoding(encoding)) encoding = Encoding::RLE_DICTIONARY;
+
+      auto it = decoders_.find(encoding);
+      if (it != decoders_.end()) {
+        current_decoder_ = it->second.get();
+      } else {
+        switch (encoding) {
+          case Encoding::PLAIN: {
+            std::shared_ptr<DecoderType> decoder(new PlainDecoder<TYPE>(schema_));
+            decoders_[encoding] = decoder;
+            current_decoder_ = decoder.get();
+            break;
+          }
+          case Encoding::RLE_DICTIONARY:
+            throw ParquetException("Dictionary page must be before data page.");
+
+          case Encoding::DELTA_BINARY_PACKED:
+          case Encoding::DELTA_LENGTH_BYTE_ARRAY:
+          case Encoding::DELTA_BYTE_ARRAY:
+            ParquetException::NYI("Unsupported encoding");
+
+          default:
+            throw ParquetException("Unknown encoding type.");
+        }
+      }
+      current_decoder_->SetData(num_buffered_values_, buffer, uncompressed_len);
+      return true;
+    } else {
+      // We don't know what this page type is. We're allowed to skip non-data pages.
+      continue;
+    }
+  }
+  return true;
+}
+
+std::shared_ptr<ColumnReader> ColumnReader::Make(const parquet::ColumnMetaData* metadata,
+    const parquet::SchemaElement* element, InputStream* stream) {
+  switch (metadata->type) {
+    case Type::BOOLEAN:
+      return std::make_shared<BoolReader>(metadata, element, stream);
+    case Type::INT32:
+      return std::make_shared<Int32Reader>(metadata, element, stream);
+    case Type::INT64:
+      return std::make_shared<Int64Reader>(metadata, element, stream);
+    case Type::INT96:
+      return std::make_shared<Int96Reader>(metadata, element, stream);
+    case Type::FLOAT:
+      return std::make_shared<FloatReader>(metadata, element, stream);
+    case Type::DOUBLE:
+      return std::make_shared<DoubleReader>(metadata, element, stream);
+    case Type::BYTE_ARRAY:
+      return std::make_shared<ByteArrayReader>(metadata, element, stream);
+    default:
+      ParquetException::NYI("type reader not implemented");
+  }
+  // Unreachable code, but suppress compiler warning
+  return std::shared_ptr<ColumnReader>(nullptr);
+}
+
+} // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/column_reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/column_reader.h b/src/parquet/column_reader.h
new file mode 100644
index 0000000..cd6cc02
--- /dev/null
+++ b/src/parquet/column_reader.h
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef PARQUET_COLUMN_READER_H
+#define PARQUET_COLUMN_READER_H
+
+#include <exception>
+#include <cstdint>
+#include <cstring>
+#include <memory>
+#include <string>
+#include <unordered_map>
+#include <vector>
+
+#include "parquet/exception.h"
+#include "parquet/types.h"
+#include "parquet/thrift/parquet_constants.h"
+#include "parquet/thrift/parquet_types.h"
+#include "parquet/util/input_stream.h"
+#include "parquet/encodings/encodings.h"
+#include "parquet/util/rle-encoding.h"
+
+namespace std {
+
+template <>
+struct hash<parquet::Encoding::type> {
+  std::size_t operator()(const parquet::Encoding::type& k) const {
+    return hash<int>()(static_cast<int>(k));
+  }
+};
+
+} // namespace std
+
+namespace parquet_cpp {
+
+class Codec;
+
+class ColumnReader {
+ public:
+
+  struct Config {
+    int batch_size;
+
+    static Config DefaultConfig() {
+      Config config;
+      config.batch_size = 128;
+      return config;
+    }
+  };
+
+  ColumnReader(const parquet::ColumnMetaData*,
+      const parquet::SchemaElement*, InputStream* stream);
+
+  virtual ~ColumnReader();
+
+  static std::shared_ptr<ColumnReader> Make(const parquet::ColumnMetaData*,
+      const parquet::SchemaElement*, InputStream* stream);
+
+  virtual bool ReadNewPage() = 0;
+
+  // Returns true if there are still values in this column.
+  bool HasNext() {
+    if (num_buffered_values_ == 0) {
+      ReadNewPage();
+      if (num_buffered_values_ == 0) return false;
+    }
+    return true;
+  }
+
+  parquet::Type::type type() const {
+    return metadata_->type;
+  }
+
+  const parquet::ColumnMetaData* metadata() const {
+    return metadata_;
+  }
+
+ protected:
+  // Reads the next definition and repetition level. Returns true if the value is NULL.
+  bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);
+
+  Config config_;
+
+  const parquet::ColumnMetaData* metadata_;
+  const parquet::SchemaElement* schema_;
+  InputStream* stream_;
+
+  // Compression codec to use.
+  std::unique_ptr<Codec> decompressor_;
+  std::vector<uint8_t> decompression_buffer_;
+
+  parquet::PageHeader current_page_header_;
+
+  // Not set if field is required.
+  std::unique_ptr<RleDecoder> definition_level_decoder_;
+  // Not set for flat schemas.
+  std::unique_ptr<RleDecoder> repetition_level_decoder_;
+  int num_buffered_values_;
+
+  int num_decoded_values_;
+  int buffered_values_offset_;
+};
+
+
+// API to read values from a single column. This is the main client facing API.
+template <int TYPE>
+class TypedColumnReader : public ColumnReader {
+ public:
+  typedef typename type_traits<TYPE>::value_type T;
+
+  TypedColumnReader(const parquet::ColumnMetaData* metadata,
+      const parquet::SchemaElement* schema, InputStream* stream) :
+      ColumnReader(metadata, schema, stream),
+      current_decoder_(NULL) {
+    size_t value_byte_size = type_traits<TYPE>::value_byte_size;
+    values_buffer_.resize(config_.batch_size * value_byte_size);
+  }
+
+  // Returns the next value of this type.
+  // TODO: batchify this interface.
+  T NextValue(int* def_level, int* rep_level) {
+    if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return T();
+    if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
+    return reinterpret_cast<T*>(&values_buffer_[0])[buffered_values_offset_++];
+  }
+
+ private:
+  void BatchDecode();
+
+  virtual bool ReadNewPage();
+
+  typedef Decoder<TYPE> DecoderType;
+
+  // Map of encoding type to the decoder object created for that encoding.
+  std::unordered_map<parquet::Encoding::type, std::shared_ptr<DecoderType> > decoders_;
+
+  DecoderType* current_decoder_;
+  std::vector<uint8_t> values_buffer_;
+};
+
+typedef TypedColumnReader<parquet::Type::BOOLEAN> BoolReader;
+typedef TypedColumnReader<parquet::Type::INT32> Int32Reader;
+typedef TypedColumnReader<parquet::Type::INT64> Int64Reader;
+typedef TypedColumnReader<parquet::Type::INT96> Int96Reader;
+typedef TypedColumnReader<parquet::Type::FLOAT> FloatReader;
+typedef TypedColumnReader<parquet::Type::DOUBLE> DoubleReader;
+typedef TypedColumnReader<parquet::Type::BYTE_ARRAY> ByteArrayReader;
+
+
+template <int TYPE>
+void TypedColumnReader<TYPE>::BatchDecode() {
+  buffered_values_offset_ = 0;
+  T* buf = reinterpret_cast<T*>(&values_buffer_[0]);
+  int batch_size = config_.batch_size;
+  num_decoded_values_ = current_decoder_->Decode(buf, batch_size);
+}
+
+inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
+  *def_level = *rep_level = 1;  // no decoder (REQUIRED field): value is always defined
+  if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
+    ParquetException::EofException();  // decoder could not supply another level
+  }
+  --num_buffered_values_;  // one buffered value consumed, whether NULL or not
+  return *def_level == 0;  // definition level 0 is reported as NULL
+}
+
+} // namespace parquet_cpp
+
+#endif // PARQUET_COLUMN_READER_H

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/compression/codec.h
----------------------------------------------------------------------
diff --git a/src/parquet/compression/codec.h b/src/parquet/compression/codec.h
index 8166847..07648d7 100644
--- a/src/parquet/compression/codec.h
+++ b/src/parquet/compression/codec.h
@@ -15,11 +15,9 @@
 #ifndef PARQUET_COMPRESSION_CODEC_H
 #define PARQUET_COMPRESSION_CODEC_H
 
-#include "parquet/parquet.h"
-
 #include <cstdint>
-#include "parquet/thrift/parquet_constants.h"
-#include "parquet/thrift/parquet_types.h"
+
+#include "parquet/exception.h"
 
 namespace parquet_cpp {
 

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/CMakeLists.txt
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/CMakeLists.txt b/src/parquet/encodings/CMakeLists.txt
index 72baf48..544b1e1 100644
--- a/src/parquet/encodings/CMakeLists.txt
+++ b/src/parquet/encodings/CMakeLists.txt
@@ -15,7 +15,6 @@
 # Headers: encodings
 install(FILES
   encodings.h
-  bool-encoding.h
   delta-bit-pack-encoding.h
   delta-byte-array-encoding.h
   delta-length-byte-array-encoding.h

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/bool-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/bool-encoding.h b/src/parquet/encodings/bool-encoding.h
deleted file mode 100644
index 8eb55bc..0000000
--- a/src/parquet/encodings/bool-encoding.h
+++ /dev/null
@@ -1,48 +0,0 @@
-// Copyright 2012 Cloudera Inc.
-//
-// Licensed under the Apache License, Version 2.0 (the "License");
-// you may not use this file except in compliance with the License.
-// You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-#ifndef PARQUET_BOOL_ENCODING_H
-#define PARQUET_BOOL_ENCODING_H
-
-#include "parquet/encodings/encodings.h"
-
-#include <algorithm>
-
-namespace parquet_cpp {
-
-class BoolDecoder : public Decoder {
- public:
-  BoolDecoder() : Decoder(parquet::Type::BOOLEAN, parquet::Encoding::PLAIN) { }
-
-  virtual void SetData(int num_values, const uint8_t* data, int len) {
-    num_values_ = num_values;
-    decoder_ = RleDecoder(data, len, 1);
-  }
-
-  virtual int GetBool(bool* buffer, int max_values) {
-    max_values = std::min(max_values, num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      if (!decoder_.Get(&buffer[i])) ParquetException::EofException();
-    }
-    num_values_ -= max_values;
-    return max_values;
-  }
-
- private:
-  RleDecoder decoder_;
-};
-
-} // namespace parquet_cpp
-
-#endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-bit-pack-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-bit-pack-encoding.h b/src/parquet/encodings/delta-bit-pack-encoding.h
index 77a3b26..b437734 100644
--- a/src/parquet/encodings/delta-bit-pack-encoding.h
+++ b/src/parquet/encodings/delta-bit-pack-encoding.h
@@ -22,10 +22,16 @@
 
 namespace parquet_cpp {
 
-class DeltaBitPackDecoder : public Decoder {
+template <int TYPE>
+class DeltaBitPackDecoder : public Decoder<TYPE> {
  public:
-  explicit DeltaBitPackDecoder(const parquet::Type::type& type)
-    : Decoder(type, parquet::Encoding::DELTA_BINARY_PACKED) {
+  typedef typename type_traits<TYPE>::value_type T;
+
+  explicit DeltaBitPackDecoder(const parquet::SchemaElement* schema)
+      : Decoder<TYPE>(schema, parquet::Encoding::DELTA_BINARY_PACKED) {
+
+    parquet::Type::type type = type_traits<TYPE>::parquet_type;
+
     if (type != parquet::Type::INT32 && type != parquet::Type::INT64) {
       throw ParquetException("Delta bit pack encoding should only be for integer data.");
     }
@@ -38,15 +44,13 @@ class DeltaBitPackDecoder : public Decoder {
     values_current_mini_block_ = 0;
   }
 
-  virtual int GetInt32(int32_t* buffer, int max_values) {
-    return GetInternal(buffer, max_values);
-  }
-
-  virtual int GetInt64(int64_t* buffer, int max_values) {
+  virtual int Decode(T* buffer, int max_values) {
     return GetInternal(buffer, max_values);
   }
 
  private:
+  using Decoder<TYPE>::num_values_;
+
   void InitBlock() {
     uint64_t block_size;
     if (!decoder_.GetVlqInt(&block_size)) ParquetException::EofException();

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-byte-array-encoding.h b/src/parquet/encodings/delta-byte-array-encoding.h
index 3396586..a1b5b48 100644
--- a/src/parquet/encodings/delta-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-byte-array-encoding.h
@@ -21,12 +21,12 @@
 
 namespace parquet_cpp {
 
-class DeltaByteArrayDecoder : public Decoder {
+class DeltaByteArrayDecoder : public Decoder<parquet::Type::BYTE_ARRAY> {
  public:
-  DeltaByteArrayDecoder()
-    : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_BYTE_ARRAY),
-      prefix_len_decoder_(parquet::Type::INT32),
-      suffix_decoder_() {
+  explicit DeltaByteArrayDecoder(const parquet::SchemaElement* schema)
+      : Decoder<parquet::Type::BYTE_ARRAY>(schema, parquet::Encoding::DELTA_BYTE_ARRAY),
+      prefix_len_decoder_(nullptr),
+      suffix_decoder_(nullptr) {
   }
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
@@ -43,13 +43,13 @@ class DeltaByteArrayDecoder : public Decoder {
 
   // TODO: this doesn't work and requires memory management. We need to allocate
   // new strings to store the results.
-  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+  virtual int Decode(ByteArray* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
     for (int  i = 0; i < max_values; ++i) {
       int prefix_len = 0;
-      prefix_len_decoder_.GetInt32(&prefix_len, 1);
+      prefix_len_decoder_.Decode(&prefix_len, 1);
       ByteArray suffix;
-      suffix_decoder_.GetByteArray(&suffix, 1);
+      suffix_decoder_.Decode(&suffix, 1);
       buffer[i].len = prefix_len + suffix.len;
 
       uint8_t* result = reinterpret_cast<uint8_t*>(malloc(buffer[i].len));
@@ -64,7 +64,9 @@ class DeltaByteArrayDecoder : public Decoder {
   }
 
  private:
-  DeltaBitPackDecoder prefix_len_decoder_;
+  using Decoder<parquet::Type::BYTE_ARRAY>::num_values_;
+
+  DeltaBitPackDecoder<parquet::Type::INT32> prefix_len_decoder_;
   DeltaLengthByteArrayDecoder suffix_decoder_;
   ByteArray last_value_;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/delta-length-byte-array-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/delta-length-byte-array-encoding.h b/src/parquet/encodings/delta-length-byte-array-encoding.h
index 06bf39d..a6e4c58 100644
--- a/src/parquet/encodings/delta-length-byte-array-encoding.h
+++ b/src/parquet/encodings/delta-length-byte-array-encoding.h
@@ -21,11 +21,12 @@
 
 namespace parquet_cpp {
 
-class DeltaLengthByteArrayDecoder : public Decoder {
+class DeltaLengthByteArrayDecoder : public Decoder<parquet::Type::BYTE_ARRAY> {
  public:
-  DeltaLengthByteArrayDecoder()
-    : Decoder(parquet::Type::BYTE_ARRAY, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY),
-      len_decoder_(parquet::Type::INT32) {
+  explicit DeltaLengthByteArrayDecoder(const parquet::SchemaElement* schema)
+      : Decoder<parquet::Type::BYTE_ARRAY>(
+          schema, parquet::Encoding::DELTA_LENGTH_BYTE_ARRAY),
+      len_decoder_(nullptr) {
   }
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
@@ -38,10 +39,10 @@ class DeltaLengthByteArrayDecoder : public Decoder {
     len_ = len - 4 - total_lengths_len;
   }
 
-  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+  virtual int Decode(ByteArray* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
     int lengths[max_values];
-    len_decoder_.GetInt32(lengths, max_values);
+    len_decoder_.Decode(lengths, max_values);
     for (int  i = 0; i < max_values; ++i) {
       buffer[i].len = lengths[i];
       buffer[i].ptr = data_;
@@ -53,7 +54,8 @@ class DeltaLengthByteArrayDecoder : public Decoder {
   }
 
  private:
-  DeltaBitPackDecoder len_decoder_;
+  using Decoder<parquet::Type::BYTE_ARRAY>::num_values_;
+  DeltaBitPackDecoder<parquet::Type::INT32> len_decoder_;
   const uint8_t* data_;
   int len_;
 };

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/dictionary-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/dictionary-encoding.h b/src/parquet/encodings/dictionary-encoding.h
index 2501b2a..cb8fb30 100644
--- a/src/parquet/encodings/dictionary-encoding.h
+++ b/src/parquet/encodings/dictionary-encoding.h
@@ -22,56 +22,22 @@
 
 namespace parquet_cpp {
 
-class DictionaryDecoder : public Decoder {
+template <int TYPE>
+class DictionaryDecoder : public Decoder<TYPE> {
  public:
+  typedef typename type_traits<TYPE>::value_type T;
+
   // Initializes the dictionary with values from 'dictionary'. The data in dictionary
   // is not guaranteed to persist in memory after this call so the dictionary decoder
   // needs to copy the data out if necessary.
-  DictionaryDecoder(const parquet::Type::type& type, Decoder* dictionary)
-    : Decoder(type, parquet::Encoding::RLE_DICTIONARY) {
-    int num_dictionary_values = dictionary->values_left();
-    switch (type) {
-      case parquet::Type::BOOLEAN:
-        throw ParquetException("Boolean cols should not be dictionary encoded.");
-
-      case parquet::Type::INT32:
-        int32_dictionary_.resize(num_dictionary_values);
-        dictionary->GetInt32(&int32_dictionary_[0], num_dictionary_values);
-        break;
-      case parquet::Type::INT64:
-        int64_dictionary_.resize(num_dictionary_values);
-        dictionary->GetInt64(&int64_dictionary_[0], num_dictionary_values);
-        break;
-      case parquet::Type::FLOAT:
-        float_dictionary_.resize(num_dictionary_values);
-        dictionary->GetFloat(&float_dictionary_[0], num_dictionary_values);
-        break;
-      case parquet::Type::DOUBLE:
-        double_dictionary_.resize(num_dictionary_values);
-        dictionary->GetDouble(&double_dictionary_[0], num_dictionary_values);
-        break;
-      case parquet::Type::BYTE_ARRAY: {
-        byte_array_dictionary_.resize(num_dictionary_values);
-        dictionary->GetByteArray(&byte_array_dictionary_[0], num_dictionary_values);
-        int total_size = 0;
-        for (int i = 0; i < num_dictionary_values; ++i) {
-          total_size += byte_array_dictionary_[i].len;
-        }
-        byte_array_data_.resize(total_size);
-        int offset = 0;
-        for (int i = 0; i < num_dictionary_values; ++i) {
-          memcpy(&byte_array_data_[offset],
-              byte_array_dictionary_[i].ptr, byte_array_dictionary_[i].len);
-          byte_array_dictionary_[i].ptr = &byte_array_data_[offset];
-          offset += byte_array_dictionary_[i].len;
-        }
-        break;
-      }
-      default:
-        ParquetException::NYI("Unsupported dictionary type");
-    }
+  DictionaryDecoder(const parquet::SchemaElement* schema, Decoder<TYPE>* dictionary)
+      : Decoder<TYPE>(schema, parquet::Encoding::RLE_DICTIONARY) {
+    Init(dictionary);
   }
 
+  // Perform type-specific initialization
+  void Init(Decoder<TYPE>* dictionary);
+
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
     if (len == 0) return;
@@ -81,47 +47,17 @@ class DictionaryDecoder : public Decoder {
     idx_decoder_ = RleDecoder(data, len, bit_width);
   }
 
-  virtual int GetInt32(int32_t* buffer, int max_values) {
-    max_values = std::min(max_values, num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      buffer[i] = int32_dictionary_[index()];
-    }
-    return max_values;
-  }
-
-  virtual int GetInt64(int64_t* buffer, int max_values) {
-    max_values = std::min(max_values, num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      buffer[i] = int64_dictionary_[index()];
-    }
-    return max_values;
-  }
-
-  virtual int GetFloat(float* buffer, int max_values) {
+  virtual int Decode(T* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
     for (int i = 0; i < max_values; ++i) {
-      buffer[i] = float_dictionary_[index()];
-    }
-    return max_values;
-  }
-
-  virtual int GetDouble(double* buffer, int max_values) {
-    max_values = std::min(max_values, num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      buffer[i] = double_dictionary_[index()];
-    }
-    return max_values;
-  }
-
-  virtual int GetByteArray(ByteArray* buffer, int max_values) {
-    max_values = std::min(max_values, num_values_);
-    for (int i = 0; i < max_values; ++i) {
-      buffer[i] = byte_array_dictionary_[index()];
+      buffer[i] = dictionary_[index()];
     }
     return max_values;
   }
 
  private:
+  using Decoder<TYPE>::num_values_;
+
   int index() {
     int idx = 0;
     if (!idx_decoder_.Get(&idx)) ParquetException::EofException();
@@ -130,11 +66,7 @@ class DictionaryDecoder : public Decoder {
   }
 
   // Only one is set.
-  std::vector<int32_t> int32_dictionary_;
-  std::vector<int64_t> int64_dictionary_;
-  std::vector<float> float_dictionary_;
-  std::vector<double> double_dictionary_;
-  std::vector<ByteArray> byte_array_dictionary_;
+  std::vector<T> dictionary_;
 
   // Data that contains the byte array data (byte_array_dictionary_ just has the
   // pointers).
@@ -143,6 +75,39 @@ class DictionaryDecoder : public Decoder {
   RleDecoder idx_decoder_;
 };
 
+template <int TYPE>
+inline void DictionaryDecoder<TYPE>::Init(Decoder<TYPE>* dictionary) {
+  int num_dictionary_values = dictionary->values_left();
+  dictionary_.resize(num_dictionary_values);
+  dictionary->Decode(&dictionary_[0], num_dictionary_values);
+}
+
+template <>
+inline void DictionaryDecoder<parquet::Type::BOOLEAN>::Init(
+    Decoder<parquet::Type::BOOLEAN>* dictionary) {
+  ParquetException::NYI("Dictionary encoding is not implemented for boolean values");
+}
+
+template <>
+inline void DictionaryDecoder<parquet::Type::BYTE_ARRAY>::Init(
+    Decoder<parquet::Type::BYTE_ARRAY>* dictionary) {
+  int num_dictionary_values = dictionary->values_left();
+  dictionary_.resize(num_dictionary_values);
+  dictionary->Decode(&dictionary_[0], num_dictionary_values);
+
+  int total_size = 0;
+  for (int i = 0; i < num_dictionary_values; ++i) {
+    total_size += dictionary_[i].len;
+  }
+  byte_array_data_.resize(total_size);
+  int offset = 0;
+  for (int i = 0; i < num_dictionary_values; ++i) {
+    memcpy(&byte_array_data_[offset], dictionary_[i].ptr, dictionary_[i].len);
+    dictionary_[i].ptr = &byte_array_data_[offset];
+    offset += dictionary_[i].len;
+  }
+}
+
 } // namespace parquet_cpp
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/encodings.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/encodings.h b/src/parquet/encodings/encodings.h
index 9211bf8..2017fca 100644
--- a/src/parquet/encodings/encodings.h
+++ b/src/parquet/encodings/encodings.h
@@ -17,6 +17,8 @@
 
 #include <cstdint>
 
+#include "parquet/types.h"
+
 #include "parquet/thrift/parquet_constants.h"
 #include "parquet/thrift/parquet_types.h"
 #include "parquet/util/rle-encoding.h"
@@ -24,8 +26,12 @@
 
 namespace parquet_cpp {
 
+// The Decoder template is parameterized on parquet::Type::type
+template <int TYPE>
 class Decoder {
  public:
+  typedef typename type_traits<TYPE>::value_type T;
+
   virtual ~Decoder() {}
 
   // Sets the data for a new page. This will be called multiple times on the same
@@ -36,22 +42,7 @@ class Decoder {
   // the decoder would decode put to 'max_values', storing the result in 'buffer'.
   // The function returns the number of values decoded, which should be max_values
   // except for end of the current data page.
-  virtual int GetBool(bool* buffer, int max_values) {
-    throw ParquetException("Decoder does not implement this type.");
-  }
-  virtual int GetInt32(int32_t* buffer, int max_values) {
-    throw ParquetException("Decoder does not implement this type.");
-  }
-  virtual int GetInt64(int64_t* buffer, int max_values) {
-    throw ParquetException("Decoder does not implement this type.");
-  }
-  virtual int GetFloat(float* buffer, int max_values) {
-    throw ParquetException("Decoder does not implement this type.");
-  }
-  virtual int GetDouble(double* buffer, int max_values) {
-    throw ParquetException("Decoder does not implement this type.");
-  }
-  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+  virtual int Decode(T* buffer, int max_values) {
     throw ParquetException("Decoder does not implement this type.");
   }
 
@@ -62,19 +53,22 @@ class Decoder {
   const parquet::Encoding::type encoding() const { return encoding_; }
 
  protected:
-  Decoder(const parquet::Type::type& type, const parquet::Encoding::type& encoding)
-    : type_(type), encoding_(encoding), num_values_(0) {}
+  explicit Decoder(const parquet::SchemaElement* schema,
+      const parquet::Encoding::type& encoding)
+      : schema_(schema), encoding_(encoding), num_values_(0) {}
+
+  // For accessing type-specific metadata, like FIXED_LEN_BYTE_ARRAY
+  const parquet::SchemaElement* schema_;
 
-  const parquet::Type::type type_;
   const parquet::Encoding::type encoding_;
   int num_values_;
 };
 
 } // namespace parquet_cpp
 
-#include "parquet/encodings/bool-encoding.h"
 #include "parquet/encodings/plain-encoding.h"
 #include "parquet/encodings/dictionary-encoding.h"
+
 #include "parquet/encodings/delta-bit-pack-encoding.h"
 #include "parquet/encodings/delta-length-byte-array-encoding.h"
 #include "parquet/encodings/delta-byte-array-encoding.h"

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/encodings/plain-encoding.h
----------------------------------------------------------------------
diff --git a/src/parquet/encodings/plain-encoding.h b/src/parquet/encodings/plain-encoding.h
index b094cdb..5fb460e 100644
--- a/src/parquet/encodings/plain-encoding.h
+++ b/src/parquet/encodings/plain-encoding.h
@@ -21,11 +21,15 @@
 
 namespace parquet_cpp {
 
-class PlainDecoder : public Decoder {
+template <int TYPE>
+class PlainDecoder : public Decoder<TYPE> {
  public:
-  explicit PlainDecoder(const parquet::Type::type& type)
-    : Decoder(type, parquet::Encoding::PLAIN), data_(NULL), len_(0) {
-  }
+  typedef typename type_traits<TYPE>::value_type T;
+  using Decoder<TYPE>::num_values_;
+
+  explicit PlainDecoder(const parquet::SchemaElement* schema) :
+      Decoder<TYPE>(schema, parquet::Encoding::PLAIN),
+      data_(NULL), len_(0) {}
 
   virtual void SetData(int num_values, const uint8_t* data, int len) {
     num_values_ = num_values;
@@ -33,49 +37,61 @@ class PlainDecoder : public Decoder {
     len_ = len;
   }
 
-  int GetValues(void* buffer, int max_values, int byte_size) {
-    max_values = std::min(max_values, num_values_);
-    int size = max_values * byte_size;
-    if (len_ < size)  ParquetException::EofException();
-    memcpy(buffer, data_, size);
-    data_ += size;
-    len_ -= size;
-    num_values_ -= max_values;
-    return max_values;
-  }
+  virtual int Decode(T* buffer, int max_values);
+ private:
+  const uint8_t* data_;
+  int len_;
+};
 
-  virtual int GetInt32(int32_t* buffer, int max_values) {
-    return GetValues(buffer, max_values, sizeof(int32_t));
-  }
+template <int TYPE>
+inline int PlainDecoder<TYPE>::Decode(T* buffer, int max_values) {
+  max_values = std::min(max_values, num_values_);
+  int size = max_values * sizeof(T);
+  if (len_ < size)  ParquetException::EofException();
+  memcpy(buffer, data_, size);
+  data_ += size;
+  len_ -= size;
+  num_values_ -= max_values;
+  return max_values;
+}
 
-  virtual int GetInt64(int64_t* buffer, int max_values) {
-    return GetValues(buffer, max_values, sizeof(int64_t));
+// Template specialization for BYTE_ARRAY
+template <>
+inline int PlainDecoder<parquet::Type::BYTE_ARRAY>::Decode(ByteArray* buffer,
+    int max_values) {
+  max_values = std::min(max_values, num_values_);
+  for (int i = 0; i < max_values; ++i) {
+    buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
+    if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException();
+    buffer[i].ptr = data_ + sizeof(uint32_t);
+    data_ += sizeof(uint32_t) + buffer[i].len;
+    len_ -= sizeof(uint32_t) + buffer[i].len;
   }
+  num_values_ -= max_values;
+  return max_values;
+}
 
-  virtual int GetFloat(float* buffer, int max_values) {
-    return GetValues(buffer, max_values, sizeof(float));
-  }
+template <>
+class PlainDecoder<parquet::Type::BOOLEAN> : public Decoder<parquet::Type::BOOLEAN> {
+ public:
+  explicit PlainDecoder(const parquet::SchemaElement* schema) :
+      Decoder<parquet::Type::BOOLEAN>(schema, parquet::Encoding::PLAIN) {}
 
-  virtual int GetDouble(double* buffer, int max_values) {
-    return GetValues(buffer, max_values, sizeof(double));
+  virtual void SetData(int num_values, const uint8_t* data, int len) {
+    num_values_ = num_values;
+    decoder_ = RleDecoder(data, len, 1);
   }
 
-  virtual int GetByteArray(ByteArray* buffer, int max_values) {
+  virtual int Decode(bool* buffer, int max_values) {
     max_values = std::min(max_values, num_values_);
     for (int i = 0; i < max_values; ++i) {
-      buffer[i].len = *reinterpret_cast<const uint32_t*>(data_);
-      if (len_ < sizeof(uint32_t) + buffer[i].len) ParquetException::EofException();
-      buffer[i].ptr = data_ + sizeof(uint32_t);
-      data_ += sizeof(uint32_t) + buffer[i].len;
-      len_ -= sizeof(uint32_t) + buffer[i].len;
+      if (!decoder_.Get(&buffer[i])) ParquetException::EofException();
     }
     num_values_ -= max_values;
     return max_values;
   }
-
  private:
-  const uint8_t* data_;
-  int len_;
+  RleDecoder decoder_;
 };
 
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/parquet.h
----------------------------------------------------------------------
diff --git a/src/parquet/parquet.h b/src/parquet/parquet.h
index 4469a82..0fd3e97 100644
--- a/src/parquet/parquet.h
+++ b/src/parquet/parquet.h
@@ -27,199 +27,8 @@
 #include <vector>
 
 #include "parquet/exception.h"
-#include "parquet/thrift/parquet_constants.h"
-#include "parquet/thrift/parquet_types.h"
-#include "parquet/util/rle-encoding.h"
-
-namespace std {
-
-template <>
-struct hash<parquet::Encoding::type> {
-  std::size_t operator()(const parquet::Encoding::type& k) const {
-    return hash<int>()(static_cast<int>(k));
-  }
-};
-
-} // namespace std
-
-namespace parquet_cpp {
-
-class Codec;
-class Decoder;
-
-struct ByteArray {
-  uint32_t len;
-  const uint8_t* ptr;
-};
-
-// Interface for the column reader to get the bytes. The interface is a stream
-// interface, meaning the bytes in order and once a byte is read, it does not
-// need to be read again.
-class InputStream {
- public:
-  // Returns the next 'num_to_peek' without advancing the current position.
-  // *num_bytes will contain the number of bytes returned which can only be
-  // less than num_to_peek at end of stream cases.
-  // Since the position is not advanced, calls to this function are idempotent.
-  // The buffer returned to the caller is still owned by the input stream and must
-  // stay valid until the next call to Peek() or Read().
-  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes) = 0;
-
-  // Identical to Peek(), except the current position in the stream is advanced by
-  // *num_bytes.
-  virtual const uint8_t* Read(int num_to_read, int* num_bytes) = 0;
-
-  virtual ~InputStream() {}
-
- protected:
-  InputStream() {}
-};
-
-// Implementation of an InputStream when all the bytes are in memory.
-class InMemoryInputStream : public InputStream {
- public:
-  InMemoryInputStream(const uint8_t* buffer, int64_t len);
-  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
-  virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
-  const uint8_t* buffer_;
-  int64_t len_;
-  int64_t offset_;
-};
-
-// A wrapper for InMemoryInputStream to manage the memory.
-class ScopedInMemoryInputStream : public InputStream {
- public:
-  ScopedInMemoryInputStream(int64_t len);
-  uint8_t* data();
-  int64_t size();
-  virtual const uint8_t* Peek(int num_to_peek, int* num_bytes);
-  virtual const uint8_t* Read(int num_to_read, int* num_bytes);
-
- private:
-  std::vector<uint8_t> buffer_;
-  std::unique_ptr<InMemoryInputStream> stream_;
-};
-
-// API to read values from a single column. This is the main client facing API.
-class ColumnReader {
- public:
-  struct Config {
-    int batch_size;
-
-    static Config DefaultConfig() {
-      Config config;
-      config.batch_size = 128;
-      return config;
-    }
-  };
-
-  ColumnReader(const parquet::ColumnMetaData*,
-      const parquet::SchemaElement*, InputStream* stream);
-
-  ~ColumnReader();
-
-  // Returns true if there are still values in this column.
-  bool HasNext();
-
-  // Returns the next value of this type.
-  // TODO: batchify this interface.
-  bool GetBool(int* definition_level, int* repetition_level);
-  int32_t GetInt32(int* definition_level, int* repetition_level);
-  int64_t GetInt64(int* definition_level, int* repetition_level);
-  float GetFloat(int* definition_level, int* repetition_level);
-  double GetDouble(int* definition_level, int* repetition_level);
-  ByteArray GetByteArray(int* definition_level, int* repetition_level);
-
- private:
-  bool ReadNewPage();
-  // Reads the next definition and repetition level. Returns true if the value is NULL.
-  bool ReadDefinitionRepetitionLevels(int* def_level, int* rep_level);
-
-  void BatchDecode();
-
-  Config config_;
-
-  const parquet::ColumnMetaData* metadata_;
-  const parquet::SchemaElement* schema_;
-  InputStream* stream_;
-
-  // Compression codec to use.
-  std::unique_ptr<Codec> decompressor_;
-  std::vector<uint8_t> decompression_buffer_;
-
-  // Map of compression type to decompressor object.
-  std::unordered_map<parquet::Encoding::type, std::shared_ptr<Decoder> > decoders_;
-
-  parquet::PageHeader current_page_header_;
-
-  // Not set if field is required.
-  std::unique_ptr<RleDecoder> definition_level_decoder_;
-  // Not set for flat schemas.
-  std::unique_ptr<RleDecoder> repetition_level_decoder_;
-  Decoder* current_decoder_;
-  int num_buffered_values_;
-
-  std::vector<uint8_t> values_buffer_;
-  int num_decoded_values_;
-  int buffered_values_offset_;
-};
-
-
-inline bool ColumnReader::HasNext() {
-  if (num_buffered_values_ == 0) {
-    ReadNewPage();
-    if (num_buffered_values_ == 0) return false;
-  }
-  return true;
-}
-
-inline bool ColumnReader::GetBool(int* def_level, int* rep_level) {
-  if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return bool();
-  if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-  return reinterpret_cast<bool*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline int32_t ColumnReader::GetInt32(int* def_level, int* rep_level) {
-  if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int32_t();
-  if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-  return reinterpret_cast<int32_t*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline int64_t ColumnReader::GetInt64(int* def_level, int* rep_level) {
-  if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return int64_t();
-  if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-  return reinterpret_cast<int64_t*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline float ColumnReader::GetFloat(int* def_level, int* rep_level) {
-  if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return float();
-  if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-  return reinterpret_cast<float*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline double ColumnReader::GetDouble(int* def_level, int* rep_level) {
-  if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return double();
-  if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-  return reinterpret_cast<double*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline ByteArray ColumnReader::GetByteArray(int* def_level, int* rep_level) {
-  if (ReadDefinitionRepetitionLevels(def_level, rep_level)) return ByteArray();
-  if (buffered_values_offset_ == num_decoded_values_) BatchDecode();
-  return reinterpret_cast<ByteArray*>(&values_buffer_[0])[buffered_values_offset_++];
-}
-
-inline bool ColumnReader::ReadDefinitionRepetitionLevels(int* def_level, int* rep_level) {
-  *rep_level = 1;
-  if (definition_level_decoder_ && !definition_level_decoder_->Get(def_level)) {
-    ParquetException::EofException();
-  }
-  --num_buffered_values_;
-  return *def_level == 0;
-}
-
-} // namespace parquet_cpp
+#include "parquet/reader.h"
+#include "parquet/column_reader.h"
+#include "parquet/util/input_stream.h"
 
 #endif

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader-test.cc b/src/parquet/reader-test.cc
index 0f06f3f..1459afc 100644
--- a/src/parquet/reader-test.cc
+++ b/src/parquet/reader-test.cc
@@ -42,9 +42,7 @@ class TestAllTypesPlain : public ::testing::Test {
     reader_.Open(&file_);
   }
 
-  void TearDown() {
-    reader_.Close();
-  }
+  void TearDown() {}
 
  protected:
   LocalFile file_;
@@ -56,4 +54,14 @@ TEST_F(TestAllTypesPlain, ParseMetaData) {
   reader_.ParseMetaData();
 }
 
+TEST_F(TestAllTypesPlain, DebugPrintWorks) {
+  std::stringstream ss;
+
+  // Automatically parses metadata
+  reader_.DebugPrint(ss);
+
+  std::string result = ss.str();
+  ASSERT_TRUE(result.size() > 0);
+}
+
 } // namespace parquet_cpp

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/1f24e765/src/parquet/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/reader.cc b/src/parquet/reader.cc
index 7ccd98c..7c727ba 100644
--- a/src/parquet/reader.cc
+++ b/src/parquet/reader.cc
@@ -18,18 +18,30 @@
 #include "parquet/reader.h"
 
 #include <cstdio>
+#include <cstring>
+#include <memory>
+#include <sstream>
+#include <string>
 #include <vector>
 
+#include "parquet/column_reader.h"
 #include "parquet/exception.h"
+
 #include "parquet/thrift/util.h"
 
+#include "parquet/util/input_stream.h"
+
+using std::string;
+using std::vector;
+using parquet::Type;
+
 namespace parquet_cpp {
 
 // ----------------------------------------------------------------------
 // LocalFile methods
 
 LocalFile::~LocalFile() {
-  // You must explicitly call Close
+  CloseFile();
 }
 
 void LocalFile::Open(const std::string& path) {
@@ -39,6 +51,11 @@ void LocalFile::Open(const std::string& path) {
 }
 
 void LocalFile::Close() {
+  // Pure virtual
+  CloseFile();
+}
+
+void LocalFile::CloseFile() {
   if (is_open_) {
     fclose(file_);
     is_open_ = false;
@@ -58,9 +75,51 @@ size_t LocalFile::Tell() {
   return ftell(file_);
 }
 
-void LocalFile::Read(size_t nbytes, uint8_t* buffer,
-    size_t* bytes_read) {
-  *bytes_read = fread(buffer, 1, nbytes, file_);
+size_t LocalFile::Read(size_t nbytes, uint8_t* buffer) {
+  return fread(buffer, 1, nbytes, file_);
+}
+
+// ----------------------------------------------------------------------
+// RowGroupReader
+
+ColumnReader* RowGroupReader::Column(size_t i) {
+  // TODO: bounds checking
+  auto it = column_readers_.find(i);
+  if (it !=  column_readers_.end()) {
+    // Already have constructed the ColumnReader
+    return it->second.get();
+  }
+
+  const parquet::ColumnChunk& col = row_group_->columns[i];
+
+  size_t col_start = col.meta_data.data_page_offset;
+  if (col.meta_data.__isset.dictionary_page_offset &&
+      col_start > col.meta_data.dictionary_page_offset) {
+    col_start = col.meta_data.dictionary_page_offset;
+  }
+
+  std::unique_ptr<ScopedInMemoryInputStream> input(
+      new ScopedInMemoryInputStream(col.meta_data.total_compressed_size));
+
+  FileLike* source = this->parent_->buffer_;
+
+  source->Seek(col_start);
+
+  // TODO(wesm): Law of Demeter violation
+  size_t bytes_read = source->Read(input->size(), input->data());
+
+  if (bytes_read != input->size()) {
+    std::cout << "Bytes needed: " << col.meta_data.total_compressed_size << std::endl;
+    std::cout << "Bytes read: " << bytes_read << std::endl;
+    throw ParquetException("Unable to read column chunk data");
+  }
+
+  // TODO(wesm): This presumes a flat schema
+  std::shared_ptr<ColumnReader> reader = ColumnReader::Make(&col.meta_data,
+      &this->parent_->metadata_.schema[i + 1], input.release());
+  column_readers_[i] = reader;
+
+  return reader.get();
 }
 
 // ----------------------------------------------------------------------
@@ -70,6 +129,12 @@ void LocalFile::Read(size_t nbytes, uint8_t* buffer,
 static constexpr uint32_t FOOTER_SIZE = 8;
 static constexpr uint8_t PARQUET_MAGIC[4] = {'P', 'A', 'R', '1'};
 
+ParquetFileReader::ParquetFileReader() :
+    parsed_metadata_(false),
+    buffer_(nullptr) {}
+
+ParquetFileReader::~ParquetFileReader() {}
+
 void ParquetFileReader::Open(FileLike* buffer) {
   buffer_ = buffer;
 }
@@ -78,6 +143,29 @@ void ParquetFileReader::Close() {
   buffer_->Close();
 }
 
+RowGroupReader* ParquetFileReader::RowGroup(size_t i) {
+  if (i >= num_row_groups()) {
+    std::stringstream ss;
+    ss << "The file only has " << num_row_groups()
+       << "row groups, requested reader for: "
+       << i;
+    throw ParquetException(ss.str());
+  }
+
+  auto it = row_group_readers_.find(i);
+  if (it != row_group_readers_.end()) {
+    // Constructed the RowGroupReader already
+    return it->second.get();
+  }
+  if (!parsed_metadata_) {
+    ParseMetaData();
+  }
+
+  // Construct the RowGroupReader
+  row_group_readers_[i] = std::make_shared<RowGroupReader>(this, &metadata_.row_groups[i]);
+  return row_group_readers_[i].get();
+}
+
 void ParquetFileReader::ParseMetaData() {
   size_t filesize = buffer_->Size();
 
@@ -85,11 +173,11 @@ void ParquetFileReader::ParseMetaData() {
     throw ParquetException("Corrupted file, smaller than file footer");
   }
 
-  size_t bytes_read;
   uint8_t footer_buffer[FOOTER_SIZE];
 
   buffer_->Seek(filesize - FOOTER_SIZE);
-  buffer_->Read(FOOTER_SIZE, footer_buffer, &bytes_read);
+
+  size_t bytes_read = buffer_->Read(FOOTER_SIZE, footer_buffer);
 
   if (bytes_read != FOOTER_SIZE) {
     throw ParquetException("Invalid parquet file. Corrupt footer.");
@@ -107,11 +195,192 @@ void ParquetFileReader::ParseMetaData() {
   buffer_->Seek(metadata_start);
 
   std::vector<uint8_t> metadata_buffer(metadata_len);
-  buffer_->Read(metadata_len, &metadata_buffer[0], &bytes_read);
+  bytes_read = buffer_->Read(metadata_len, &metadata_buffer[0]);
   if (bytes_read != metadata_len) {
     throw ParquetException("Invalid parquet file. Could not read metadata bytes.");
   }
   DeserializeThriftMsg(&metadata_buffer[0], &metadata_len, &metadata_);
+  parsed_metadata_ = true;
+}
+
+// ----------------------------------------------------------------------
+// ParquetFileReader::DebugPrint
+
+static string parquet_type_to_string(Type::type t) {
+  switch (t) {
+    case Type::BOOLEAN:
+      return "BOOLEAN";
+      break;
+    case Type::INT32:
+      return "INT32";
+      break;
+    case Type::INT64:
+      return "INT64";
+      break;
+    case Type::FLOAT:
+      return "FLOAT";
+      break;
+    case Type::DOUBLE:
+      return "DOUBLE";
+      break;
+    case Type::BYTE_ARRAY:
+      return "BYTE_ARRAY";
+      break;
+    case Type::INT96:
+      return "INT96";
+      break;
+    case Type::FIXED_LEN_BYTE_ARRAY:
+      return "FIXED_LEN_BYTE_ARRAY";
+      break;
+    default:
+      return "UNKNOWN";
+      break;
+  }
+}
+
+// The fixed column width below is only an example value
+#define COL_WIDTH "17"
+
+void ParquetFileReader::DebugPrint(std::ostream& stream, bool print_values) {
+  if (!parsed_metadata_) {
+    ParseMetaData();
+  }
+
+  stream << "File statistics:\n";
+  stream << "Total rows: " << metadata_.num_rows << "\n";
+  for (int c = 1; c < metadata_.schema.size(); ++c) {
+    stream << "Column " << c-1 << ": " << metadata_.schema[c].name << " ("
+           << parquet_type_to_string(metadata_.schema[c].type);
+    if (metadata_.schema[c].type == Type::INT96 ||
+        metadata_.schema[c].type == Type::FIXED_LEN_BYTE_ARRAY) {
+      stream << " - not supported";
+    }
+    stream << ")\n";
+  }
+
+  for (int i = 0; i < metadata_.row_groups.size(); ++i) {
+    stream << "--- Row Group " << i << " ---\n";
+
+    RowGroupReader* group_reader = RowGroup(i);
+
+    // Print column metadata
+    size_t nColumns = group_reader->num_columns();
+
+    for (int c = 0; c < group_reader->num_columns(); ++c) {
+      const parquet::ColumnMetaData* meta_data = group_reader->Column(c)->metadata();
+      stream << "Column " << c
+             << ": " << meta_data->num_values << " rows, "
+             << meta_data->statistics.null_count << " null values, "
+             << meta_data->statistics.distinct_count << " distinct values, "
+             << "min value: " << (meta_data->statistics.min.length()>0 ?
+                 meta_data->statistics.min : "N/A")
+             << ", max value: " << (meta_data->statistics.max.length()>0 ?
+                 meta_data->statistics.max : "N/A") << ".\n";
+    }
+
+    if (!print_values) {
+      continue;
+    }
+
+    // Create readers for all columns and print contents
+    vector<ColumnReader*> readers(nColumns, NULL);
+    for (int c = 0; c < nColumns; ++c) {
+      ColumnReader* col_reader = group_reader->Column(c);
+
+      Type::type col_type = col_reader->type();
+
+      printf("%-" COL_WIDTH"s", metadata_.schema[c+1].name.c_str());
+
+      if (col_type == Type::INT96 || col_type == Type::FIXED_LEN_BYTE_ARRAY) {
+        continue;
+      }
+
+      // This is OK in this method as long as the RowGroupReader does not get deleted
+      readers[c] = col_reader;
+    }
+    stream << "\n";
+
+    vector<int> def_level(nColumns, 0);
+    vector<int> rep_level(nColumns, 0);
+
+    static constexpr size_t bufsize = 25;
+    char buffer[bufsize];
+
+    bool hasRow;
+    do {
+      hasRow = false;
+      for (int c = 0; c < nColumns; ++c) {
+        if (readers[c] == NULL) {
+          snprintf(buffer, bufsize, "%-" COL_WIDTH"s", " ");
+          stream << buffer;
+          continue;
+        }
+        if (readers[c]->HasNext()) {
+          hasRow = true;
+          switch (readers[c]->type()) {
+            case Type::BOOLEAN: {
+              bool val = reinterpret_cast<BoolReader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val);
+                stream << buffer;
+              }
+              break;
+            }
+            case Type::INT32: {
+              int32_t val = reinterpret_cast<Int32Reader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"d",val);
+                stream << buffer;
+              }
+              break;
+            }
+            case Type::INT64: {
+              int64_t val = reinterpret_cast<Int64Reader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"ld",val);
+                stream << buffer;
+              }
+              break;
+            }
+            case Type::FLOAT: {
+              float val = reinterpret_cast<FloatReader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"f",val);
+                stream << buffer;
+              }
+              break;
+            }
+            case Type::DOUBLE: {
+              double val = reinterpret_cast<DoubleReader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"lf",val);
+                stream << buffer;
+              }
+              break;
+            }
+            case Type::BYTE_ARRAY: {
+              ByteArray val = reinterpret_cast<ByteArrayReader*>(readers[c])->NextValue(
+                  &def_level[c], &rep_level[c]);
+              if (def_level[c] >= rep_level[c]) {
+                string result = ByteArrayToString(val);
+                snprintf(buffer, bufsize, "%-" COL_WIDTH"s", result.c_str());
+                stream << buffer;
+              }
+              break;
+            }
+            default:
+              continue;
+          }
+        }
+      }
+      stream << "\n";
+    } while (hasRow);
+  }
 }
 
 } // namespace parquet_cpp