You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/31 15:38:59 UTC

[doris] 01/20: [improvement](reader) use union merge when rowset are noneoverlapping (#15749)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 296cd73c8c429af26a187f38a0fc9f0f70be58c9
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Mon Jan 16 21:53:18 2023 +0800

    [improvement](reader) use union merge when rowset are noneoverlapping (#15749)
---
 be/src/vec/olap/block_reader.cpp      | 36 ++++++++++++++++++---
 be/src/vec/olap/block_reader.h        |  4 +++
 be/src/vec/olap/vcollect_iterator.cpp | 61 ++++++++++++++++++++++++++++++++---
 be/src/vec/olap/vcollect_iterator.h   |  6 +++-
 4 files changed, 97 insertions(+), 10 deletions(-)

diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index aa68e382b6..fa79e1bb9c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -33,9 +33,33 @@ BlockReader::~BlockReader() {
     }
 }
 
+// Returns true unless the rowsets are provably ascending and key-disjoint. Every
+// doubtful case is reported as overlapping, which only costs the caller the
+// union-read optimization (see _init_collect_iter).
+bool BlockReader::_rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
+    std::string cur_max_key;
+    for (const auto& rs_reader : rs_readers) {
+        const auto& rowset = rs_reader->rowset();
+        // version 0-1 of every tablet is empty, and rowsets without rows are skipped too
+        if (rowset->version().second == 1 || rowset->num_rows() == 0) {
+            continue;
+        }
+        std::string min_key;
+        // segments must not overlap and the min key must exceed the previous max key
+        if (rowset->is_segments_overlapping() || !rowset->min_key(&min_key) ||
+            min_key <= cur_max_key) {
+            return true;
+        }
+        // a missing max key is treated like a missing min key instead of aborting
+        if (!rowset->max_key(&cur_max_key)) {
+            LOG(WARNING) << "rowset has min key but no max key, treat rowsets as overlapping";
+            return true;
+        }
+    }
+    return false;
+}
 Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
                                        std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
-    _vcollect_iter.init(this, read_params.read_orderby_key, read_params.read_orderby_key_reverse);
     std::vector<RowsetReaderSharedPtr> rs_readers;
     auto res = _capture_rs_readers(read_params, &rs_readers);
     if (!res.ok()) {
@@ -46,6 +70,10 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
                      << ", version:" << read_params.version;
         return res;
     }
+    // check whether the rowsets are non-overlapping
+    _is_rowsets_overlapping = _rowsets_overlapping(rs_readers);
+    _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
+                        read_params.read_orderby_key_reverse);
 
     _reader_context.batch_size = _batch_size;
     _reader_context.is_vec = true;
@@ -63,10 +91,8 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
     }
 
     RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
-    if (_vcollect_iter.is_merge()) {
-        auto status = _vcollect_iter.current_row(&_next_row);
-        _eof = status.precise_code() == OLAP_ERR_DATA_EOF;
-    }
+    auto status = _vcollect_iter.current_row(&_next_row);
+    _eof = status.is_end_of_file();
 
     return Status::OK();
 }
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 0213783da0..83786a2e69 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -86,6 +86,8 @@ private:
 
     bool _get_next_row_same();
 
+    bool _rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers);
+
     VCollectIterator _vcollect_iter;
     IteratorRowRef _next_row {{}, -1, false};
 
@@ -115,6 +117,8 @@ private:
     std::vector<RowLocation> _block_row_locations;
 
     ColumnPtr _delete_filter_column;
+
+    bool _is_rowsets_overlapping = true;
 };
 
 } // namespace vectorized
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index b9160514ad..cc07e9be30 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -37,7 +37,8 @@ VCollectIterator::~VCollectIterator() {
     }
 }
 
-void VCollectIterator::init(TabletReader* reader, bool force_merge, bool is_reverse) {
+void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge,
+                            bool is_reverse) {
     _reader = reader;
     // when aggregate is enabled or key_type is DUP_KEYS, we don't merge
     // multiple data to aggregate for better performance
@@ -47,8 +48,10 @@ void VCollectIterator::init(TabletReader* reader, bool force_merge, bool is_reve
           _reader->_tablet->enable_unique_key_merge_on_write()))) {
         _merge = false;
     }
-
-    if (force_merge) {
+    // When data is non-overlapping, there is no need to build a heap to traverse it
+    if (!ori_data_overlapping) {
+        _merge = false;
+    } else if (force_merge) {
         _merge = true;
     }
     _is_reverse = is_reverse;
@@ -127,6 +130,22 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
                     new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same));
         }
     } else {
+        bool have_multiple_child = false;
+        bool is_first_child = true;
+        for (auto iter = _children.begin(); iter != _children.end();) {
+            auto s = (*iter)->init_for_union(is_first_child, have_multiple_child);
+            if (!s.ok()) {
+                delete (*iter);
+                iter = _children.erase(iter);
+                if (!s.is<END_OF_FILE>()) {
+                    return s;
+                }
+            } else {
+                have_multiple_child = true;
+                is_first_child = false;
+                ++iter;
+            }
+        }
         _inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same));
     }
     RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init());
@@ -212,6 +231,36 @@ Status VCollectIterator::Level0Iterator::init(bool get_data_by_ref) {
     return st;
 }
 
+// Prepares this child for the union (non-merge) read path and returns the status
+// of _refresh_current_row().
+// If is_first_child is true, the iterator points at the first row of its block.
+// Unique keys and agg keys will read a line first and then start a loop:
+// while (!eof) {
+//     collect_iter->next(&_next_row);
+// }
+// so the first child loads its first row and every other child starts at
+// row_pos = -1. Data is returned by reference only when get_data_by_ref is set,
+// the rowset reader supports it and storage vectorization is enabled.
+Status VCollectIterator::Level0Iterator::init_for_union(bool is_first_child, bool get_data_by_ref) {
+    _get_data_by_ref = get_data_by_ref && _rs_reader->support_return_data_by_ref() &&
+                       config::enable_storage_vectorization;
+    if (!_get_data_by_ref) {
+        _block = std::make_shared<Block>(_schema.create_block(
+                _reader->_return_columns, _reader->_tablet_columns_convert_to_null_set));
+    }
+    auto st = _refresh_current_row();
+    if (_get_data_by_ref && !_block_view.empty()) {
+        _ref = _block_view[0];
+    } else {
+        _ref = {_block, 0, false};
+    }
+    if (!is_first_child) {
+        // rewind the position only: _block_view[-1] would index out of bounds
+        _ref.row_pos = -1;
+    }
+    return st;
+}
+
 int64_t VCollectIterator::Level0Iterator::version() const {
     return _rs_reader->version().second;
 }
@@ -259,7 +308,7 @@ Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
 
 Status VCollectIterator::Level0Iterator::next(Block* block) {
     CHECK(!_get_data_by_ref);
-    if (_ref.row_pos == 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) {
+    if (_ref.row_pos <= 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) {
         block->swap(*_ref.block);
         _ref.reset();
         return Status::OK();
@@ -306,6 +355,10 @@ VCollectIterator::Level1Iterator::Level1Iterator(
           _skip_same(skip_same) {
     _ref.reset();
     _batch_size = reader->_batch_size;
+    // !_merge means that data are in order, so we just reverse children to return data in reverse
+    if (!_merge && _is_reverse) {
+        _children.reverse();
+    }
 }
 
 VCollectIterator::Level1Iterator::~Level1Iterator() {
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index b16f752bc0..d64e45cf91 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -39,7 +39,7 @@ public:
     // Hold reader point to get reader params
     ~VCollectIterator();
 
-    void init(TabletReader* reader, bool force_merge, bool is_reverse);
+    void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse);
 
     Status add_child(RowsetReaderSharedPtr rs_reader);
 
@@ -85,6 +85,9 @@ private:
                   _compare_columns(reader->_reader_context.read_orderby_key_columns) {};
 
         virtual Status init(bool get_data_by_ref = false) = 0;
+        virtual Status init_for_union(bool is_first_child, bool get_data_by_ref = false) {
+            return Status::OK(); // default: no union-specific setup, always reports OK
+        };
 
         virtual int64_t version() const = 0;
 
@@ -146,6 +149,7 @@ private:
         ~Level0Iterator() override = default;
 
         Status init(bool get_data_by_ref = false) override;
+        Status init_for_union(bool is_first_child, bool get_data_by_ref = false) override;
 
         int64_t version() const override;
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org