Posted to commits@parquet.apache.org by uw...@apache.org on 2017/04/06 07:26:37 UTC

parquet-cpp git commit: PARQUET-946: Add ReadRowGroup and num_row_groups methods to arrow::FileReader

Repository: parquet-cpp
Updated Branches:
  refs/heads/master f8573ebed -> d0646659c


PARQUET-946: Add ReadRowGroup and num_row_groups methods to arrow::FileReader

There is still a lot of room for improvement and further refactoring here -- the assumption that an entire column in a file is being read runs very deep in the Arrow reader, so I tried to do the minimum amount of work needed to decouple the row group iteration. There is some code duplication in ReadRowGroup, but we can probably defer further cleanup to a future patch.
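
For reference, the new API can be exercised roughly as in the following sketch (illustrative only, not part of the patch). It mirrors the ReadSingleRowGroup test added below, assumes FileReader, OpenFile and BufferReader are brought into scope the same way the tests do, and the helper name ReadByRowGroup is made up for the example:

    // Usage sketch (illustrative only): read a Parquet file back one row group
    // at a time with the methods added in this patch. Assumes FileReader,
    // OpenFile and BufferReader are in scope as in the tests below.
    ::arrow::Status ReadByRowGroup(const std::shared_ptr<::arrow::Buffer>& buffer,
        std::vector<std::shared_ptr<::arrow::Table>>* out) {
      std::unique_ptr<FileReader> reader;
      ::arrow::Status s =
          OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
              ::parquet::default_reader_properties(), nullptr, &reader);
      if (!s.ok()) { return s; }

      // num_row_groups() reports the row group count from the file metadata;
      // ReadRowGroup(i, &table) materializes row group i as an arrow::Table.
      for (int i = 0; i < reader->num_row_groups(); ++i) {
        std::shared_ptr<::arrow::Table> table;
        s = reader->ReadRowGroup(i, &table);
        if (!s.ok()) { return s; }
        out->push_back(table);
      }
      return ::arrow::Status::OK();
    }

A column subset can also be read per row group via the ReadRowGroup(i, column_indices, &table) overload declared in reader.h.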

Author: Wes McKinney <we...@twosigma.com>

Closes #291 from wesm/PARQUET-946 and squashes the following commits:

6d2b48a [Wes McKinney] Add virtual dtor
c7589f7 [Wes McKinney] Add ReadRowGroup and num_row_groups methods to arrow::FileReader


Project: http://git-wip-us.apache.org/repos/asf/parquet-cpp/repo
Commit: http://git-wip-us.apache.org/repos/asf/parquet-cpp/commit/d0646659
Tree: http://git-wip-us.apache.org/repos/asf/parquet-cpp/tree/d0646659
Diff: http://git-wip-us.apache.org/repos/asf/parquet-cpp/diff/d0646659

Branch: refs/heads/master
Commit: d0646659c64585dccd9a8f75a9509c1ae8cfa1fb
Parents: f8573eb
Author: Wes McKinney <we...@twosigma.com>
Authored: Thu Apr 6 09:26:31 2017 +0200
Committer: Uwe L. Korn <uw...@apache.org>
Committed: Thu Apr 6 09:26:31 2017 +0200

----------------------------------------------------------------------
 cmake_modules/FindClangTools.cmake            |  19 +-
 src/parquet/arrow/arrow-reader-writer-test.cc | 146 ++++++----
 src/parquet/arrow/reader.cc                   | 305 +++++++++++++++------
 src/parquet/arrow/reader.h                    |   7 +
 src/parquet/arrow/test-util.h                 |   8 +-
 src/parquet/column/writer.cc                  |   2 +-
 6 files changed, 343 insertions(+), 144 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/cmake_modules/FindClangTools.cmake
----------------------------------------------------------------------
diff --git a/cmake_modules/FindClangTools.cmake b/cmake_modules/FindClangTools.cmake
index c07c7d2..e4ee984 100644
--- a/cmake_modules/FindClangTools.cmake
+++ b/cmake_modules/FindClangTools.cmake
@@ -27,16 +27,16 @@
 # This module defines
 #  CLANG_TIDY_BIN, The  path to the clang tidy binary
 #  CLANG_TIDY_FOUND, Whether clang tidy was found
-#  CLANG_FORMAT_BIN, The path to the clang format binary 
+#  CLANG_FORMAT_BIN, The path to the clang format binary
 #  CLANG_TIDY_FOUND, Whether clang format was found
 
-find_program(CLANG_TIDY_BIN 
-  NAMES clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6  clang-tidy
-  PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin 
+find_program(CLANG_TIDY_BIN
+  NAMES clang-tidy-3.9 clang-tidy-3.8 clang-tidy-3.7 clang-tidy-3.6  clang-tidy
+  PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
         NO_DEFAULT_PATH
 )
 
-if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" ) 
+if ( "${CLANG_TIDY_BIN}" STREQUAL "CLANG_TIDY_BIN-NOTFOUND" )
   set(CLANG_TIDY_FOUND 0)
   message("clang-tidy not found")
 else()
@@ -44,17 +44,16 @@ else()
   message("clang-tidy found at ${CLANG_TIDY_BIN}")
 endif()
 
-find_program(CLANG_FORMAT_BIN 
-  NAMES clang-format-3.8 clang-format-3.7 clang-format-3.6  clang-format
-  PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin 
+find_program(CLANG_FORMAT_BIN
+  NAMES clang-format-3.9 clang-format-3.8 clang-format-3.7 clang-format-3.6  clang-format
+  PATHS ${ClangTools_PATH} $ENV{CLANG_TOOLS_PATH} /usr/local/bin /usr/bin
         NO_DEFAULT_PATH
 )
 
-if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" ) 
+if ( "${CLANG_FORMAT_BIN}" STREQUAL "CLANG_FORMAT_BIN-NOTFOUND" )
   set(CLANG_FORMAT_FOUND 0)
   message("clang-format not found")
 else()
   set(CLANG_FORMAT_FOUND 1)
   message("clang-format found at ${CLANG_FORMAT_BIN}")
 endif()
-

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/arrow-reader-writer-test.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/arrow-reader-writer-test.cc b/src/parquet/arrow/arrow-reader-writer-test.cc
index 3b232f9..dd46893 100644
--- a/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -197,6 +197,36 @@ using ParquetDataType = DataType<test_traits<T>::parquet_enum>;
 template <typename T>
 using ParquetWriter = TypedColumnWriter<ParquetDataType<T>>;
 
+void WriteTableToBuffer(const std::shared_ptr<Table>& table, int num_threads,
+    int64_t row_group_size, std::shared_ptr<Buffer>* out) {
+  auto sink = std::make_shared<InMemoryOutputStream>();
+
+  ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), sink,
+      row_group_size, default_writer_properties()));
+  *out = sink->GetBuffer();
+}
+
+void DoSimpleRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
+    int64_t row_group_size, const std::vector<int>& column_subset,
+    std::shared_ptr<Table>* out) {
+  std::shared_ptr<Buffer> buffer;
+  WriteTableToBuffer(table, num_threads, row_group_size, &buffer);
+
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(
+      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+          ::parquet::default_reader_properties(), nullptr, &reader));
+
+  reader->set_num_threads(num_threads);
+
+  if (column_subset.size() > 0) {
+    ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
+  } else {
+    // Read everything
+    ASSERT_OK_NO_THROW(reader->ReadTable(out));
+  }
+}
+
 template <typename TestType>
 class TestParquetIO : public ::testing::Test {
  public:
@@ -248,19 +278,6 @@ class TestParquetIO : public ::testing::Test {
     ASSERT_NE(nullptr, out->get());
   }
 
-  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
-    std::shared_ptr<::arrow::Table> out;
-    std::unique_ptr<FileReader> reader;
-    ReaderFromSink(&reader);
-    ReadTableFromFile(std::move(reader), &out);
-    ASSERT_EQ(1, out->num_columns());
-    ASSERT_EQ(values->length(), out->num_rows());
-
-    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
-    ASSERT_EQ(1, chunked_array->num_chunks());
-    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
-  }
-
   void PrepareListTable(int64_t size, bool nullable_lists, bool nullable_elements,
       int64_t null_count, std::shared_ptr<Table>* out) {
     std::shared_ptr<Array> values;
@@ -289,13 +306,23 @@ class TestParquetIO : public ::testing::Test {
     *out = MakeSimpleTable(parent_lists, nullable_parent_lists);
   }
 
-  void WriteReadAndCheckSingleColumnTable(const std::shared_ptr<Table>& table) {
-    std::shared_ptr<Array> values = table->column(0)->data()->chunk(0);
-    this->sink_ = std::make_shared<InMemoryOutputStream>();
-    ASSERT_OK_NO_THROW(WriteTable(*table, ::arrow::default_memory_pool(), this->sink_,
-        values->length(), default_writer_properties()));
+  void ReadAndCheckSingleColumnTable(const std::shared_ptr<::arrow::Array>& values) {
+    std::shared_ptr<::arrow::Table> out;
+    std::unique_ptr<FileReader> reader;
+    ReaderFromSink(&reader);
+    ReadTableFromFile(std::move(reader), &out);
+    ASSERT_EQ(1, out->num_columns());
+    ASSERT_EQ(values->length(), out->num_rows());
+
+    std::shared_ptr<ChunkedArray> chunked_array = out->column(0)->data();
+    ASSERT_EQ(1, chunked_array->num_chunks());
+    ASSERT_TRUE(values->Equals(chunked_array->chunk(0)));
+  }
 
-    this->ReadAndCheckSingleColumnTable(values);
+  void CheckRoundTrip(const std::shared_ptr<Table>& table) {
+    std::shared_ptr<Table> result;
+    DoSimpleRoundtrip(table, 1, table->num_rows(), {}, &result);
+    ASSERT_TRUE(table->Equals(*result));
   }
 
   template <typename ArrayType>
@@ -401,37 +428,37 @@ TYPED_TEST(TestParquetIO, SingleColumnTableOptionalReadWrite) {
 
   ASSERT_OK(NullableArray<TypeParam>(SMALL_SIZE, 10, kDefaultSeed, &values));
   std::shared_ptr<Table> table = MakeSimpleTable(values, true);
-  this->WriteReadAndCheckSingleColumnTable(table);
+  this->CheckRoundTrip(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleNullableListNullableColumnReadWrite) {
   std::shared_ptr<Table> table;
   this->PrepareListTable(SMALL_SIZE, true, true, 10, &table);
-  this->WriteReadAndCheckSingleColumnTable(table);
+  this->CheckRoundTrip(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleRequiredListNullableColumnReadWrite) {
   std::shared_ptr<Table> table;
   this->PrepareListTable(SMALL_SIZE, false, true, 10, &table);
-  this->WriteReadAndCheckSingleColumnTable(table);
+  this->CheckRoundTrip(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleNullableListRequiredColumnReadWrite) {
   std::shared_ptr<Table> table;
   this->PrepareListTable(SMALL_SIZE, true, false, 10, &table);
-  this->WriteReadAndCheckSingleColumnTable(table);
+  this->CheckRoundTrip(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleRequiredListRequiredColumnReadWrite) {
   std::shared_ptr<Table> table;
   this->PrepareListTable(SMALL_SIZE, false, false, 0, &table);
-  this->WriteReadAndCheckSingleColumnTable(table);
+  this->CheckRoundTrip(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleNullableListRequiredListRequiredColumnReadWrite) {
   std::shared_ptr<Table> table;
   this->PrepareListOfListTable(SMALL_SIZE, true, false, false, 0, &table);
-  this->WriteReadAndCheckSingleColumnTable(table);
+  this->CheckRoundTrip(table);
 }
 
 TYPED_TEST(TestParquetIO, SingleColumnRequiredChunkedWrite) {
@@ -756,18 +783,24 @@ TYPED_TEST(TestPrimitiveParquetIO, SingleColumnRequiredChunkedTableRead) {
   this->CheckSingleColumnRequiredTableRead(4);
 }
 
-void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out) {
+void MakeDoubleTable(
+    int num_columns, int num_rows, int nchunks, std::shared_ptr<Table>* out) {
   std::shared_ptr<::arrow::Column> column;
   std::vector<std::shared_ptr<::arrow::Column>> columns(num_columns);
   std::vector<std::shared_ptr<::arrow::Field>> fields(num_columns);
 
-  std::shared_ptr<Array> values;
   for (int i = 0; i < num_columns; ++i) {
+    std::vector<std::shared_ptr<Array>> arrays;
+    std::shared_ptr<Array> values;
     ASSERT_OK(NullableArray<::arrow::DoubleType>(
         num_rows, num_rows / 10, static_cast<uint32_t>(i), &values));
     std::stringstream ss;
     ss << "col" << i;
-    column = MakeColumn(ss.str(), values, true);
+
+    for (int j = 0; j < nchunks; ++j) {
+      arrays.push_back(values);
+    }
+    column = MakeColumn(ss.str(), arrays, true);
 
     columns[i] = column;
     fields[i] = column->field();
@@ -776,41 +809,46 @@ void MakeDoubleTable(int num_columns, int num_rows, std::shared_ptr<Table>* out)
   *out = std::make_shared<Table>(schema, columns);
 }
 
-void DoTableRoundtrip(const std::shared_ptr<Table>& table, int num_threads,
-    const std::vector<int>& column_subset, std::shared_ptr<Table>* out) {
-  auto sink = std::make_shared<InMemoryOutputStream>();
-
-  ASSERT_OK_NO_THROW(WriteTable(
-      *table, ::arrow::default_memory_pool(), sink, (table->num_rows() + 1) / 2));
+TEST(TestArrowReadWrite, MultithreadedRead) {
+  const int num_columns = 20;
+  const int num_rows = 1000;
+  const int num_threads = 4;
 
-  std::shared_ptr<Buffer> buffer = sink->GetBuffer();
-  std::unique_ptr<FileReader> reader;
-  ASSERT_OK_NO_THROW(
-      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
-          ::parquet::default_reader_properties(), nullptr, &reader));
+  std::shared_ptr<Table> table;
+  MakeDoubleTable(num_columns, num_rows, 1, &table);
 
-  reader->set_num_threads(num_threads);
+  std::shared_ptr<Table> result;
+  DoSimpleRoundtrip(table, num_threads, table->num_rows(), {}, &result);
 
-  if (column_subset.size() > 0) {
-    ASSERT_OK_NO_THROW(reader->ReadTable(column_subset, out));
-  } else {
-    // Read everything
-    ASSERT_OK_NO_THROW(reader->ReadTable(out));
-  }
+  ASSERT_TRUE(table->Equals(*result));
 }
 
-TEST(TestArrowReadWrite, MultithreadedRead) {
+TEST(TestArrowReadWrite, ReadSingleRowGroup) {
   const int num_columns = 20;
   const int num_rows = 1000;
-  const int num_threads = 4;
 
   std::shared_ptr<Table> table;
-  MakeDoubleTable(num_columns, num_rows, &table);
+  MakeDoubleTable(num_columns, num_rows, 1, &table);
 
-  std::shared_ptr<Table> result;
-  DoTableRoundtrip(table, num_threads, {}, &result);
+  std::shared_ptr<Buffer> buffer;
+  WriteTableToBuffer(table, 1, num_rows / 2, &buffer);
 
-  ASSERT_TRUE(table->Equals(*result));
+  std::unique_ptr<FileReader> reader;
+  ASSERT_OK_NO_THROW(
+      OpenFile(std::make_shared<BufferReader>(buffer), ::arrow::default_memory_pool(),
+          ::parquet::default_reader_properties(), nullptr, &reader));
+
+  ASSERT_EQ(2, reader->num_row_groups());
+
+  std::shared_ptr<Table> r1, r2;
+  // Read everything
+  ASSERT_OK_NO_THROW(reader->ReadRowGroup(0, &r1));
+  ASSERT_OK_NO_THROW(reader->ReadRowGroup(1, &r2));
+
+  std::shared_ptr<Table> concatenated;
+  ASSERT_OK(ConcatenateTables({r1, r2}, &concatenated));
+
+  ASSERT_TRUE(table->Equals(*concatenated));
 }
 
 TEST(TestArrowReadWrite, ReadColumnSubset) {
@@ -819,11 +857,11 @@ TEST(TestArrowReadWrite, ReadColumnSubset) {
   const int num_threads = 4;
 
   std::shared_ptr<Table> table;
-  MakeDoubleTable(num_columns, num_rows, &table);
+  MakeDoubleTable(num_columns, num_rows, 1, &table);
 
   std::shared_ptr<Table> result;
   std::vector<int> column_subset = {0, 4, 8, 10};
-  DoTableRoundtrip(table, num_threads, column_subset, &result);
+  DoSimpleRoundtrip(table, num_threads, table->num_rows(), column_subset, &result);
 
   std::vector<std::shared_ptr<::arrow::Column>> ex_columns;
   std::vector<std::shared_ptr<::arrow::Field>> ex_fields;

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/reader.cc
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.cc b/src/parquet/arrow/reader.cc
index a26c3ea..823aea9 100644
--- a/src/parquet/arrow/reader.cc
+++ b/src/parquet/arrow/reader.cc
@@ -60,19 +60,139 @@ static inline int64_t impala_timestamp_to_nanoseconds(const Int96& impala_timest
 template <typename ArrowType>
 using ArrayType = typename ::arrow::TypeTraits<ArrowType>::ArrayType;
 
+// ----------------------------------------------------------------------
+// Helper for parallel for-loop
+
+template <class FUNCTION>
+Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
+  std::vector<std::thread> thread_pool;
+  thread_pool.reserve(nthreads);
+  std::atomic<int> task_counter(0);
+
+  std::mutex error_mtx;
+  bool error_occurred = false;
+  Status error;
+
+  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
+    thread_pool.emplace_back(
+        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
+          int task_id;
+          while (!error_occurred) {
+            task_id = task_counter.fetch_add(1);
+            if (task_id >= num_tasks) { break; }
+            Status s = func(task_id);
+            if (!s.ok()) {
+              std::lock_guard<std::mutex> lock(error_mtx);
+              error_occurred = true;
+              error = s;
+              break;
+            }
+          }
+        });
+  }
+  for (auto&& thread : thread_pool) {
+    thread.join();
+  }
+  if (error_occurred) { return error; }
+  return Status::OK();
+}
+
+// ----------------------------------------------------------------------
+// Iteration utilities
+
+// Abstraction to decouple row group iteration details from the ColumnReader,
+// so we can read only a single row group if we want
+class FileColumnIterator {
+ public:
+  explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
+      : column_index_(column_index),
+        reader_(reader),
+        schema_(reader->metadata()->schema()) {}
+
+  virtual ~FileColumnIterator() {}
+
+  virtual std::shared_ptr<::parquet::ColumnReader> Next() = 0;
+
+  const SchemaDescriptor* schema() const { return schema_; }
+
+  const ColumnDescriptor* descr() const { return schema_->Column(column_index_); }
+
+  int column_index() const { return column_index_; }
+
+ protected:
+  int column_index_;
+  ParquetFileReader* reader_;
+  const SchemaDescriptor* schema_;
+};
+
+class AllRowGroupsIterator : public FileColumnIterator {
+ public:
+  explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
+      : FileColumnIterator(column_index, reader), next_row_group_(0) {}
+
+  std::shared_ptr<::parquet::ColumnReader> Next() override {
+    std::shared_ptr<::parquet::ColumnReader> result;
+    if (next_row_group_ < reader_->metadata()->num_row_groups()) {
+      result = reader_->RowGroup(next_row_group_)->Column(column_index_);
+      next_row_group_++;
+    } else {
+      result = nullptr;
+    }
+    return result;
+  };
+
+ private:
+  int next_row_group_;
+};
+
+class SingleRowGroupIterator : public FileColumnIterator {
+ public:
+  explicit SingleRowGroupIterator(
+      int column_index, int row_group_number, ParquetFileReader* reader)
+      : FileColumnIterator(column_index, reader),
+        row_group_number_(row_group_number),
+        done_(false) {}
+
+  std::shared_ptr<::parquet::ColumnReader> Next() override {
+    if (done_) { return nullptr; }
+
+    auto result = reader_->RowGroup(row_group_number_)->Column(column_index_);
+    done_ = true;
+    return result;
+  };
+
+ private:
+  int row_group_number_;
+  bool done_;
+};
+
+// ----------------------------------------------------------------------
+// File reader implementation
+
 class FileReader::Impl {
  public:
-  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader);
+  Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+      : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+
   virtual ~Impl() {}
 
-  bool CheckForFlatColumn(const ColumnDescriptor* descr);
-  bool CheckForFlatListColumn(const ColumnDescriptor* descr);
   Status GetColumn(int i, std::unique_ptr<ColumnReader>* out);
   Status ReadColumn(int i, std::shared_ptr<Array>* out);
-  Status ReadTable(std::shared_ptr<Table>* out);
-  Status ReadTable(const std::vector<int>& column_indices, std::shared_ptr<Table>* out);
+  Status GetSchema(
+      const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out);
+  Status ReadRowGroup(int row_group_index, const std::vector<int>& indices,
+      std::shared_ptr<::arrow::Table>* out);
+  Status ReadTable(const std::vector<int>& indices, std::shared_ptr<Table>* table);
+  Status ReadTable(std::shared_ptr<Table>* table);
+  Status ReadRowGroup(int i, std::shared_ptr<Table>* table);
+
+  bool CheckForFlatColumn(const ColumnDescriptor* descr);
+  bool CheckForFlatListColumn(const ColumnDescriptor* descr);
+
   const ParquetFileReader* parquet_reader() const { return reader_.get(); }
 
+  int num_row_groups() const { return reader_->metadata()->num_row_groups(); }
+
   void set_num_threads(int num_threads) { num_threads_ = num_threads; }
 
  private:
@@ -84,8 +204,17 @@ class FileReader::Impl {
 
 class ColumnReader::Impl {
  public:
-  Impl(MemoryPool* pool, const ColumnDescriptor* descr, ParquetFileReader* reader,
-      int column_index);
+  Impl(MemoryPool* pool, std::unique_ptr<FileColumnIterator> input)
+      : pool_(pool),
+        input_(std::move(input)),
+        descr_(input_->descr()),
+        values_buffer_(pool),
+        def_levels_buffer_(pool),
+        rep_levels_buffer_(pool) {
+    NodeToField(input_->descr()->schema_node(), &field_);
+    NextRowGroup();
+  }
+
   virtual ~Impl() {}
 
   Status NextBatch(int batch_size, std::shared_ptr<Array>* out);
@@ -121,10 +250,9 @@ class ColumnReader::Impl {
   };
 
   MemoryPool* pool_;
+  std::unique_ptr<FileColumnIterator> input_;
   const ColumnDescriptor* descr_;
-  ParquetFileReader* reader_;
-  int column_index_;
-  int next_row_group_;
+
   std::shared_ptr<::parquet::ColumnReader> column_reader_;
   std::shared_ptr<Field> field_;
 
@@ -139,14 +267,16 @@ class ColumnReader::Impl {
   int64_t null_count_;
 };
 
-FileReader::Impl::Impl(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-    : pool_(pool), reader_(std::move(reader)), num_threads_(1) {}
+FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
+    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+
+FileReader::~FileReader() {}
 
 Status FileReader::Impl::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
-  const SchemaDescriptor* schema = reader_->metadata()->schema();
+  std::unique_ptr<FileColumnIterator> input(new AllRowGroupsIterator(i, reader_.get()));
 
   std::unique_ptr<ColumnReader::Impl> impl(
-      new ColumnReader::Impl(pool_, schema->Column(i), reader_.get(), i));
+      new ColumnReader::Impl(pool_, std::move(input)));
   *out = std::unique_ptr<ColumnReader>(new ColumnReader(std::move(impl)));
   return Status::OK();
 }
@@ -163,55 +293,59 @@ Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<Array>* out) {
   return flat_column_reader->NextBatch(batch_size, out);
 }
 
-Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
-  std::vector<int> column_indices(reader_->metadata()->num_columns());
-
-  for (size_t i = 0; i < column_indices.size(); ++i) {
-    column_indices[i] = i;
-  }
-  return ReadTable(column_indices, table);
+Status FileReader::Impl::GetSchema(
+    const std::vector<int>& indices, std::shared_ptr<::arrow::Schema>* out) {
+  auto descr = reader_->metadata()->schema();
+  return FromParquetSchema(descr, indices, out);
 }
 
-template <class FUNCTION>
-Status ParallelFor(int nthreads, int num_tasks, FUNCTION&& func) {
-  std::vector<std::thread> thread_pool;
-  thread_pool.reserve(nthreads);
-  std::atomic<int> task_counter(0);
+Status FileReader::Impl::ReadRowGroup(int row_group_index,
+    const std::vector<int>& indices, std::shared_ptr<::arrow::Table>* out) {
+  std::shared_ptr<::arrow::Schema> schema;
+  RETURN_NOT_OK(GetSchema(indices, &schema));
 
-  std::mutex error_mtx;
-  bool error_occurred = false;
-  Status error;
+  auto rg_metadata = reader_->metadata()->RowGroup(row_group_index);
 
-  for (int thread_id = 0; thread_id < nthreads; ++thread_id) {
-    thread_pool.emplace_back(
-        [&num_tasks, &task_counter, &error, &error_occurred, &error_mtx, &func]() {
-          int task_id;
-          while (!error_occurred) {
-            task_id = task_counter.fetch_add(1);
-            if (task_id >= num_tasks) { break; }
-            Status s = func(task_id);
-            if (!s.ok()) {
-              std::lock_guard<std::mutex> lock(error_mtx);
-              error_occurred = true;
-              error = s;
-              break;
-            }
-          }
-        });
-  }
-  for (auto&& thread : thread_pool) {
-    thread.join();
+  int num_columns = static_cast<int>(indices.size());
+  int nthreads = std::min<int>(num_threads_, num_columns);
+  std::vector<std::shared_ptr<Column>> columns(num_columns);
+
+  // TODO(wesm): Refactor to share more code with ReadTable
+
+  auto ReadColumnFunc = [&indices, &row_group_index, &schema, &columns, &rg_metadata,
+      this](int i) {
+    int column_index = indices[i];
+    int64_t batch_size = rg_metadata->ColumnChunk(column_index)->num_values();
+
+    std::unique_ptr<FileColumnIterator> input(
+        new SingleRowGroupIterator(column_index, row_group_index, reader_.get()));
+
+    std::unique_ptr<ColumnReader::Impl> impl(
+        new ColumnReader::Impl(pool_, std::move(input)));
+    ColumnReader flat_column_reader(std::move(impl));
+
+    std::shared_ptr<Array> array;
+    RETURN_NOT_OK(flat_column_reader.NextBatch(batch_size, &array));
+    columns[i] = std::make_shared<Column>(schema->field(i), array);
+    return Status::OK();
+  };
+
+  if (nthreads == 1) {
+    for (int i = 0; i < num_columns; i++) {
+      RETURN_NOT_OK(ReadColumnFunc(i));
+    }
+  } else {
+    RETURN_NOT_OK(ParallelFor(nthreads, num_columns, ReadColumnFunc));
   }
-  if (error_occurred) { return error; }
+
+  *out = std::make_shared<Table>(schema, columns);
   return Status::OK();
 }
 
 Status FileReader::Impl::ReadTable(
     const std::vector<int>& indices, std::shared_ptr<Table>* table) {
-  auto descr = reader_->metadata()->schema();
-
   std::shared_ptr<::arrow::Schema> schema;
-  RETURN_NOT_OK(FromParquetSchema(descr, indices, &schema));
+  RETURN_NOT_OK(GetSchema(indices, &schema));
 
   int num_columns = static_cast<int>(indices.size());
   int nthreads = std::min<int>(num_threads_, num_columns);
@@ -236,10 +370,23 @@ Status FileReader::Impl::ReadTable(
   return Status::OK();
 }
 
-FileReader::FileReader(MemoryPool* pool, std::unique_ptr<ParquetFileReader> reader)
-    : impl_(new FileReader::Impl(pool, std::move(reader))) {}
+Status FileReader::Impl::ReadTable(std::shared_ptr<Table>* table) {
+  std::vector<int> indices(reader_->metadata()->num_columns());
 
-FileReader::~FileReader() {}
+  for (size_t i = 0; i < indices.size(); ++i) {
+    indices[i] = i;
+  }
+  return ReadTable(indices, table);
+}
+
+Status FileReader::Impl::ReadRowGroup(int i, std::shared_ptr<Table>* table) {
+  std::vector<int> indices(reader_->metadata()->num_columns());
+
+  for (size_t i = 0; i < indices.size(); ++i) {
+    indices[i] = i;
+  }
+  return ReadRowGroup(i, indices, table);
+}
 
 // Static ctor
 Status OpenFile(const std::shared_ptr<::arrow::io::ReadableFileInterface>& file,
@@ -280,14 +427,35 @@ Status FileReader::ReadTable(std::shared_ptr<Table>* out) {
 }
 
 Status FileReader::ReadTable(
-    const std::vector<int>& column_indices, std::shared_ptr<Table>* out) {
+    const std::vector<int>& indices, std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadTable(indices, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
+Status FileReader::ReadRowGroup(int i, std::shared_ptr<Table>* out) {
+  try {
+    return impl_->ReadRowGroup(i, out);
+  } catch (const ::parquet::ParquetException& e) {
+    return ::arrow::Status::IOError(e.what());
+  }
+}
+
+Status FileReader::ReadRowGroup(
+    int i, const std::vector<int>& indices, std::shared_ptr<Table>* out) {
   try {
-    return impl_->ReadTable(column_indices, out);
+    return impl_->ReadRowGroup(i, indices, out);
   } catch (const ::parquet::ParquetException& e) {
     return ::arrow::Status::IOError(e.what());
   }
 }
 
+int FileReader::num_row_groups() const {
+  return impl_->num_row_groups();
+}
+
 void FileReader::set_num_threads(int num_threads) {
   impl_->set_num_threads(num_threads);
 }
@@ -296,20 +464,6 @@ const ParquetFileReader* FileReader::parquet_reader() const {
   return impl_->parquet_reader();
 }
 
-ColumnReader::Impl::Impl(MemoryPool* pool, const ColumnDescriptor* descr,
-    ParquetFileReader* reader, int column_index)
-    : pool_(pool),
-      descr_(descr),
-      reader_(reader),
-      column_index_(column_index),
-      next_row_group_(0),
-      values_buffer_(pool),
-      def_levels_buffer_(pool),
-      rep_levels_buffer_(pool) {
-  NodeToField(descr_->schema_node(), &field_);
-  NextRowGroup();
-}
-
 template <typename ArrowType, typename ParquetType>
 Status ColumnReader::Impl::ReadNonNullableBatch(TypedColumnReader<ParquetType>* reader,
     int64_t values_to_read, int64_t* levels_read) {
@@ -563,7 +717,7 @@ Status ColumnReader::Impl::WrapIntoListArray(const int16_t* def_levels,
   if (descr_->max_repetition_level() > 0) {
     std::shared_ptr<::arrow::Schema> arrow_schema;
     RETURN_NOT_OK(
-        FromParquetSchema(reader_->metadata()->schema(), {column_index_}, &arrow_schema));
+        FromParquetSchema(input_->schema(), {input_->column_index()}, &arrow_schema));
 
     // Walk downwards to extract nullability
     std::shared_ptr<Field> current_field = arrow_schema->field(0);
@@ -912,12 +1066,7 @@ Status ColumnReader::Impl::NextBatch(int batch_size, std::shared_ptr<Array>* out
 }
 
 void ColumnReader::Impl::NextRowGroup() {
-  if (next_row_group_ < reader_->metadata()->num_row_groups()) {
-    column_reader_ = reader_->RowGroup(next_row_group_)->Column(column_index_);
-    next_row_group_++;
-  } else {
-    column_reader_ = nullptr;
-  }
+  column_reader_ = input_->Next();
 }
 
 ColumnReader::ColumnReader(std::unique_ptr<Impl> impl) : impl_(std::move(impl)) {}

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/reader.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/reader.h b/src/parquet/arrow/reader.h
index 1aa9c3e..f12acaf 100644
--- a/src/parquet/arrow/reader.h
+++ b/src/parquet/arrow/reader.h
@@ -107,6 +107,13 @@ class PARQUET_EXPORT FileReader {
   ::arrow::Status ReadTable(
       const std::vector<int>& column_indices, std::shared_ptr<::arrow::Table>* out);
 
+  ::arrow::Status ReadRowGroup(int i, const std::vector<int>& column_indices,
+      std::shared_ptr<::arrow::Table>* out);
+
+  ::arrow::Status ReadRowGroup(int i, std::shared_ptr<::arrow::Table>* out);
+
+  int num_row_groups() const;
+
   const ParquetFileReader* parquet_reader() const;
 
   /// Set the number of threads to use during reads of multiple columns. By

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/arrow/test-util.h
----------------------------------------------------------------------
diff --git a/src/parquet/arrow/test-util.h b/src/parquet/arrow/test-util.h
index 2cfc60a..bff952b 100644
--- a/src/parquet/arrow/test-util.h
+++ b/src/parquet/arrow/test-util.h
@@ -260,12 +260,18 @@ Status MakeListArary(const std::shared_ptr<Array>& values, int64_t size,
   return Status::OK();
 }
 
-std::shared_ptr<::arrow::Column> MakeColumn(
+static std::shared_ptr<::arrow::Column> MakeColumn(
     const std::string& name, const std::shared_ptr<Array>& array, bool nullable) {
   auto field = std::make_shared<::arrow::Field>(name, array->type(), nullable);
   return std::make_shared<::arrow::Column>(field, array);
 }
 
+static std::shared_ptr<::arrow::Column> MakeColumn(const std::string& name,
+    const std::vector<std::shared_ptr<Array>>& arrays, bool nullable) {
+  auto field = std::make_shared<::arrow::Field>(name, arrays[0]->type(), nullable);
+  return std::make_shared<::arrow::Column>(field, arrays);
+}
+
 std::shared_ptr<::arrow::Table> MakeSimpleTable(
     const std::shared_ptr<Array>& values, bool nullable) {
   std::shared_ptr<::arrow::Column> column = MakeColumn("col", values, nullable);

http://git-wip-us.apache.org/repos/asf/parquet-cpp/blob/d0646659/src/parquet/column/writer.cc
----------------------------------------------------------------------
diff --git a/src/parquet/column/writer.cc b/src/parquet/column/writer.cc
index 2ba4162..eb74147 100644
--- a/src/parquet/column/writer.cc
+++ b/src/parquet/column/writer.cc
@@ -213,7 +213,7 @@ TypedColumnWriter<Type>::TypedColumnWriter(ColumnChunkMetaDataBuilder* metadata,
     const WriterProperties* properties)
     : ColumnWriter(metadata, std::move(pager), expected_rows,
           (encoding == Encoding::PLAIN_DICTIONARY ||
-                       encoding == Encoding::RLE_DICTIONARY),
+              encoding == Encoding::RLE_DICTIONARY),
           encoding, properties) {
   switch (encoding) {
     case Encoding::PLAIN: