You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 15:52:09 UTC

[incubator-doris] branch master updated: [Enhancement] improve parquet reader via arrow's prefetch and multi thread (#9472)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ef65f484df [Enhancement]  improve parquet reader via arrow's prefetch and multi thread (#9472)
ef65f484df is described below

commit ef65f484df70aede49777157657354b80039b5a7
Author: Lightman <31...@users.noreply.github.com>
AuthorDate: Thu May 19 23:52:01 2022 +0800

    [Enhancement]  improve parquet reader via arrow's prefetch and multi thread (#9472)
    
    * add ArrowReaderProperties to parquet::arrow::FileReader
    
    * support prefetch batch
---
 be/src/common/config.h         |   4 ++
 be/src/exec/parquet_reader.cpp | 137 ++++++++++++++++++++++++++++-------------
 be/src/exec/parquet_reader.h   |  23 ++++++-
 3 files changed, 118 insertions(+), 46 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 3f04a75ed9..fcfc2e3f78 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -736,6 +736,10 @@ CONF_Validator(string_type_length_soft_limit_bytes,
 // used for olap scanner to save memory, when the size of unused_object_pool
 // is greater than object_pool_buffer_size, release the object in the unused_object_pool.
 CONF_Int32(object_pool_buffer_size, "100");
+
+// ParquetReaderWrap prefetch buffer size
+CONF_Int32(parquet_reader_max_buffer_size, "50");
+
 } // namespace config
 
 } // namespace doris
diff --git a/be/src/exec/parquet_reader.cpp b/be/src/exec/parquet_reader.cpp
index 3d2555161c..53880cdbb3 100644
--- a/be/src/exec/parquet_reader.cpp
+++ b/be/src/exec/parquet_reader.cpp
@@ -18,9 +18,15 @@
 
 #include <arrow/array.h>
 #include <arrow/status.h>
+#include <arrow/type_fwd.h>
 #include <time.h>
 
+#include <algorithm>
+#include <mutex>
+#include <thread>
+
 #include "common/logging.h"
+#include "common/status.h"
 #include "exec/file_reader.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/TPaloBrokerService.h"
@@ -44,9 +50,6 @@ ParquetReaderWrap::ParquetReaderWrap(FileReader* file_reader, int32_t num_of_col
           _current_line_of_group(0),
           _current_line_of_batch(0) {
     _parquet = std::shared_ptr<ParquetFile>(new ParquetFile(file_reader));
-    _properties = parquet::ReaderProperties();
-    _properties.enable_buffered_stream();
-    _properties.set_buffer_size(65535);
 }
 
 ParquetReaderWrap::~ParquetReaderWrap() {
@@ -55,10 +58,23 @@ ParquetReaderWrap::~ParquetReaderWrap() {
 Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>& tuple_slot_descs,
                                               const std::string& timezone) {
     try {
-        // new file reader for parquet file
-        auto st = parquet::arrow::FileReader::Make(
-                arrow::default_memory_pool(),
-                parquet::ParquetFileReader::Open(_parquet, _properties), &_reader);
+        parquet::ArrowReaderProperties arrow_reader_properties =
+                parquet::default_arrow_reader_properties();
+        arrow_reader_properties.set_pre_buffer(true);
+        arrow_reader_properties.set_use_threads(true);
+        // Open Parquet file reader
+        auto reader_builder = parquet::arrow::FileReaderBuilder();
+        reader_builder.properties(arrow_reader_properties);
+
+        auto st = reader_builder.Open(_parquet);
+
+        if (!st.ok()) {
+            LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
+            return Status::InternalError("Failed to create file reader");
+        }
+
+        st = reader_builder.Build(&_reader);
+
         if (!st.ok()) {
             LOG(WARNING) << "failed to create parquet file reader, errmsg=" << st.ToString();
             return Status::InternalError("Failed to create file reader");
@@ -85,31 +101,23 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 
         _timezone = timezone;
 
-        if (_current_line_of_group == 0) { // the first read
-            RETURN_IF_ERROR(column_indices(tuple_slot_descs));
-            // read batch
-            arrow::Status status = _reader->GetRecordBatchReader({_current_group},
-                                                                 _parquet_column_ids, &_rb_batch);
-            if (!status.ok()) {
-                LOG(WARNING) << "Get RecordBatch Failed. " << status.ToString();
-                return Status::InternalError(status.ToString());
-            }
-            status = _rb_batch->ReadNext(&_batch);
-            if (!status.ok()) {
-                LOG(WARNING) << "The first read record. " << status.ToString();
-                return Status::InternalError(status.ToString());
-            }
-            _current_line_of_batch = 0;
-            //save column type
-            std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
-            for (int i = 0; i < _parquet_column_ids.size(); i++) {
-                std::shared_ptr<arrow::Field> field = field_schema->field(i);
-                if (!field) {
-                    LOG(WARNING) << "Get field schema failed. Column order:" << i;
-                    return Status::InternalError(status.ToString());
-                }
-                _parquet_column_type.emplace_back(field->type()->id());
+        RETURN_IF_ERROR(column_indices(tuple_slot_descs));
+
+        std::thread thread(&ParquetReaderWrap::prefetch_batch, this);
+        thread.detach();
+
+        // read batch
+        RETURN_IF_ERROR(read_next_batch());
+        _current_line_of_batch = 0;
+        //save column type
+        std::shared_ptr<arrow::Schema> field_schema = _batch->schema();
+        for (int i = 0; i < _parquet_column_ids.size(); i++) {
+            std::shared_ptr<arrow::Field> field = field_schema->field(i);
+            if (!field) {
+                LOG(WARNING) << "Get field schema failed. Column order:" << i;
+                return Status::InternalError(_status.ToString());
             }
+            _parquet_column_type.emplace_back(field->type()->id());
         }
         return Status::OK();
     } catch (parquet::ParquetException& e) {
@@ -121,6 +129,8 @@ Status ParquetReaderWrap::init_parquet_reader(const std::vector<SlotDescriptor*>
 }
 
 void ParquetReaderWrap::close() {
+    _closed = true;
+    _queue_writer_cond.notify_one();
     arrow::Status st = _parquet->Close();
     if (!st.ok()) {
         LOG(WARNING) << "close parquet file error: " << st.ToString();
@@ -195,25 +205,15 @@ Status ParquetReaderWrap::read_record_batch(const std::vector<SlotDescriptor*>&
         _rows_of_group = _file_metadata->RowGroup(_current_group)
                                  ->num_rows(); //get rows of the current row group
         // read batch
-        arrow::Status status =
-                _reader->GetRecordBatchReader({_current_group}, _parquet_column_ids, &_rb_batch);
-        if (!status.ok()) {
-            return Status::InternalError("Get RecordBatchReader Failed.");
-        }
-        status = _rb_batch->ReadNext(&_batch);
-        if (!status.ok()) {
-            return Status::InternalError("Read Batch Error With Libarrow.");
-        }
+        RETURN_IF_ERROR(read_next_batch());
         _current_line_of_batch = 0;
     } else if (_current_line_of_batch >= _batch->num_rows()) {
         VLOG_DEBUG << "read_record_batch, current group id:" << _current_group
                    << " current line of batch:" << _current_line_of_batch
                    << " is larger than batch size:" << _batch->num_rows()
                    << ". start to read next batch";
-        arrow::Status status = _rb_batch->ReadNext(&_batch);
-        if (!status.ok()) {
-            return Status::InternalError("Read Batch Error With Libarrow.");
-        }
+        // read batch
+        RETURN_IF_ERROR(read_next_batch());
         _current_line_of_batch = 0;
     }
     return Status::OK();
@@ -553,6 +553,55 @@ Status ParquetReaderWrap::read(Tuple* tuple, const std::vector<SlotDescriptor*>&
     return read_record_batch(tuple_slot_descs, eof);
 }
 
+void ParquetReaderWrap::prefetch_batch() {
+    auto insert_batch = [this](const auto& batch) {
+        std::unique_lock<std::mutex> lock(_mtx);
+        while (!_closed && _queue.size() == _max_queue_size) {
+            _queue_writer_cond.wait_for(lock, std::chrono::seconds(1));
+        }
+        if (UNLIKELY(_closed)) {
+            return;
+        }
+        _queue.push_back(batch);
+        _queue_reader_cond.notify_one();
+    };
+    int current_group = 0;
+    while (true) {
+        if (_closed || current_group >= _total_groups) {
+            return;
+        }
+        _status = _reader->GetRecordBatchReader({current_group}, _parquet_column_ids, &_rb_batch);
+        if (!_status.ok()) {
+            _closed = true;
+            return;
+        }
+        arrow::RecordBatchVector batches;
+        _status = _rb_batch->ReadAll(&batches);
+        if (!_status.ok()) {
+            _closed = true;
+            return;
+        }
+        std::for_each(batches.begin(), batches.end(), insert_batch);
+        current_group++;
+    }
+}
+
+Status ParquetReaderWrap::read_next_batch() {
+    std::unique_lock<std::mutex> lock(_mtx);
+    while (!_closed && _queue.empty()) {
+        _queue_reader_cond.wait_for(lock, std::chrono::seconds(1));
+    }
+
+    if (UNLIKELY(_closed)) {
+        return Status::InternalError(_status.message());
+    }
+
+    _batch = _queue.front();
+    _queue.pop_front();
+    _queue_writer_cond.notify_one();
+    return Status::OK();
+}
+
 ParquetFile::ParquetFile(FileReader* file) : _file(file) {}
 
 ParquetFile::~ParquetFile() {
diff --git a/be/src/exec/parquet_reader.h b/be/src/exec/parquet_reader.h
index c62054800d..2323656d78 100644
--- a/be/src/exec/parquet_reader.h
+++ b/be/src/exec/parquet_reader.h
@@ -22,6 +22,7 @@
 #include <arrow/io/api.h>
 #include <arrow/io/file.h>
 #include <arrow/io/interfaces.h>
+#include <arrow/status.h>
 #include <parquet/api/reader.h>
 #include <parquet/api/writer.h>
 #include <parquet/arrow/reader.h>
@@ -29,10 +30,16 @@
 #include <parquet/exception.h>
 #include <stdint.h>
 
+#include <atomic>
+#include <condition_variable>
+#include <list>
 #include <map>
+#include <mutex>
 #include <string>
+#include <thread>
 
 #include "common/status.h"
+#include "common/config.h"
 #include "gen_cpp/PaloBrokerService_types.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "gen_cpp/Types_types.h"
@@ -51,7 +58,7 @@ class FileReader;
 class ParquetFile : public arrow::io::RandomAccessFile {
 public:
     ParquetFile(FileReader* file);
-    virtual ~ParquetFile();
+    ~ParquetFile() override;
     arrow::Result<int64_t> Read(int64_t nbytes, void* buffer) override;
     arrow::Result<int64_t> ReadAt(int64_t position, int64_t nbytes, void* out) override;
     arrow::Result<int64_t> GetSize() override;
@@ -92,9 +99,12 @@ private:
     Status handle_timestamp(const std::shared_ptr<arrow::TimestampArray>& ts_array, uint8_t* buf,
                             int32_t* wbtyes);
 
+private:
+    void prefetch_batch();
+    Status read_next_batch();
+
 private:
     const int32_t _num_of_columns_from_file;
-    parquet::ReaderProperties _properties;
     std::shared_ptr<ParquetFile> _parquet;
 
     // parquet file reader object
@@ -113,6 +123,15 @@ private:
     int _current_line_of_batch;
 
     std::string _timezone;
+
+private:
+    std::atomic<bool> _closed = false;
+    arrow::Status _status;
+    std::mutex _mtx;
+    std::condition_variable _queue_reader_cond;
+    std::condition_variable _queue_writer_cond;
+    std::list<std::shared_ptr<arrow::RecordBatch>> _queue;
+    const size_t _max_queue_size = config::parquet_reader_max_buffer_size;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org