You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/05/19 08:35:22 UTC

[incubator-doris] branch master updated: [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 7c2db79b73 [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)
7c2db79b73 is described below

commit 7c2db79b7354f54b4045966a102f7760e640a817
Author: Gabriel <ga...@gmail.com>
AuthorDate: Thu May 19 16:35:15 2022 +0800

    [BUG] fix bug for vectorized compaction and some storage vectorization bug (#9610)
---
 be/src/olap/compaction.cpp             |  9 +++++---
 be/src/vec/olap/block_reader.cpp       |  2 +-
 be/src/vec/olap/vcollect_iterator.cpp  | 39 +++++++++++++++++++++++++++-------
 be/src/vec/olap/vcollect_iterator.h    |  2 +-
 be/src/vec/olap/vgeneric_iterators.cpp |  5 ++++-
 5 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index b885162b76..db197eb683 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -76,7 +76,10 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     _output_version =
             Version(_input_rowsets.front()->start_version(), _input_rowsets.back()->end_version());
 
-    LOG(INFO) << "start " << compaction_name() << ". tablet=" << _tablet->full_name()
+    auto use_vectorized_compaction = _should_use_vectorized_compaction();
+    string merge_type = use_vectorized_compaction ? "v" : "";
+
+    LOG(INFO) << "start " << merge_type << compaction_name() << ". tablet=" << _tablet->full_name()
               << ", output_version=" << _output_version << ", permits: " << permits;
 
     RETURN_NOT_OK(construct_output_rowset_writer());
@@ -87,7 +90,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
     // The test results show that merger is low-memory-footprint, there is no need to tracker its mem pool
     Merger::Statistics stats;
     Status res;
-    auto use_vectorized_compaction = _should_use_vectorized_compaction();
+
     if (use_vectorized_compaction) {
         res = Merger::vmerge_rowsets(_tablet, compaction_type(), _input_rs_readers,
                                      _output_rs_writer.get(), &stats);
@@ -95,7 +98,7 @@ Status Compaction::do_compaction_impl(int64_t permits) {
         res = Merger::merge_rowsets(_tablet, compaction_type(), _input_rs_readers,
                                     _output_rs_writer.get(), &stats);
     }
-    string merge_type = use_vectorized_compaction ? "v" : "";
+
     if (!res.ok()) {
         LOG(WARNING) << "fail to do " << merge_type << compaction_name() << ". res=" << res
                      << ", tablet=" << _tablet->full_name()
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index 3f352793b3..62e583849f 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -64,7 +64,7 @@ Status BlockReader::_init_collect_iter(const ReaderParams& read_params,
         }
     }
 
-    _vcollect_iter.build_heap(*valid_rs_readers);
+    RETURN_IF_ERROR(_vcollect_iter.build_heap(*valid_rs_readers));
     if (_vcollect_iter.is_merge()) {
         auto status = _vcollect_iter.current_row(&_next_row);
         _eof = status.precise_code() == OLAP_ERR_DATA_EOF;
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 1fc853294f..e100dcfeb2 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -26,6 +26,14 @@ namespace vectorized {
 
 VCollectIterator::~VCollectIterator() {}
 
+#define RETURN_IF_NOT_EOF_AND_OK(stmt)                                                  \
+    do {                                                                                \
+        const Status& _status_ = (stmt);                                                \
+        if (UNLIKELY(!_status_.ok() && _status_.precise_code() != OLAP_ERR_DATA_EOF)) { \
+            return _status_;                                                            \
+        }                                                                               \
+    } while (false)
+
 void VCollectIterator::init(TabletReader* reader) {
     _reader = reader;
     // when aggregate is enabled or key_type is DUP_KEYS, we don't merge
@@ -45,20 +53,24 @@ Status VCollectIterator::add_child(RowsetReaderSharedPtr rs_reader) {
 // Build a merge heap. If _merge is true, a rowset with the max rownum
 // status will be used as the base rowset, and the other rowsets will be merged first and
 // then merged with the base rowset.
-void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) {
+Status VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers) {
     DCHECK(rs_readers.size() == _children.size());
     _skip_same = _reader->_tablet->tablet_schema().keys_type() == KeysType::UNIQUE_KEYS;
     if (_children.empty()) {
         _inner_iter.reset(nullptr);
-        return;
+        return Status::OK();
     } else if (_merge) {
         DCHECK(!rs_readers.empty());
         for (auto [c_iter, r_iter] = std::pair {_children.begin(), rs_readers.begin()};
              c_iter != _children.end();) {
-            if ((*c_iter)->init() != Status::OK()) {
+            auto s = (*c_iter)->init();
+            if (!s.ok()) {
                 delete (*c_iter);
                 c_iter = _children.erase(c_iter);
                 r_iter = rs_readers.erase(r_iter);
+                if (s.precise_code() != OLAP_ERR_DATA_EOF) {
+                    return s;
+                }
             } else {
                 ++c_iter;
                 ++r_iter;
@@ -91,7 +103,7 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
             }
             Level1Iterator* cumu_iter = new Level1Iterator(cumu_children, _reader,
                                                            cumu_children.size() > 1, _skip_same);
-            cumu_iter->init();
+            RETURN_IF_NOT_EOF_AND_OK(cumu_iter->init());
             std::list<LevelIterator*> children;
             children.push_back(*base_reader_child);
             children.push_back(cumu_iter);
@@ -103,9 +115,10 @@ void VCollectIterator::build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers
     } else {
         _inner_iter.reset(new Level1Iterator(_children, _reader, _merge, _skip_same));
     }
-    _inner_iter->init();
+    RETURN_IF_NOT_EOF_AND_OK(_inner_iter->init());
     // Clear _children earlier to release any related references
     _children.clear();
+    return Status::OK();
 }
 
 bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, LevelIterator* rhs) {
@@ -197,9 +210,13 @@ Status VCollectIterator::Level0Iterator::_refresh_current_row() {
             _ref.row_pos = 0;
             _block->clear_column_data();
             auto res = _rs_reader->next_block(_block.get());
-            if (!res.ok()) {
+            if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) {
                 return res;
             }
+            if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 0) {
+                _ref.row_pos = -1;
+                return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+            }
         }
     } while (_block->rows() != 0);
     _ref.row_pos = -1;
@@ -209,7 +226,6 @@ Status VCollectIterator::Level0Iterator::_refresh_current_row() {
 Status VCollectIterator::Level0Iterator::next(IteratorRowRef* ref) {
     _ref.row_pos++;
     RETURN_NOT_OK(_refresh_current_row());
-
     *ref = _ref;
     return Status::OK();
 }
@@ -220,7 +236,14 @@ Status VCollectIterator::Level0Iterator::next(Block* block) {
         _ref.row_pos = -1;
         return Status::OK();
     } else {
-        return _rs_reader->next_block(block);
+        auto res = _rs_reader->next_block(block);
+        if (!res.ok() && res.precise_code() != OLAP_ERR_DATA_EOF) {
+            return res;
+        }
+        if (res.precise_code() == OLAP_ERR_DATA_EOF && _block->rows() == 0) {
+            return Status::OLAPInternalError(OLAP_ERR_DATA_EOF);
+        }
+        return Status::OK();
     }
 }
 
diff --git a/be/src/vec/olap/vcollect_iterator.h b/be/src/vec/olap/vcollect_iterator.h
index 4bfebd2480..6cae36dcc3 100644
--- a/be/src/vec/olap/vcollect_iterator.h
+++ b/be/src/vec/olap/vcollect_iterator.h
@@ -49,7 +49,7 @@ public:
 
     Status add_child(RowsetReaderSharedPtr rs_reader);
 
-    void build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
+    Status build_heap(std::vector<RowsetReaderSharedPtr>& rs_readers);
     // Get top row of the heap, nullptr if reach end.
     Status current_row(IteratorRowRef* ref) const;
 
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 0a7b212420..bd3df6adbb 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -341,7 +341,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
             delete ctx;
         }
     }
-
+    if (!_merge_heap.empty()) {
+        return Status::OK();
+    }
+    // Heap is exhausted: report EOF, but `block` may still hold the last batch, which the caller must process
     return Status::EndOfFile("no more data in segment");
 }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org