Posted to commits@doris.apache.org by "morningman (via GitHub)" <gi...@apache.org> on 2023/04/19 09:10:03 UTC

[GitHub] [doris] morningman commented on a diff in pull request #18796: [opt](FileReader) merge small IO to optimize read performance

morningman commented on code in PR #18796:
URL: https://github.com/apache/doris/pull/18796#discussion_r1170955215


##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -581,6 +582,42 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
     return Status::OK();
 }
 
+std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(

Review Comment:
   Have you tested how many ranges may be generated for a large file?
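
   As a rough, hypothetical sense of scale from the code below: `_generate_random_access_ranges`
   emits one range per leaf column chunk in a row group, so a scan that reads 1,000 leaf columns
   would build a vector of about 1,000 ranges for every row group it visits.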



##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,309 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
 #include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
 
 namespace doris {
 namespace io {
 
+struct PrefetchRange {
+    size_t start_offset;
+    size_t end_offset;
+
+    PrefetchRange(size_t start_offset, size_t end_offset)
+            : start_offset(start_offset), end_offset(end_offset) {}
+
+    PrefetchRange(const PrefetchRange& other) = default;
+
+    PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+/**
+ * A FileReader that efficiently supports random access in formats like Parquet and ORC.
+ * To merge small IOs in Parquet and ORC, the random access ranges should be generated
+ * when creating the reader. The random access ranges are a list of ranges ordered by offset.
+ * The ranges should be read sequentially; a range can be skipped, but cannot be read repeatedly.
+ *
+ * For example, in Parquet, the random access ranges are the column offsets in a row group.
+ *
+ * When reading at an offset, if [offset, offset + 8MB) covers many random access ranges, the reader
+ * reads the data in [offset, offset + 8MB) as a whole and copies the data in the random access ranges
+ * into small buffers (called boxes, 1MB each by default, 64MB in total). A box can be occupied by many ranges,
+ * and uses a reference counter to recorde how many ranges are cached in the box. If the reference counter
+ * reaches zero, the box can be released or reused by other ranges. When there is no empty box for a new
+ * read operation, the read is performed directly.
+ */
+class RandomAccessReader : public io::FileReader {
+public:
+    static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024;   // 64MB
+    static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024;      // 8MB
+    static constexpr size_t BOX_SIZE = 1 * 1024 * 1024;             // 1MB
+    static constexpr size_t SMALL_IO = 2 * 1024 * 1024;             // 2MB
+    static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
+
+    RandomAccessReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+                       const std::vector<PrefetchRange>& random_access_ranges)
+            : _profile(profile),
+              _reader(std::move(reader)),
+              _random_access_ranges(random_access_ranges) {
+        _range_cached_data.resize(random_access_ranges.size());
+        _size = _reader->size();
+        _remaining = TOTAL_BUFFER_SIZE;
+        if (_profile != nullptr) {
+            const char* random_profile = "MergedSmallIO";
+            ADD_TIMER(_profile, random_profile);
+            _copy_time = ADD_CHILD_TIMER(_profile, "CopyTime", random_profile);
+            _read_time = ADD_CHILD_TIMER(_profile, "ReadTime", random_profile);
+            _request_io = ADD_CHILD_COUNTER(_profile, "RequestIO", TUnit::UNIT, random_profile);
+            _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT, random_profile);
+            _request_bytes =
+                    ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES, random_profile);
+            _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
+        }
+    }
+
+    ~RandomAccessReader() override {
+        if (_read_slice != nullptr) {
+            delete[] _read_slice;
+        }
+        for (char* box : _boxes) {
+            delete[] box;
+        }
+        close();
+    }
+
+    Status close() override {
+        if (!_closed) {
+            _closed = true;
+            // the underlying reader is closed in its own destructor
+            // return _reader->close();
+            if (_profile != nullptr) {
+                COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+                COUNTER_UPDATE(_read_time, _statistics.read_time);
+                COUNTER_UPDATE(_request_io, _statistics.request_io);
+                COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+                COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+                COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+            }
+        }
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _reader->path(); }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    std::shared_ptr<io::FileSystem> fs() const override { return _reader->fs(); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override;
+
+private:
+    struct RangeCachedData {
+        size_t start_offset;
+        size_t end_offset;
+        std::vector<int16> ref_box;
+        std::vector<uint32> box_start_offset;
+        std::vector<uint32> box_end_offset;
+        bool has_read = false;
+
+        RangeCachedData(size_t start_offset, size_t end_offset)
+                : start_offset(start_offset), end_offset(end_offset) {}
+
+        RangeCachedData() : start_offset(0), end_offset(0) {}
+
+        bool empty() const { return start_offset == end_offset; }
+
+        bool contains(size_t offset) const { return start_offset <= offset && offset < end_offset; }
+
+        void reset() {
+            start_offset = 0;
+            end_offset = 0;
+            ref_box.clear();
+            box_start_offset.clear();
+            box_end_offset.clear();
+        }
+
+        int16 release_last_box() {
+            // we can only release the last referenced box to ensure sequential read in range
+            if (!empty()) {
+                int16 last_box_ref = ref_box.back();
+                uint32 released_size = box_end_offset.back() - box_start_offset.back();
+                ref_box.pop_back();
+                box_start_offset.pop_back();
+                box_end_offset.pop_back();
+                end_offset -= released_size;
+                if (empty()) {
+                    reset();
+                }
+                return last_box_ref;
+            }
+            return -1;
+        }
+    };
+
+    struct Statistics {
+        int64_t copy_time = 0;
+        int64_t read_time = 0;
+        int64_t request_io = 0;
+        int64_t merged_io = 0;
+        int64_t request_bytes = 0;
+        int64_t read_bytes = 0;
+    };
+
+    RuntimeProfile::Counter* _copy_time;
+    RuntimeProfile::Counter* _read_time;
+    RuntimeProfile::Counter* _request_io;
+    RuntimeProfile::Counter* _merged_io;
+    RuntimeProfile::Counter* _request_bytes;
+    RuntimeProfile::Counter* _read_bytes;
+
+    int _search_read_range(size_t start_offset, size_t end_offset);
+    void _clean_cached_data(RangeCachedData& cached_data);
+    void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
+                      size_t* bytes_read);
+    Status _fill_box(int range_index, size_t start_offset, size_t to_read, size_t* bytes_read,
+                     const IOContext* io_ctx);
+    void _dec_box_ref(int16 box_index);
+
+    RuntimeProfile* _profile = nullptr;
+    io::FileReaderSPtr _reader;
+    const std::vector<PrefetchRange> _random_access_ranges;
+    std::vector<RangeCachedData> _range_cached_data;
+    size_t _size;
+    bool _closed = false;
+    size_t _remaining;
+
+    char* _read_slice = nullptr;
+    std::vector<char*> _boxes;
+    int16 _last_box_ref = -1;
+    uint32 _last_box_usage = 0;
+    std::vector<int16> _box_ref;
+
+    Statistics _statistics;
+};
+
+class DelegateReader {
+public:
+    class ThreadSafeReader : public io::FileReader {
+    public:
+        ThreadSafeReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) {
+            _size = _reader->size();
+        }
+
+        ~ThreadSafeReader() override { close(); }
+
+        Status close() override {
+            if (!_closed) {
+                _closed = true;
+                return _reader->close();
+            }
+            return Status::OK();
+        }
+
+        const io::Path& path() const override { return _reader->path(); }
+
+        size_t size() const override { return _size; }
+
+        bool closed() const override { return _closed; }
+
+        std::shared_ptr<io::FileSystem> fs() const override { return _reader->fs(); }
+
+    protected:
+        Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                            const IOContext* io_ctx) override {
+            if (typeid(*_reader.get()) == typeid(io::S3FileReader) ||

Review Comment:
   Is `typeid()` time-consuming?
   It looks like we don't need to check the type on every call to `read_at_impl`.
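
   One possible shape (just a sketch: `_needs_serialized_access` is a made-up name, and the
   serialized branch stands in for whatever the current condition guards):
   ```cpp
   class ThreadSafeReader : public io::FileReader {
   public:
       ThreadSafeReader(io::FileReaderSPtr reader) : _reader(std::move(reader)) {
           _size = _reader->size();
           // Evaluate the RTTI checks once here; read_at_impl only tests a cached flag.
           _needs_serialized_access =
                   typeid(*_reader.get()) == typeid(io::S3FileReader);
           // || typeid(*_reader.get()) == typeid(io::BrokerFileReader), etc.,
           // mirroring the rest of the original condition.
       }

   protected:
       Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
                           const IOContext* io_ctx) override {
           if (_needs_serialized_access) {
               // ... same serialized path as the current branch ...
           }
           return _reader->read_at(offset, result, bytes_read, io_ctx);
       }

   private:
       io::FileReaderSPtr _reader;
       size_t _size;
       bool _needs_serialized_access = false;
   };
   ```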



##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,309 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
 #include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
 
 namespace doris {
 namespace io {
 
+struct PrefetchRange {
+    size_t start_offset;
+    size_t end_offset;
+
+    PrefetchRange(size_t start_offset, size_t end_offset)
+            : start_offset(start_offset), end_offset(end_offset) {}
+
+    PrefetchRange(const PrefetchRange& other) = default;
+
+    PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+/**
+ * A FileReader that efficiently supports random access in format like parquet and orc.
+ * In order to merge small IO in parquet and orc, the random access ranges should be generated
+ * when creating the reader. The random access ranges is a list of ranges that order by offset.
+ * The range in random access ranges should be reading sequentially, can be skipped, but can't be
+ * read repeatedly.
+ *
+ * For example, in parquet, the random access ranges is the column offsets in a row group.
+ *
+ * When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader
+ * will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges
+ * into small buffers (called boxes, 1MB each by default, 64MB in total). A box can be occupied by many ranges,
+ * and uses a reference counter to recorde how many ranges are cached in the box. If the reference counter

Review Comment:
   ```suggestion
    * and uses a reference counter to record how many ranges are cached in the box. If the reference counter
   ```



##########
be/src/io/fs/buffered_reader.cpp:
##########
@@ -33,13 +33,322 @@ bvar::PerSecond<bvar::Adder<uint64_t>> g_bytes_downloaded_per_second("buffered_r
                                                                      "bytes_downloaded_per_second",
                                                                      &g_bytes_downloaded, 60);
 
+Status RandomAccessReader::read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                                        const IOContext* io_ctx) {
+    _statistics.request_io++;
+    *bytes_read = 0;
+    if (result.size == 0) {
+        return Status::OK();
+    }
+    int range_index = _search_read_range(offset, offset + result.size);
+    if (range_index < 0) {
+        SCOPED_RAW_TIMER(&_statistics.read_time);
+        Status st = _reader->read_at(offset, result, bytes_read, io_ctx);
+        _statistics.merged_io++;
+        _statistics.request_bytes += *bytes_read;
+        _statistics.read_bytes += *bytes_read;
+        return st;
+    }
+    if (offset + result.size > _random_access_ranges[range_index].end_offset) {
+        // return _reader->read_at(offset, result, bytes_read, io_ctx);
+        return Status::IOError("Range in RandomAccessReader should be read sequentially");

Review Comment:
   This reader is named `RandomAccess`, but it can only read sequentially.
   That is confusing; it would be better to rename it.



##########
be/src/io/fs/buffered_reader.h:
##########
@@ -139,6 +424,33 @@ class PrefetchBufferedReader : public io::FileReader {
     size_t _size;
 };
 
+class InMemoryFileReader : public io::FileReader {

Review Comment:
   Add some comments.
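
   For example (guessing the semantics purely from the name; please document the real behavior):
   ```cpp
   /**
    * A FileReader that loads the whole file into an in-memory buffer once and
    * serves all subsequent reads from that buffer.
    */
   class InMemoryFileReader : public io::FileReader {
   ```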



##########
be/src/io/fs/buffered_reader.h:
##########
@@ -24,49 +24,309 @@
 
 #include "common/config.h"
 #include "common/status.h"
+#include "io/cache/block/cached_remote_file_reader.h"
+#include "io/file_factory.h"
+#include "io/fs/broker_file_reader.h"
 #include "io/fs/file_reader.h"
+#include "io/fs/s3_file_reader.h"
 #include "olap/olap_define.h"
 #include "util/runtime_profile.h"
+#include "vec/common/typeid_cast.h"
 
 namespace doris {
 namespace io {
 
+struct PrefetchRange {
+    size_t start_offset;
+    size_t end_offset;
+
+    PrefetchRange(size_t start_offset, size_t end_offset)
+            : start_offset(start_offset), end_offset(end_offset) {}
+
+    PrefetchRange(const PrefetchRange& other) = default;
+
+    PrefetchRange() : start_offset(0), end_offset(0) {}
+};
+
+/**
+ * A FileReader that efficiently supports random access in format like parquet and orc.
+ * In order to merge small IO in parquet and orc, the random access ranges should be generated
+ * when creating the reader. The random access ranges is a list of ranges that order by offset.
+ * The range in random access ranges should be reading sequentially, can be skipped, but can't be
+ * read repeatedly.
+ *
+ * For example, in parquet, the random access ranges is the column offsets in a row group.
+ *
+ * When reading at offset, if [offset, offset + 8MB) contains many random access ranges, the reader
+ * will read data in [offset, offset + 8MB) as a whole, and copy the data in random access ranges
+ * into small buffers(name as box, default 1MB, 64MB in total). A box can be occupied by many ranges,
+ * and use a reference counter to recorde how many ranges are cached in the box. If reference counter
+ * equals zero, the box can be release or reused by other ranges. When there is no empty box for a new
+ * read operation, the read operation will do directly.
+ */
+class RandomAccessReader : public io::FileReader {
+public:
+    static constexpr size_t TOTAL_BUFFER_SIZE = 64 * 1024 * 1024;   // 64MB
+    static constexpr size_t READ_SLICE_SIZE = 8 * 1024 * 1024;      // 8MB
+    static constexpr size_t BOX_SIZE = 1 * 1024 * 1024;             // 1MB
+    static constexpr size_t SMALL_IO = 2 * 1024 * 1024;             // 2MB
+    static constexpr size_t NUM_BOX = TOTAL_BUFFER_SIZE / BOX_SIZE; // 64
+
+    RandomAccessReader(RuntimeProfile* profile, io::FileReaderSPtr reader,
+                       const std::vector<PrefetchRange>& random_access_ranges)
+            : _profile(profile),
+              _reader(std::move(reader)),
+              _random_access_ranges(random_access_ranges) {
+        _range_cached_data.resize(random_access_ranges.size());
+        _size = _reader->size();
+        _remaining = TOTAL_BUFFER_SIZE;
+        if (_profile != nullptr) {
+            const char* random_profile = "MergedSmallIO";
+            ADD_TIMER(_profile, random_profile);
+            _copy_time = ADD_CHILD_TIMER(_profile, "CopyTime", random_profile);
+            _read_time = ADD_CHILD_TIMER(_profile, "ReadTime", random_profile);
+            _request_io = ADD_CHILD_COUNTER(_profile, "RequestIO", TUnit::UNIT, random_profile);
+            _merged_io = ADD_CHILD_COUNTER(_profile, "MergedIO", TUnit::UNIT, random_profile);
+            _request_bytes =
+                    ADD_CHILD_COUNTER(_profile, "RequestBytes", TUnit::BYTES, random_profile);
+            _read_bytes = ADD_CHILD_COUNTER(_profile, "MergedBytes", TUnit::BYTES, random_profile);
+        }
+    }
+
+    ~RandomAccessReader() override {
+        if (_read_slice != nullptr) {
+            delete[] _read_slice;
+        }
+        for (char* box : _boxes) {
+            delete[] box;
+        }
+        close();
+    }
+
+    Status close() override {
+        if (!_closed) {
+            _closed = true;
+            // the underlying buffer is closed in its own destructor
+            // return _reader->close();
+            if (_profile != nullptr) {
+                COUNTER_UPDATE(_copy_time, _statistics.copy_time);
+                COUNTER_UPDATE(_read_time, _statistics.read_time);
+                COUNTER_UPDATE(_request_io, _statistics.request_io);
+                COUNTER_UPDATE(_merged_io, _statistics.merged_io);
+                COUNTER_UPDATE(_request_bytes, _statistics.request_bytes);
+                COUNTER_UPDATE(_read_bytes, _statistics.read_bytes);
+            }
+        }
+        return Status::OK();
+    }
+
+    const io::Path& path() const override { return _reader->path(); }
+
+    size_t size() const override { return _size; }
+
+    bool closed() const override { return _closed; }
+
+    std::shared_ptr<io::FileSystem> fs() const override { return _reader->fs(); }
+
+protected:
+    Status read_at_impl(size_t offset, Slice result, size_t* bytes_read,
+                        const IOContext* io_ctx) override;
+
+private:
+    struct RangeCachedData {
+        size_t start_offset;
+        size_t end_offset;
+        std::vector<int16> ref_box;
+        std::vector<uint32> box_start_offset;
+        std::vector<uint32> box_end_offset;
+        bool has_read = false;
+
+        RangeCachedData(size_t start_offset, size_t end_offset)
+                : start_offset(start_offset), end_offset(end_offset) {}
+
+        RangeCachedData() : start_offset(0), end_offset(0) {}
+
+        bool empty() const { return start_offset == end_offset; }
+
+        bool contains(size_t offset) const { return start_offset <= offset && offset < end_offset; }
+
+        void reset() {
+            start_offset = 0;
+            end_offset = 0;
+            ref_box.clear();
+            box_start_offset.clear();
+            box_end_offset.clear();
+        }
+
+        int16 release_last_box() {
+            // we can only release the last referenced box to ensure sequential read in range
+            if (!empty()) {
+                int16 last_box_ref = ref_box.back();
+                uint32 released_size = box_end_offset.back() - box_start_offset.back();
+                ref_box.pop_back();
+                box_start_offset.pop_back();
+                box_end_offset.pop_back();
+                end_offset -= released_size;
+                if (empty()) {
+                    reset();
+                }
+                return last_box_ref;
+            }
+            return -1;
+        }
+    };
+
+    struct Statistics {
+        int64_t copy_time = 0;
+        int64_t read_time = 0;
+        int64_t request_io = 0;
+        int64_t merged_io = 0;
+        int64_t request_bytes = 0;
+        int64_t read_bytes = 0;
+    };
+
+    RuntimeProfile::Counter* _copy_time;
+    RuntimeProfile::Counter* _read_time;
+    RuntimeProfile::Counter* _request_io;
+    RuntimeProfile::Counter* _merged_io;
+    RuntimeProfile::Counter* _request_bytes;
+    RuntimeProfile::Counter* _read_bytes;
+
+    int _search_read_range(size_t start_offset, size_t end_offset);
+    void _clean_cached_data(RangeCachedData& cached_data);
+    void _read_in_box(RangeCachedData& cached_data, size_t offset, Slice result,
+                      size_t* bytes_read);
+    Status _fill_box(int range_index, size_t start_offset, size_t to_read, size_t* bytes_read,
+                     const IOContext* io_ctx);
+    void _dec_box_ref(int16 box_index);
+
+    RuntimeProfile* _profile = nullptr;
+    io::FileReaderSPtr _reader;
+    const std::vector<PrefetchRange> _random_access_ranges;
+    std::vector<RangeCachedData> _range_cached_data;
+    size_t _size;
+    bool _closed = false;
+    size_t _remaining;
+
+    char* _read_slice = nullptr;
+    std::vector<char*> _boxes;
+    int16 _last_box_ref = -1;
+    uint32 _last_box_usage = 0;
+    std::vector<int16> _box_ref;
+
+    Statistics _statistics;
+};
+
+class DelegateReader {

Review Comment:
   Add some comments.
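
   For example (a guess from the code above; adjust to the actual intent):
   ```cpp
   /**
    * Creates an appropriate wrapper around the underlying FileReader, e.g. a
    * ThreadSafeReader for reader types that are not safe to share across threads.
    */
   class DelegateReader {
   ```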



##########
be/src/vec/exec/format/parquet/vparquet_reader.cpp:
##########
@@ -581,6 +582,42 @@ Status ParquetReader::_init_row_groups(const bool& is_filter_groups) {
     return Status::OK();
 }
 
+std::vector<io::PrefetchRange> ParquetReader::_generate_random_access_ranges(
+        const RowGroupReader::RowGroupIndex& group) {
+    std::vector<io::PrefetchRange> result;
+    int64_t last_chunk_end = -1;
+    std::function<void(const FieldSchema*, const tparquet::RowGroup&)> scalar_range =
+            [&](const FieldSchema* field, const tparquet::RowGroup& row_group) {
+                if (field->type.type == TYPE_ARRAY) {
+                    scalar_range(&field->children[0], row_group);
+                } else if (field->type.type == TYPE_MAP) {
+                    scalar_range(&field->children[0].children[0], row_group);
+                    scalar_range(&field->children[0].children[1], row_group);
+                } else if (field->type.type == TYPE_STRUCT) {
+                    for (int i = 0; i < field->children.size(); ++i) {
+                        scalar_range(&field->children[i], row_group);
+                    }
+                } else {
+                    const tparquet::ColumnChunk& chunk =
+                            row_group.columns[field->physical_column_index];
+                    auto& chunk_meta = chunk.meta_data;
+                    int64_t chunk_start = chunk_meta.__isset.dictionary_page_offset
+                                                  ? chunk_meta.dictionary_page_offset
+                                                  : chunk_meta.data_page_offset;
+                    int64_t chunk_end = chunk_start + chunk_meta.total_compressed_size;
+                    DCHECK_GE(chunk_start, last_chunk_end);

Review Comment:
   How is this DCHECK guaranteed?
   Is it possible that the columns in `_read_columns` are not sorted by offset?
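
   If the order is not guaranteed, one option is to sort the collected ranges and verify them
   after the walk, e.g. (a sketch; `result` is the vector built above):
   ```cpp
   std::sort(result.begin(), result.end(),
             [](const io::PrefetchRange& a, const io::PrefetchRange& b) {
                 return a.start_offset < b.start_offset;
             });
   // The DCHECK then becomes a pass over the sorted vector instead of firing
   // inside the collection lambda.
   for (size_t i = 1; i < result.size(); ++i) {
       DCHECK_GE(result[i].start_offset, result[i - 1].end_offset);
   }
   ```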



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org