You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@arrow.apache.org by we...@apache.org on 2019/06/24 01:34:01 UTC
[arrow] branch master updated: ARROW-1012: [C++] Configurable batch
size for parquet RecordBatchReader
This is an automated email from the ASF dual-hosted git repository.
wesm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/arrow.git
The following commit(s) were added to refs/heads/master by this push:
new 532d4ba ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader
532d4ba is described below
commit 532d4ba05a87e64a23e3a7b44bfbf34fa0c9a90b
Author: Hatem Helal <hh...@mathworks.com>
AuthorDate: Sun Jun 23 20:33:53 2019 -0500
ARROW-1012: [C++] Configurable batch size for parquet RecordBatchReader
This patch adds support for configuring the record batch size when reading a parquet file by adding a `batch_size` to `ArrowReaderProperties`.
Author: Hatem Helal <hh...@mathworks.com>
Closes #4304 from hatemhelal/arrow-1012 and squashes the following commits:
9ed935374 <Hatem Helal> update todo comment to be a bit more precise
9f93da7e0 <Hatem Helal> rework existing RecordBatchReader test to cover the case where batch size is smaller than the row group
0e2162849 <Hatem Helal> use deque instead of list
108a5d775 <Hatem Helal> Change default batch size to 64K and comment
159b03041 <Hatem Helal> fix appveyor windows failure: must cast size_t to int
b45782e56 <Hatem Helal> Initial attempt at supporting a configurable batch size for parquet RecordBatchReader
---
cpp/src/parquet/arrow/arrow-reader-writer-test.cc | 18 +--
cpp/src/parquet/arrow/reader.cc | 151 ++++++++++------------
cpp/src/parquet/arrow/reader.h | 12 +-
3 files changed, 86 insertions(+), 95 deletions(-)
diff --git a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
index f59db1f..5781ad5 100644
--- a/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
+++ b/cpp/src/parquet/arrow/arrow-reader-writer-test.cc
@@ -1940,23 +1940,23 @@ TEST(TestArrowReadWrite, GetRecordBatchReader) {
ASSERT_NO_FATAL_FAILURE(WriteTableToBuffer(table, num_rows / 2,
default_arrow_writer_properties(), &buffer));
+ ArrowReaderProperties properties = default_arrow_reader_properties();
+ properties.set_batch_size(100);
+
std::unique_ptr<FileReader> reader;
ASSERT_OK_NO_THROW(OpenFile(std::make_shared<BufferReader>(buffer),
- ::arrow::default_memory_pool(),
- ::parquet::default_reader_properties(), nullptr, &reader));
+ ::arrow::default_memory_pool(), properties, &reader));
std::shared_ptr<::arrow::RecordBatchReader> rb_reader;
ASSERT_OK_NO_THROW(reader->GetRecordBatchReader({0, 1}, &rb_reader));
std::shared_ptr<::arrow::RecordBatch> batch;
- ASSERT_OK(rb_reader->ReadNext(&batch));
- ASSERT_EQ(500, batch->num_rows());
- ASSERT_EQ(20, batch->num_columns());
-
- ASSERT_OK(rb_reader->ReadNext(&batch));
- ASSERT_EQ(500, batch->num_rows());
- ASSERT_EQ(20, batch->num_columns());
+ for (int i = 0; i < 10; ++i) {
+ ASSERT_OK(rb_reader->ReadNext(&batch));
+ ASSERT_EQ(100, batch->num_rows());
+ ASSERT_EQ(20, batch->num_columns());
+ }
ASSERT_OK(rb_reader->ReadNext(&batch));
ASSERT_EQ(nullptr, batch);
diff --git a/cpp/src/parquet/arrow/reader.cc b/cpp/src/parquet/arrow/reader.cc
index 5665603..484719e 100644
--- a/cpp/src/parquet/arrow/reader.cc
+++ b/cpp/src/parquet/arrow/reader.cc
@@ -20,7 +20,9 @@
#include <algorithm>
#include <climits>
#include <cstring>
+#include <deque>
#include <future>
+#include <numeric>
#include <type_traits>
#include <utility>
#include <vector>
@@ -110,14 +112,37 @@ ArrowReaderProperties default_arrow_reader_properties() {
// so we can read only a single row group if we want
class FileColumnIterator {
public:
- explicit FileColumnIterator(int column_index, ParquetFileReader* reader)
+ explicit FileColumnIterator(int column_index, ParquetFileReader* reader,
+ std::vector<int> row_groups)
: column_index_(column_index),
reader_(reader),
- schema_(reader->metadata()->schema()) {}
+ schema_(reader->metadata()->schema()),
+ row_groups_(row_groups.begin(), row_groups.end()) {}
virtual ~FileColumnIterator() {}
- virtual std::unique_ptr<::parquet::PageReader> NextChunk() = 0;
+ std::unique_ptr<::parquet::PageReader> NextChunk() {
+ if (row_groups_.empty()) {
+ return nullptr;
+ }
+
+ auto row_group_reader = reader_->RowGroup(row_groups_.front());
+ row_groups_.pop_front();
+ return row_group_reader->GetColumnPageReader(column_index_);
+ }
+
+ static FileColumnIterator* MakeAllRowGroupsIterator(int column_index,
+ ParquetFileReader* reader) {
+ std::vector<int> row_groups(reader->metadata()->num_row_groups());
+ std::iota(row_groups.begin(), row_groups.end(), 0);
+ return new FileColumnIterator(column_index, reader, row_groups);
+ }
+
+ static FileColumnIterator* MakeSingleRowGroupIterator(int column_index,
+ ParquetFileReader* reader,
+ int row_group) {
+ return new FileColumnIterator(column_index, reader, {row_group});
+ }
const SchemaDescriptor* schema() const { return schema_; }
@@ -131,50 +156,7 @@ class FileColumnIterator {
int column_index_;
ParquetFileReader* reader_;
const SchemaDescriptor* schema_;
-};
-
-class AllRowGroupsIterator : public FileColumnIterator {
- public:
- explicit AllRowGroupsIterator(int column_index, ParquetFileReader* reader)
- : FileColumnIterator(column_index, reader), next_row_group_(0) {}
-
- std::unique_ptr<::parquet::PageReader> NextChunk() override {
- std::unique_ptr<::parquet::PageReader> result;
- if (next_row_group_ < reader_->metadata()->num_row_groups()) {
- result = reader_->RowGroup(next_row_group_)->GetColumnPageReader(column_index_);
- next_row_group_++;
- } else {
- result = nullptr;
- }
- return result;
- }
-
- private:
- int next_row_group_;
-};
-
-class SingleRowGroupIterator : public FileColumnIterator {
- public:
- explicit SingleRowGroupIterator(int column_index, int row_group_number,
- ParquetFileReader* reader)
- : FileColumnIterator(column_index, reader),
- row_group_number_(row_group_number),
- done_(false) {}
-
- std::unique_ptr<::parquet::PageReader> NextChunk() override {
- if (done_) {
- return nullptr;
- }
-
- auto result =
- reader_->RowGroup(row_group_number_)->GetColumnPageReader(column_index_);
- done_ = true;
- return result;
- }
-
- private:
- int row_group_number_;
- bool done_;
+ std::deque<int> row_groups_;
};
class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
@@ -182,52 +164,56 @@ class RowGroupRecordBatchReader : public ::arrow::RecordBatchReader {
explicit RowGroupRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::shared_ptr<::arrow::Schema> schema,
- FileReader* reader)
- : row_group_indices_(row_group_indices),
+ FileReader* reader, int64_t batch_size)
+ : column_readers_(),
+ row_group_indices_(row_group_indices),
column_indices_(column_indices),
schema_(schema),
file_reader_(reader),
- next_row_group_(0) {}
+ batch_size_(batch_size) {}
~RowGroupRecordBatchReader() override {}
std::shared_ptr<::arrow::Schema> schema() const override { return schema_; }
Status ReadNext(std::shared_ptr<::arrow::RecordBatch>* out) override {
- if (table_ != nullptr) { // one row group has been loaded
- std::shared_ptr<::arrow::RecordBatch> tmp;
- RETURN_NOT_OK(table_batch_reader_->ReadNext(&tmp));
- if (tmp != nullptr) { // some column chunks are left in table
- *out = tmp;
- return Status::OK();
- } else { // the entire table is consumed
- table_batch_reader_.reset();
- table_.reset();
+ if (column_readers_.empty()) {
+ // Initialize the column readers
+ column_readers_.reserve(column_indices_.size());
+
+ for (size_t i = 0; i < column_indices_.size(); ++i) {
+ ColumnReaderPtr tmp;
+ RETURN_NOT_OK(file_reader_->GetColumn(column_indices_[i], &tmp));
+ column_readers_.emplace_back(std::move(tmp));
}
}
- // all row groups has been consumed
- if (next_row_group_ == row_group_indices_.size()) {
- *out = nullptr;
- return Status::OK();
- }
+ // TODO (hatemhelal): Consider refactoring this to share logic with ReadTable as this
+ // does not currently honor the use_threads option.
+ std::vector<std::shared_ptr<Column>> columns(column_indices_.size());
- RETURN_NOT_OK(file_reader_->ReadRowGroup(row_group_indices_[next_row_group_],
- column_indices_, &table_));
+ for (size_t i = 0; i < column_indices_.size(); ++i) {
+ std::shared_ptr<ChunkedArray> array;
+ RETURN_NOT_OK(column_readers_[i]->NextBatch(batch_size_, &array));
+ columns[i] = std::make_shared<Column>(schema_->field(static_cast<int>(i)), array);
+ }
- next_row_group_++;
- table_batch_reader_.reset(new ::arrow::TableBatchReader(*table_.get()));
- return table_batch_reader_->ReadNext(out);
+ // Create an intermediate table and use TableBatchReader as an adaptor to a
+ // RecordBatch
+ std::shared_ptr<Table> table = Table::Make(schema_, columns);
+ RETURN_NOT_OK(table->Validate());
+ ::arrow::TableBatchReader table_batch_reader(*table);
+ return table_batch_reader.ReadNext(out);
}
private:
+ using ColumnReaderPtr = std::unique_ptr<ColumnReader>;
+ std::vector<ColumnReaderPtr> column_readers_;
std::vector<int> row_group_indices_;
std::vector<int> column_indices_;
std::shared_ptr<::arrow::Schema> schema_;
FileReader* file_reader_;
- size_t next_row_group_;
- std::shared_ptr<::arrow::Table> table_;
- std::unique_ptr<::arrow::TableBatchReader> table_batch_reader_;
+ int64_t batch_size_;
};
// ----------------------------------------------------------------------
@@ -293,6 +279,8 @@ class FileReader::Impl {
const ::arrow::Schema& old_schema, const std::vector<int>& dict_indices,
std::vector<std::shared_ptr<::arrow::Column>>& columns);
+ int64_t batch_size() const { return reader_properties_.batch_size(); }
+
private:
MemoryPool* pool_;
std::unique_ptr<ParquetFileReader> reader_;
@@ -457,11 +445,8 @@ Status FileReader::Impl::ReadSchemaField(int i, std::shared_ptr<ChunkedArray>* o
Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
std::shared_ptr<ChunkedArray>* out) {
- FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
- return new AllRowGroupsIterator(i, reader);
- };
+ auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
auto parquet_schema = reader_->metadata()->schema();
-
auto node = parquet_schema->group_node()->field(i).get();
std::unique_ptr<ColumnReader::ColumnReaderImpl> reader_impl;
@@ -486,9 +471,7 @@ Status FileReader::Impl::ReadSchemaField(int i, const std::vector<int>& indices,
}
Status FileReader::Impl::ReadColumn(int i, std::shared_ptr<ChunkedArray>* out) {
- FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
- return new AllRowGroupsIterator(i, reader);
- };
+ auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
std::unique_ptr<ColumnReader> flat_column_reader;
RETURN_NOT_OK(GetColumn(i, iterator_factory, &flat_column_reader));
@@ -531,7 +514,7 @@ Status FileReader::Impl::ReadColumnChunk(int column_index,
FileColumnIteratorFactory iterator_factory = [row_group_index](
int i, ParquetFileReader* reader) {
- return new SingleRowGroupIterator(i, row_group_index, reader);
+ return FileColumnIterator::MakeSingleRowGroupIterator(i, reader, row_group_index);
};
RETURN_NOT_OK(
GetReaderForNode(column_index, node, indices, 1, iterator_factory, &reader_impl));
@@ -752,9 +735,7 @@ Status OpenFile(const std::shared_ptr<::arrow::io::RandomAccessFile>& file,
}
Status FileReader::GetColumn(int i, std::unique_ptr<ColumnReader>* out) {
- FileColumnIteratorFactory iterator_factory = [](int i, ParquetFileReader* reader) {
- return new AllRowGroupsIterator(i, reader);
- };
+ auto iterator_factory = FileColumnIterator::MakeAllRowGroupsIterator;
return impl_->GetColumn(i, iterator_factory, out);
}
@@ -805,7 +786,7 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice
Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indices,
const std::vector<int>& column_indices,
std::shared_ptr<RecordBatchReader>* out) {
- // column indicies check
+ // column indices check
std::shared_ptr<::arrow::Schema> schema;
RETURN_NOT_OK(GetSchema(column_indices, &schema));
@@ -819,7 +800,7 @@ Status FileReader::GetRecordBatchReader(const std::vector<int>& row_group_indice
}
*out = std::make_shared<RowGroupRecordBatchReader>(row_group_indices, column_indices,
- schema, this);
+ schema, this, impl_->batch_size());
return Status::OK();
}
diff --git a/cpp/src/parquet/arrow/reader.h b/cpp/src/parquet/arrow/reader.h
index acdda71..48c9237 100644
--- a/cpp/src/parquet/arrow/reader.h
+++ b/cpp/src/parquet/arrow/reader.h
@@ -54,11 +54,16 @@ class RowGroupReader;
static constexpr bool DEFAULT_USE_THREADS = false;
+// Default number of rows to read when using ::arrow::RecordBatchReader
+static constexpr int64_t DEFAULT_BATCH_SIZE = 64 * 1024;
+
/// EXPERIMENTAL: Properties for configuring FileReader behavior.
class PARQUET_EXPORT ArrowReaderProperties {
public:
explicit ArrowReaderProperties(bool use_threads = DEFAULT_USE_THREADS)
- : use_threads_(use_threads), read_dict_indices_() {}
+ : use_threads_(use_threads),
+ read_dict_indices_(),
+ batch_size_(DEFAULT_BATCH_SIZE) {}
void set_use_threads(bool use_threads) { use_threads_ = use_threads; }
@@ -79,9 +84,14 @@ class PARQUET_EXPORT ArrowReaderProperties {
}
}
+ void set_batch_size(int64_t batch_size) { batch_size_ = batch_size; }
+
+ int64_t batch_size() const { return batch_size_; }
+
private:
bool use_threads_;
std::unordered_set<int> read_dict_indices_;
+ int64_t batch_size_;
};
/// EXPERIMENTAL: Constructs the default ArrowReaderProperties