You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2023/01/31 15:38:59 UTC
[doris] 01/20: [improvement](reader) use union merge when rowsets are non-overlapping (#15749)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit 296cd73c8c429af26a187f38a0fc9f0f70be58c9
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Mon Jan 16 21:53:18 2023 +0800
[improvement](reader) use union merge when rowsets are non-overlapping (#15749)
---
be/src/vec/olap/block_reader.cpp | 36 ++++++++++++++++++---
be/src/vec/olap/block_reader.h | 4 +++
be/src/vec/olap/vcollect_iterator.cpp | 61 ++++++++++++++++++++++++++++++++---
be/src/vec/olap/vcollect_iterator.h | 6 +++-
4 files changed, 97 insertions(+), 10 deletions(-)
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index aa68e382b6..fa79e1bb9c 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -33,9 +33,33 @@ BlockReader::~BlockReader() {
}
}
+// Returns false only if, in the given order, every non-empty rowset has non-overlapping segments
+// and a min key strictly greater than the previous rowset's max key; otherwise returns true.
+bool BlockReader::_rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers) {
+ std::string cur_max_key;
+ for (const auto& rs_reader : rs_readers) {
+ const auto& rowset = rs_reader->rowset();
+ // version 0-1 of every tablet is empty; empty rowsets cannot overlap anything, skip them
+ if (rowset->version().second == 1 || rowset->num_rows() == 0) {
+ continue;
+ }
+ if (rowset->is_segments_overlapping()) {
+ return true;
+ }
+ std::string min_key;
+ // a missing min key, or one equal to the previous max key, is reported as overlapping
+ if (!rowset->min_key(&min_key) || min_key <= cur_max_key) {
+ return true;
+ }
+ // treat a missing max key as overlapping instead of aborting the process with CHECK
+ if (!rowset->max_key(&cur_max_key)) {
+ return true;
+ }
+ }
+ return false;
+}
Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
std::vector<RowsetReaderSharedPtr>* valid_rs_readers) {
- _vcollect_iter.init(this, read_params.read_orderby_key, read_params.read_orderby_key_reverse);
std::vector<RowsetReaderSharedPtr> rs_readers;
auto res = _capture_rs_readers(read_params, &rs_readers);
if (!res.ok()) {
@@ -46,6 +70,10 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
<< ", version:" << read_params.version;
return res;
}
+ // check if rowsets are noneoverlapping
+ _is_rowsets_overlapping = _rowsets_overlapping(rs_readers);
+ _vcollect_iter.init(this, _is_rowsets_overlapping, read_params.read_orderby_key,
+ read_params.read_orderby_key_reverse);
_reader_context.batch_size = _batch_size;
_reader_context.is_vec = true;
@@ -63,10 +91,8 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
}
RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
- if (_vcollect_iter.is_merge()) {
- auto status = _vcollect_iter.current_row(&_next_row);
- _eof = status.precise_code() == OLAP_ERR_DATA_EOF;
- }
+ auto status = _vcollect_iter.current_row(&_next_row);
+ _eof = status.is_end_of_file();
return Status::OK();
}
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 0213783da0..83786a2e69 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -86,6 +86,8 @@ private:
bool _get_next_row_same();
+ bool _rowsets_overlapping(const std::vector<RowsetReaderSharedPtr>& rs_readers);
+
VCollectIterator _vcollect_iter;
IteratorRowRef _next_row {{}, -1, false};
@@ -115,6 +117,8 @@ private:
std::vector<RowLocation> _block_row_locations;
ColumnPtr _delete_filter_column;
+
+ bool _is_rowsets_overlapping = true;
};
} // namespace vectorized
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index b9160514ad..cc07e9be30 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -37,7 +37,8 @@ VCollectIterator::~VCollectIterator() {
}
}
-void VCollectIterator::init(TabletReader* reader, bool force_merge, bool is_reverse) {
+void VCollectIterator::init(TabletReader* reader, bool ori_data_overlapping, bool force_merge,
+ bool is_reverse) {
_reader = reader;
// when aggregate is enabled or key_type is DUP_KEYS, we don't merge
// multiple data to aggregate for better performance
@@ -47,8 +48,10 @@ void VCollectIterator::init(TabletReader* reader, bool force_merge, bool is_reve
_reader->_tablet->enable_unique_key_merge_on_write()))) {
_merge = false;
}
-
- if (force_merge) {
+ // When data is none overlapping, no need to build heap to traverse data
+ if (!ori_data_overlapping) {
+ _merge = false;
+ } else if (force_merge) {
_merge = true;
}
_is_reverse = is_reverse;
@@ -127,6 +130,22 @@ Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_reade
new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same));
}
} else {
+ bool have_multiple_child = false;
+ bool is_first_child = true;
+ for (auto iter = _children.begin(); iter != _children.end();) {
+ auto s = (*iter)->init_for_union(is_first_child, have_multiple_child);
+ if (!s.ok()) {
+ delete (*iter);
+ iter = _children.erase(iter);
+ if (!s.is<END_OF_FILE>()) {
+ return s;
+ }
+ } else {
+ have_multiple_child = true;
+ is_first_child = false;
+ ++iter;
+ }
+ }
_inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _is_reverse, _skip_same));
}
RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init());
@@ -212,6 +231,36 @@ Status VCollectIterator::Level0Iterator::init(bool get_data_by_ref) {
return st;
}
+// Prepares this child for the union (non-merge) read path used when rowsets do not overlap.
+// Unique and agg keys readers read one row up front and then loop:
+// while (!eof) {
+// collect_iter->next(&_next_row);
+// }
+// so only the first child is positioned on row 0 of its first block; every other child
+// starts with row_pos = -1. Returns the status of _refresh_current_row(); build_heap drops a
+// child whose status is not OK and propagates any error other than END_OF_FILE.
+Status VCollectIterator::Level0Iterator::init_for_union(bool is_first_child, bool get_data_by_ref) {
+ _get_data_by_ref = get_data_by_ref && _rs_reader->support_return_data_by_ref() &&
+ config::enable_storage_vectorization;
+ // without by-ref access this child owns the block it reads into
+ if (!_get_data_by_ref) {
+ _block = std::make_shared<Block>(_schema.create_block(
+ _reader->_return_columns, _reader->_tablet_columns_convert_to_null_set));
+ }
+ // load the first block; st is returned only after _ref has been set
+ auto st = _refresh_current_row();
+ const int start_row_pos = is_first_child ? 0 : -1;
+ if (_get_data_by_ref && _block_view.size()) {
+ // Take entry 0 and override its row_pos. Never index _block_view[-1]: the -1 is
+ // converted to SIZE_MAX, so the access is out of bounds (undefined behavior).
+ _ref = _block_view[0];
+ _ref.row_pos = start_row_pos;
+ } else {
+ _ref = {_block, start_row_pos, false};
+ }
+ return st;
+}
+
int64_t VCollectIterator::Level0Iterator::version() const {
return _rs_reader->version().second;
}
@@ -259,7 +308,7 @@ Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
Status VCollectIterator::Level0Iterator::next(Block* block) {
CHECK(!_get_data_by_ref);
- if (_ref.row_pos == 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) {
+ if (_ref.row_pos <= 0 && _ref.block != nullptr && UNLIKELY(_ref.block->rows() > 0)) {
block->swap(*_ref.block);
_ref.reset();
return Status::OK();
@@ -306,6 +355,10 @@ VCollectIterator::Level1Iterator::Level1Iterator(
_skip_same(skip_same) {
_ref.reset();
_batch_size = reader->_batch_size;
+ // !_merge means that data are in order, so we just reverse children to return data in reverse
+ if (!_merge && _is_reverse) {
+ _children.reverse();
+ }
}
VCollectIterator::Level1Iterator::~Level1Iterator() {
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index b16f752bc0..d64e45cf91 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -39,7 +39,7 @@ public:
// Hold reader point to get reader params
~VCollectIterator();
- void init(TabletReader* reader, bool force_merge, bool is_reverse);
+ void init(TabletReader* reader, bool ori_data_overlapping, bool force_merge, bool is_reverse);
Status add_child(RowsetReaderSharedPtr rs_reader);
@@ -85,6 +85,9 @@ private:
_compare_columns(reader->_reader_context.read_orderby_key_columns) {};
virtual Status init(bool get_data_by_ref = false) = 0;
+ virtual Status init_for_union(bool is_first_child, bool get_data_by_ref = false) {
+ return Status::OK();
+ };
virtual int64_t version() const = 0;
@@ -146,6 +149,7 @@ private:
~Level0Iterator() override = default;
Status init(bool get_data_by_ref = false) override;
+ Status init_for_union(bool is_first_child, bool get_data_by_ref = false) override;
int64_t version() const override;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org