You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 08:35:22 UTC
[incubator-doris] branch master updated: [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 7c2db79b73 [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)
7c2db79b73 is described below
commit 7c2db79b7354f54b4045966a102f7760e640a817
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu May 19 16:35:15 2022 +0800
[BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)
---
be/src/olap/compaction.cpp | 9 +++++---
be/src/vec/olap/block_reader.cpp | 2 +-
be/src/vec/olap/vcollect_iterator.cpp | 39 +++++++++++++++++++++++++++-------
be/src/vec/olap/vcollect_iterator.h | 2 +-
be/src/vec/olap/vgeneric_iterators.cpp | 5 ++++-
5 files changed, 43 insertions(+), 14 deletions(-)
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index b885162b76..db197eb683 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -76,7 +76,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
_output_version =
Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());
- LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
+ auto use_vectorized_compaction = _should_use_vectorized_compaction();
+ string merge_type = use_vectorized_compaction ? "v" : "";
+
+ LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
<< ", output_version=" << _output_version << ", permits: " << permits;
RETURN_NOT_OK(construct_output_rowset_writer());
@@ -87,7 +90,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
// The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
Merger::Statistics stats;
Status res;
- auto use_vectorized_compaction = _should_use_vectorized_compaction();
+
if (use_vectorized_compaction) {
res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
@@ -95,7 +98,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
_output_rs_writer.get(), &stats);
}
- string merge_type = use_vectorized_compaction ? "v" : "";
+
if (!res.ok()) {
LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res
<< ", tablet=" << _tablet->full_name()
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 3f352793b3..62e583849f 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -64,7 +64,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
}
}
- _vcollect_iter.build_heap(*valid_rs_readers);
+ RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
if (_vcollect_iter.is_merge()) {
auto status = _vcollect_iter.current_row(&_next_row);
_eof = status.precise_code() == OLAP_ERR_DATA_EOF;
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 1fc853294f..e100dcfeb2 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -26,6 +26,14 @@ namespace vectorized {
VCollectIterator::~VCollectIterator() {}
+#define RETURN_IF_NOT_EOF_AND_OK(stmt) \
+ do { \
+ const Status& _status_ = (stmt); \
+ if (UNLIKELY(!_status_.ok() && _status_.precise_code() != OLAP_ERR_DATA_EOF)) { \
+ return _status_; \
+ } \
+ } while (false)
+
void VCollectIterator::init(TabletReader* reader) {
_reader = reader;
// when aggregate is enabled or key_type is DUP_KEYS, we don't merge
@@ -45,20 +53,24 @@ Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
// Build a merge heap. If _merge is true, a rowset with the max rownum
// status will be used as the base rowset, and the other rowsets will be merged first and
// then merged with the base rowset.
-void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) {
+Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) {
DCHECK(rs_readers.size() == _children.size());
_skip_same = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
if (_children.empty()) {
_inner_iter.reset(nullptr);
- return;
+ return Status::OK();
} else if (_merge) {
DCHECK(!rs_readers.empty());
for (auto [c_iter, r_iter] = std::pair {_children.begin(), rs_readers.begin()};
c_iter != _children.end();) {
- if ((*c_iter)->init() != Status::OK()) {
+ auto s = (*c_iter)->init();
+ if (!s.ok()) {
delete (*c_iter);
c_iter = _children.erase(c_iter);
r_iter = rs_readers.erase(r_iter);
+ if (s.precise_code() != OLAP_ERR_DATA_EOF) {
+ return s;
+ }
} else {
++c_iter;
++r_iter;
@@ -91,7 +103,7 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
}
Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, _reader,
cumu_children.size() > 1, _skip_same);
- cumu_iter->init();
+ RETURN_IF_NOT_EOF_AND_OK(cumu_iter->init());
std::list<LevelIterator*> children;
children.push_back(*base_reader_child);
children.push_back(cumu_iter);
@@ -103,9 +115,10 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
} else {
_inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _skip_same));
}
- _inner_iter->init();
+ RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init());
// Clear _children earlier to release any related references
_children.clear();
+ return Status::OK();
}
bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, LevelIterator* rhs) {
@@ -197,9 +210,13 @@ Status VCollectIterator::Level0Iterator::_refresh_current_row() {
_ref.row_pos = 0;
_block->clear_column_data();
auto res = _rs_reader->next_block(_block.get());
- if (!res.ok()) {
+ if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) {
return res;
}
+ if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 0) {
+ _ref.row_pos = -1;
+ return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+ }
}
} while (_block->rows() != 0);
_ref.row_pos = -1;
@@ -209,7 +226,6 @@ Status VCollectIterator::Level0Iterator::_refresh_current_row() {
Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
_ref.row_pos++;
RETURN_NOT_OK(_refresh_current_row());
-
*ref = _ref;
return Status::OK();
}
@@ -220,7 +236,14 @@ Status VCollectIterator::Level0Iterator::next(Block* block) {
_ref.row_pos = -1;
return Status::OK();
} else {
- return _rs_reader->next_block(block);
+ auto res = _rs_reader->next_block(block);
+ if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) {
+ return res;
+ }
+ if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 0) {
+ return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+ }
+ return Status::OK();
}
}
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index 4bfebd2480..6cae36dcc3 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -49,7 +49,7 @@ public:
Status add_child(RowsetReaderSharedPtr rs_reader);
- void build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
+ Status build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
// Get top row of the heap, nullptr if reach end.
Status current_row(IteratorRowRef* ref) const;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 0a7b212420..bd3df6adbb 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -341,7 +341,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
delete ctx;
}
}
-
+ if (!_merge_heap.empty()) {
+ return Status::OK();
+ }
+ // The last batch still needs to be processed
return Status::EndOfFile("no more data in segment");
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org