You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 15:52:09 UTC
[incubator-doris] branch master updated: [Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new ef65f484df [Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)
ef65f484df is described below
commit ef65f484df70aede49777157657354b80039b5a7
Author: Lightman <31...@users.noreply.github.com>
AuthorDate: Thu May 19 23:52:01 2022 +0800
[Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)
* add ArrowReaderProperties to parquet::arrow::FileReader
* support prefetch batch
---
be/src/common/config.h | 4 ++
be/src/exec/parquet_reader.cpp | 137 ++++++++++++++++++++++++++++-------------
be/src/exec/parquet_reader.h | 23 ++++++-
3 files changed, 118 insertions(+), 46 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3f04a75ed9..fcfc2e3f78 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -736,6 +736,10 @@ CONF_Validator(string_type_length_soft_limit_bytes,
// used for olap scanner to save memory, when the size of unused_object_pool
// is greater than object_pool_buffer_size, release the object in the unused_object_pool.
CONF_Int32(object_pool_buffer_size, "100");
+
+// ParquetReaderWrap prefetch buffer size
+CONF_Int32(parquet_reader_max_buffer_size, "50");
+
} // namespace config
} // namespace doris
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 3d2555161c..53880cdbb3 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -18,9 +18,15 @@
#include <arrow/array.h>
#include <arrow/status.h>
+#include <arrow/type_fwd.h>
#include <time.h>
+#include <algorithm>
+#include <mutex>
+#include <thread>
+
#include "common/logging.h"
+#include "common/status.h"
#include "exec/file_reader.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/TPaloBrokerService.h"
@@ -44,9 +50,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_col
_current_line_of_group(0),
_current_line_of_batch(0) {
_parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
- _properties = parquet::ReaderProperties();
- _properties.enable_buffered_stream();
- _properties.set_buffer_size(65535);
}
ParquetReaderWrap::~ParquetReaderWrap() {
@@ -55,10 +58,23 @@ ParquetReaderWrap::~ParquetReaderWrap() {
Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
const std::string& timezone) {
try {
- // new file reader for parquet file
- auto st = parquet::arrow::FileReader::Make(
- arrow::default_memory_pool(),
- parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
+ parquet::ArrowReaderProperties arrow_reader_properties =
+ parquet::default_arrow_reader_properties();
+ arrow_reader_properties.set_pre_buffer(true);
+ arrow_reader_properties.set_use_threads(true);
+ // Open Parquet file reader
+ auto reader_builder = parquet::arrow::FileReaderBuilder();
+ reader_builder.properties(arrow_reader_properties);
+
+ auto st = reader_builder.Open(_parquet);
+
+ if (!st.ok()) {
+ LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
+ return Status::InternalError("Failed to create file reader");
+ }
+
+ st = reader_builder.Build(&_reader);
+
if (!st.ok()) {
LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
return Status::InternalError("Failed to create file reader");
@@ -85,31 +101,23 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
_timezone = timezone;
- if (_current_line_of_group == 0) { // the first read
- RETURN_IF_ERROR(column_indices(tuple_slot_descs));
- // read batch
- arrow::Status status = _reader->GetRecordBatchReader({_current_group},
- _parquet_column_ids, &_rb_batch);
- if (!status.ok()) {
- LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString();
- return Status::InternalError(status.ToString());
- }
- status = _rb_batch->ReadNext(&_batch);
- if (!status.ok()) {
- LOG(WARNING) << "The first read record. " << status.ToString();
- return Status::InternalError(status.ToString());
- }
- _current_line_of_batch = 0;
- //save column type
- std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
- for (int i = 0; i < _parquet_column_ids.size(); i++) {
- std::shared_ptr<arrow::Field> field = field_schema->field(i);
- if (!field) {
- LOG(WARNING) << "Get field schema failed. Column order:" << i;
- return Status::InternalError(status.ToString());
- }
- _parquet_column_type.emplace_back(field->type()->id());
+ RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+
+ std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
+ thread.detach();
+
+ // read batch
+ RETURN_IF_ERROR(read_next_batch());
+ _current_line_of_batch = 0;
+ //save column type
+ std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
+ for (int i = 0; i < _parquet_column_ids.size(); i++) {
+ std::shared_ptr<arrow::Field> field = field_schema->field(i);
+ if (!field) {
+ LOG(WARNING) << "Get field schema failed. Column order:" << i;
+ return Status::InternalError(_status.ToString());
}
+ _parquet_column_type.emplace_back(field->type()->id());
}
return Status::OK();
} catch (parquet::ParquetException& e) {
@@ -121,6 +129,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
}
void ParquetReaderWrap::close() {
+ _closed = true;
+ _queue_writer_cond.notify_one();
arrow::Status st = _parquet->Close();
if (!st.ok()) {
LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -195,25 +205,15 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
_rows_of_group = _file_metadata->RowGroup(_current_group)
->num_rows(); //get rows of the current row group
// read batch
- arrow::Status status =
- _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
- if (!status.ok()) {
- return Status::InternalError("Get RecordBatchReader Failed.");
- }
- status = _rb_batch->ReadNext(&_batch);
- if (!status.ok()) {
- return Status::InternalError("Read Batch Error With Libarrow.");
- }
+ RETURN_IF_ERROR(read_next_batch());
_current_line_of_batch = 0;
} else if (_current_line_of_batch >= _batch->num_rows()) {
VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
<< " current line of batch:" << _current_line_of_batch
<< " is larger than batch size:" << _batch->num_rows()
<< ". start to read next batch";
- arrow::Status status = _rb_batch->ReadNext(&_batch);
- if (!status.ok()) {
- return Status::InternalError("Read Batch Error With Libarrow.");
- }
+ // read batch
+ RETURN_IF_ERROR(read_next_batch());
_current_line_of_batch = 0;
}
return Status::OK();
@@ -553,6 +553,55 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
return read_record_batch(tuple_slot_descs, eof);
}
+void ParquetReaderWrap::prefetch_batch() {
+ auto insert_batch = [this](const auto& batch) {
+ std::unique_lock<std::mutex> lock(_mtx);
+ while (!_closed && _queue.size() == _max_queue_size) {
+ _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
+ }
+ if (UNLIKELY(_closed)) {
+ return;
+ }
+ _queue.push_back(batch);
+ _queue_reader_cond.notify_one();
+ };
+ int current_group = 0;
+ while (true) {
+ if (_closed || current_group >= _total_groups) {
+ return;
+ }
+ _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
+ if (!_status.ok()) {
+ _closed = true;
+ return;
+ }
+ arrow::RecordBatchVector batches;
+ _status = _rb_batch->ReadAll(&batches);
+ if (!_status.ok()) {
+ _closed = true;
+ return;
+ }
+ std::for_each(batches.begin(), batches.end(), insert_batch);
+ current_group++;
+ }
+}
+
+Status ParquetReaderWrap::read_next_batch() {
+ std::unique_lock<std::mutex> lock(_mtx);
+ while (!_closed && _queue.empty()) {
+ _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
+ }
+
+ if (UNLIKELY(_closed)) {
+ return Status::InternalError(_status.message());
+ }
+
+ _batch = _queue.front();
+ _queue.pop_front();
+ _queue_writer_cond.notify_one();
+ return Status::OK();
+}
+
ParquetFile::ParquetFile(FileReader* file) : _file(file) {}
ParquetFile::~ParquetFile() {
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index c62054800d..2323656d78 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -22,6 +22,7 @@
#include <arrow/io/api.h>
#include <arrow/io/file.h>
#include <arrow/io/interfaces.h>
+#include <arrow/status.h>
#include <parquet/api/reader.h>
#include <parquet/api/writer.h>
#include <parquet/arrow/reader.h>
@@ -29,10 +30,16 @@
#include <parquet/exception.h>
#include <stdint.h>
+#include <atomic>
+#include <condition_variable>
+#include <list>
#include <map>
+#include <mutex>
#include <string>
+#include <thread>
#include "common/status.h"
+#include "common/config.h"
#include "gen_cpp/PaloBrokerService_types.h"
#include "gen_cpp/PlanNodes_types.h"
#include "gen_cpp/Types_types.h"
@@ -51,7 +58,7 @@ class FileReader;
class ParquetFile : public arrow::io::RandomAccessFile {
public:
ParquetFile(FileReader* file);
- virtual ~ParquetFile();
+ ~ParquetFile() override;
arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
arrow::Result<int64_t> GetSize() override;
@@ -92,9 +99,12 @@ private:
Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
int32_t* wbtyes);
+private:
+ void prefetch_batch();
+ Status read_next_batch();
+
private:
const int32_t _num_of_columns_from_file;
- parquet::ReaderProperties _properties;
std::shared_ptr<ParquetFile> _parquet;
// parquet file reader object
@@ -113,6 +123,15 @@ private:
int _current_line_of_batch;
std::string _timezone;
+
+private:
+ std::atomic<bool> _closed = false;
+ arrow::Status _status;
+ std::mutex _mtx;
+ std::condition_variable _queue_reader_cond;
+ std::condition_variable _queue_writer_cond;
+ std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+ const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
};
} // namespace doris
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org