Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/24 01:34:01 UTC

[arrow] branch master updated: ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader

This is an automated email from the ASF dual-hosted git repository.

wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git


The following commit(s) were added to refs/heads/master by this push:
     new 532d4ba  ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader
532d4ba is described below

commit 532d4ba05a87e64a23e3a7b44bfbf34fa0c9a90b
Author: Hatem Helal <hh...@mathworks.com>
AuthorDate: Sun Jun 23 20:33:53 2019 -0500

    ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader
    
    This patch adds support for configuring the record batch size used when reading a Parquet file by adding a `batch_size` property to `ArrowReaderProperties`.
    
    Author: Hatem Helal <hh...@mathworks.com>
    
    Closes #4304 from hatemhelal/arrow-1012 and squashes the following commits:
    
    9ed935374 <Hatem Helal> update todo comment to be a bit more precise
    9f93da7e0 <Hatem Helal> rework existing RecordBatchReader test to cover the case where batch size is smaller than the row group
    0e2162849 <Hatem Helal> use deque instead of list
    108a5d775 <Hatem Helal> Change default batch size to 64K and comment
    159b03041 <Hatem Helal> fix appveyor windows failure: must cast size_t to int
    b45782e56 <Hatem Helal> Initial attempt at supporting a configurable batch size for parquet RecordBatchReader
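    
    For context, a minimal usage sketch of the new knob, adapted from the
    updated GetRecordBatchReader test below. Error handling through
    PARQUET_THROW_NOT_OK is illustrative, and `infile` stands in for any
    std::shared_ptr<arrow::io::RandomAccessFile>:
    
        // Request record batches of 100 rows instead of the 64K default.
        parquet::arrow::ArrowReaderProperties properties =
            parquet::arrow::default_arrow_reader_properties();
        properties.set_batch_size(100);
    
        std::unique_ptr<parquet::arrow::FileReader> reader;
        PARQUET_THROW_NOT_OK(parquet::arrow::OpenFile(
            infile, arrow::default_memory_pool(), properties, &reader));
    
        std::shared_ptr<arrow::RecordBatchReader> rb_reader;
        PARQUET_THROW_NOT_OK(reader->GetRecordBatchReader({0, 1}, &rb_reader));
    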
---
 cpp/src/parquet/arrow/arrow-reader-writer-test.cc |  18 +--
 cpp/src/parquet/arrow/reader.cc                   | 151 ++++++++++------------
 cpp/src/parquet/arrow/reader.h                    |  12 +-
 3 files changed, 86 insertions(+), 95 deletions(-)

diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index f59db1f..5781ad5 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1940,23 +1940,23 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) {
   ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
                                              default_arrow_writer_properties(), &buffer));
 
+  ArrowReaderProperties properties = default_arrow_reader_properties();
+  properties.set_batch_size(100);
+
   std::unique_ptr<FileReader> reader;
   ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
-                              ::arrow::default_memory_pool(),
-                              ::parquet::default_reader_properties(), nullptr, &reader));
+                              ::arrow::default_memory_pool(), properties, &reader));
 
   std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
   ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
 
   std::shared_ptr<::arrow::RecordBatch> batch;
 
-  ASSERT_OK(rb_reader->ReadNext(&batch));
-  ASSERT_EQ(500, batch->num_rows());
-  ASSERT_EQ(20, batch->num_columns());
-
-  ASSERT_OK(rb_reader->ReadNext(&batch));
-  ASSERT_EQ(500, batch->num_rows());
-  ASSERT_EQ(20, batch->num_columns());
+  for (int i = 0; i < 10; ++i) {
+    ASSERT_OK(rb_reader->ReadNext(&batch));
+    ASSERT_EQ(100, batch->num_rows());
+    ASSERT_EQ(20, batch->num_columns());
+  }
 
   ASSERT_OK(rb_reader->ReadNext(&batch));
   ASSERT_EQ(nullptr, batch);
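
The reworked test above exercises the consumer contract, which this patch leaves
unchanged: keep calling ReadNext() until it yields a null batch. A minimal sketch
of that loop, assuming rb_reader was obtained as in the test (note that the final
batch may hold fewer than batch_size rows when the row count is not an exact
multiple; that is an inference from the column readers' NextBatch semantics, not
something this test covers):

    std::shared_ptr<arrow::RecordBatch> batch;
    while (true) {
      PARQUET_THROW_NOT_OK(rb_reader->ReadNext(&batch));
      if (batch == nullptr) break;  // stream exhausted, as the test asserts
      // consume batch->num_rows() rows across batch->num_columns() columns
    }
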
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 5665603..484719e 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -20,7 +20,9 @@
 #include <algorithm>
 #include <climits>
 #include <cstring>
+#include <deque>
 #include <future>
+#include <numeric>
 #include <type_traits>
 #include <utility>
 #include <vector>
@@ -110,14 +112,37 @@ ArrowReaderProperties default_arrow_reader_properties() {
 // so we can read only a single row group if we want
 class FileColumnIterator {
  public:
-  explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
+  explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
+                              std::vector<int> row_groups)
       : column_index_(column_index),
         reader_(reader),
-        schema_(reader->metadata()->schema()) {}
+        schema_(reader->metadata()->schema()),
+        row_groups_(row_groups.begin(), row_groups.end()) {}
 
   virtual ~FileColumnIterator() {}
 
-  virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;
+  std::unique_ptr<::parquet::PageReader> NextChunk() {
+    if (row_groups_.empty()) {
+      return nullptr;
+    }
+
+    auto row_group_reader = reader_->RowGroup(row_groups_.front());
+    row_groups_.pop_front();
+    return row_group_reader->GetColumnPageReader(column_index_);
+  }
+
+  static FileColumnIterator* MakeAllRowGroupsIterator(int column_index,
+                                                      ParquetFileReader* reader) {
+    std::vector<int> row_groups(reader->metadata()->num_row_groups());
+    std::iota(row_groups.begin(), row_groups.end(), 0);
+    return new FileColumnIterator(column_index, reader, row_groups);
+  }
+
+  static FileColumnIterator* MakeSingleRowGroupIterator(int column_index,
+                                                        ParquetFileReader* reader,
+                                                        int row_group) {
+    return new FileColumnIterator(column_index, reader, {row_group});
+  }
 
   const SchemaDescriptor* schema() const { return schema_; }
 
@@ -131,50 +156,7 @@ class FileColumnIterator {
   int column_index_;
   ParquetFileReader* reader_;
   const SchemaDescriptor* schema_;
-};
-
-class AllRowGroupsIterator : public FileColumnIterator {
- public:
-  explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
-      : FileColumnIterator(column_index, reader), next_row_group_(0) {}
-
-  std::unique_ptr<::parquet::PageReader> NextChunk() override {
-    std::unique_ptr<::parquet::PageReader> result;
-    if (next_row_group_ < reader_->metadata()->num_row_groups()) {
-      result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
-      next_row_group_++;
-    } else {
-      result = nullptr;
-    }
-    return result;
-  }
-
- private:
-  int next_row_group_;
-};
-
-class SingleRowGroupIterator : public FileColumnIterator {
- public:
-  explicit SingleRowGroupIterator(int column_index, int row_group_number,
-                                  ParquetFileReader* reader)
-      : FileColumnIterator(column_index, reader),
-        row_group_number_(row_group_number),
-        done_(false) {}
-
-  std::unique_ptr<::parquet::PageReader> NextChunk() override {
-    if (done_) {
-      return nullptr;
-    }
-
-    auto result =
-        reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
-    done_ = true;
-    return result;
-  }
-
- private:
-  int row_group_number_;
-  bool done_;
+  std::deque<int> row_groups_;
 };
 
 class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
@@ -182,52 +164,56 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
   explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
                                      const std::vector<int>& column_indices,
                                      std::shared_ptr<::arrow::Schema> schema,
-                                     FileReader* reader)
-      : row_group_indices_(row_group_indices),
+                                     FileReader* reader, int64_t batch_size)
+      : column_readers_(),
+        row_group_indices_(row_group_indices),
         column_indices_(column_indices),
         schema_(schema),
         file_reader_(reader),
-        next_row_group_(0) {}
+        batch_size_(batch_size) {}
 
   ~RowGroupRecordBatchReader() override {}
 
   std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
 
   Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
-    if (table_ != nullptr) {  // one row group has been loaded
-      std::shared_ptr<::arrow::RecordBatch> tmp;
-      RETURN_NOT_OK(table_batch_reader_->ReadNext(&tmp));
-      if (tmp != nullptr) {  // some column chunks are left in table
-        *out = tmp;
-        return Status::OK();
-      } else {  // the entire table is consumed
-        table_batch_reader_.reset();
-        table_.reset();
+    if (column_readers_.empty()) {
+      // Initialize the column readers
+      column_readers_.reserve(column_indices_.size());
+
+      for (size_t i = 0; i < column_indices_.size(); ++i) {
+        ColumnReaderPtr tmp;
+        RETURN_NOT_OK(file_reader_->GetColumn(column_indices_[i], &tmp));
+        column_readers_.emplace_back(std::move(tmp));
       }
     }
 
-    // all row groups has been consumed
-    if (next_row_group_ == row_group_indices_.size()) {
-      *out = nullptr;
-      return Status::OK();
-    }
+    // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
+    // does not currently honor the use_threads option.
+    std::vector<std::shared_ptr<Column>> columns(column_indices_.size());
 
-    RETURN_NOT_OK(file_reader_->ReadRowGroup(row_group_indices_[next_row_group_],
-                                             column_indices_, &table_));
+    for (size_t i = 0; i < column_indices_.size(); ++i) {
+      std::shared_ptr<ChunkedArray> array;
+      RETURN_NOT_OK(column_readers_[i]->NextBatch(batch_size_, &array));
+      columns[i] = std::make_shared<Column>(schema_->field(static_cast<int>(i)), array);
+    }
 
-    next_row_group_++;
-    table_batch_reader_.reset(new ::arrow::TableBatchReader(*table_.get()));
-    return table_batch_reader_->ReadNext(out);
+    // Create an intermediate table and use TableBatchReader as an adaptor to a
+    // RecordBatch
+    std::shared_ptr<Table> table = Table::Make(schema_, columns);
+    RETURN_NOT_OK(table->Validate());
+    ::arrow::TableBatchReader table_batch_reader(*table);
+    return table_batch_reader.ReadNext(out);
   }
 
  private:
+  using ColumnReaderPtr = std::unique_ptr<ColumnReader>;
+  std::vector<ColumnReaderPtr> column_readers_;
   std::vector<int> row_group_indices_;
   std::vector<int> column_indices_;
   std::shared_ptr<::arrow::Schema> schema_;
   FileReader* file_reader_;
-  size_t next_row_group_;
-  std::shared_ptr<::arrow::Table> table_;
-  std::unique_ptr<::arrow::TableBatchReader> table_batch_reader_;
+  int64_t batch_size_;
 };
 
 // ----------------------------------------------------------------------
@@ -293,6 +279,8 @@ class FileReader::Impl {
       const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
       std::vector<std::shared_ptr<::arrow::Column>>& columns);
 
+  int64_t batch_size() const { return reader_properties_.batch_size(); }
+
  private:
   MemoryPool* pool_;
   std::unique_ptr<ParquetFileReader> reader_;
@@ -457,11 +445,8 @@ Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* o
 
 Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
                                          std::shared_ptr<ChunkedArray>* out) {
-  FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
-    return new AllRowGroupsIterator(i, reader);
-  };
+  auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
   auto parquet_schema = reader_->metadata()->schema();
-
   auto node = parquet_schema->group_node()->field(i).get();
   std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
 
@@ -486,9 +471,7 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
 }
 
 Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
-  FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
-    return new AllRowGroupsIterator(i, reader);
-  };
+  auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
   std::unique_ptr<ColumnReader> flat_column_reader;
   RETURN_NOT_OK(GetColumn(i, iterator_factory, &flat_column_reader));
 
@@ -531,7 +514,7 @@ Status FileReader::Impl::ReadColumnChunk(int column_index,
 
   FileColumnIteratorFactory iterator_factory = [row_group_index](
                                                    int i, ParquetFileReader* reader) {
-    return new SingleRowGroupIterator(i, row_group_index, reader);
+    return FileColumnIterator::MakeSingleRowGroupIterator(i, reader, row_group_index);
   };
   RETURN_NOT_OK(
       GetReaderForNode(column_index, node, indices, 1, iterator_factory, &reader_impl));
@@ -752,9 +735,7 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
 }
 
 Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
-  FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
-    return new AllRowGroupsIterator(i, reader);
-  };
+  auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
   return impl_->GetColumn(i, iterator_factory, out);
 }
 
@@ -805,7 +786,7 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice
 Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
                                         const std::vector<int>& column_indices,
                                         std::shared_ptr<RecordBatchReader>* out) {
-  // column indicies check
+  // column indices check
   std::shared_ptr<::arrow::Schema> schema;
   RETURN_NOT_OK(GetSchema(column_indices, &schema));
 
@@ -819,7 +800,7 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice
   }
 
   *out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
-                                                     schema, this);
+                                                     schema, this, impl_->batch_size());
   return Status::OK();
 }
 
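The reader.cc changes above consolidate the two FileColumnIterator subclasses into
a single concrete class that drains a std::deque<int> of row group indices, with
the former lambda factories replaced by the static helpers MakeAllRowGroupsIterator
and MakeSingleRowGroupIterator. A self-contained sketch of that pattern
(illustrative only, not Arrow API):

    #include <deque>
    #include <iostream>
    #include <numeric>
    #include <vector>

    int main() {
      // "All row groups" case: fill 0..n-1 with std::iota, as the new
      // MakeAllRowGroupsIterator does, then drain from the front.
      std::vector<int> row_groups(4);
      std::iota(row_groups.begin(), row_groups.end(), 0);  // {0, 1, 2, 3}

      std::deque<int> pending(row_groups.begin(), row_groups.end());
      while (!pending.empty()) {  // NextChunk() returns nullptr once empty
        std::cout << "next chunk comes from row group " << pending.front() << "\n";
        pending.pop_front();
      }
      return 0;
    }
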
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index acdda71..48c9237 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -54,11 +54,16 @@ class RowGroupReader;
 
 static constexpr bool DEFAULT_USE_THREADS = false;
 
+// Default number of rows to read when using ::arrow::RecordBatchReader
+static constexpr int64_t DEFAULT_BATCH_SIZE = 64 * 1024;
+
 /// EXPERIMENTAL: Properties for configuring FileReader behavior.
 class PARQUET_EXPORT ArrowReaderProperties {
  public:
   explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
-      : use_threads_(use_threads), read_dict_indices_() {}
+      : use_threads_(use_threads),
+        read_dict_indices_(),
+        batch_size_(DEFAULT_BATCH_SIZE) {}
 
   void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
 
@@ -79,9 +84,14 @@ class PARQUET_EXPORT ArrowReaderProperties {
     }
   }
 
+  void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
+
+  int64_t batch_size() const { return batch_size_; }
+
  private:
   bool use_threads_;
   std::unordered_set<int> read_dict_indices_;
+  int64_t batch_size_;
 };
 
 /// EXPERIMENTAL: Constructs the default ArrowReaderProperties
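
The reader.h additions make the default visible and tunable per reader. A short
sketch of the new accessors (namespace parquet::arrow assumed, matching where the
header declares these types):

    parquet::arrow::ArrowReaderProperties props =
        parquet::arrow::default_arrow_reader_properties();
    // batch_size() starts at DEFAULT_BATCH_SIZE, i.e. 64 * 1024 rows...
    int64_t rows_per_batch = props.batch_size();  // 65536
    // ...and can be overridden before the properties reach OpenFile.
    props.set_batch_size(4096);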