You are viewing a plain-text version of this content. (The link to the canonical version was not preserved in this copy.)
Posted to commits@doris.apache.org by da...@apache.org on 2023/01/02 14:54:27 UTC

[doris] branch master updated: [fix](vec) VMergeIterator add key same label for agg table (#14722)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 40c53931e5 [fix](vec) VMergeIterator add key same label for agg table (#14722)
40c53931e5 is described below

commit 40c53931e57be494a71591bc1d32fef2450207fd
Author: xueweizhang <zx...@163.com>
AuthorDate: Mon Jan 2 22:54:21 2023 +0800

    [fix](vec) VMergeIterator add key same label for agg table (#14722)
---
 be/src/olap/olap_define.h              |  2 +-
 be/src/vec/core/block.cpp              | 13 +++++++++++++
 be/src/vec/core/block.h                | 15 +++++++++++++++
 be/src/vec/olap/block_reader.cpp       | 11 ++++++++++-
 be/src/vec/olap/block_reader.h         |  2 ++
 be/src/vec/olap/vgeneric_iterators.cpp |  9 ++++++++-
 be/src/vec/olap/vgeneric_iterators.h   | 18 ++++++++++++++++++
 be/test/olap/segcompaction_test.cpp    |  1 +
 8 files changed, 68 insertions(+), 3 deletions(-)

diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 40a6dd9dee..a9c92c69e2 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -212,4 +212,4 @@ private:                                 \
 #define BUILD_VERSION "Unknown"
 #endif
 
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 571a391eb0..817a2ac643 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -198,6 +198,9 @@ void Block::erase_impl(size_t position) {
             ++it;
         }
     }
+    if (position < row_same_bit.size()) {
+        row_same_bit.erase(row_same_bit.begin() + position);
+    }
 }
 
 void Block::erase(const String& name) {
@@ -339,6 +342,9 @@ void Block::set_num_rows(size_t length) {
                 elem.column = elem.column->cut(0, length);
             }
         }
+        if (length < row_same_bit.size()) {
+            row_same_bit.resize(length);
+        }
     }
 }
 
@@ -353,6 +359,9 @@ void Block::skip_num_rows(int64_t& length) {
                 elem.column = elem.column->cut(length, origin_rows - length);
             }
         }
+        if (length < row_same_bit.size()) {
+            row_same_bit.assign(row_same_bit.begin() + length, row_same_bit.end());
+        }
     }
 }
 
@@ -593,6 +602,7 @@ DataTypes Block::get_data_types() const {
 void Block::clear() {
     data.clear();
     index_by_name.clear();
+    row_same_bit.clear();
 }
 
 void Block::clear_column_data(int column_size) noexcept {
@@ -607,17 +617,20 @@ void Block::clear_column_data(int column_size) noexcept {
         DCHECK_EQ(d.column->use_count(), 1);
         (*std::move(d.column)).assume_mutable()->clear();
     }
+    row_same_bit.clear();
 }
 
 void Block::swap(Block& other) noexcept {
     data.swap(other.data);
     index_by_name.swap(other.index_by_name);
+    row_same_bit.swap(other.row_same_bit);
 }
 
 void Block::swap(Block&& other) noexcept {
     clear();
     data = std::move(other.data);
     initialize_index_by_name();
+    row_same_bit = std::move(other.row_same_bit);
 }
 
 void Block::update_hash(SipHash& hash) const {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 803fb82f97..b4a73a896f 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -62,6 +62,7 @@ private:
 
     Container data;
     IndexByName index_by_name;
+    std::vector<bool> row_same_bit;
 
     int64_t _decompress_time_ns = 0;
     int64_t _decompressed_bytes = 0;
@@ -349,6 +350,20 @@ public:
     int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
     int64_t get_compress_time() const { return _compress_time_ns; }
 
+    void set_same_bit(std::vector<bool>::const_iterator begin,
+                      std::vector<bool>::const_iterator end) {
+        row_same_bit.insert(row_same_bit.end(), begin, end);
+
+        DCHECK_EQ(row_same_bit.size(), rows());
+    }
+
+    bool get_same_bit(size_t position) {
+        if (position >= row_same_bit.size()) {
+            return false;
+        }
+        return row_same_bit[position];
+    }
+
 private:
     void erase_impl(size_t position);
     void initialize_index_by_name();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index be064d26ce..43336b1861 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -213,7 +213,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
             return res;
         }
 
-        if (!_next_row.is_same) {
+        if (!_get_next_row_same()) {
             if (target_block_row == _batch_size) {
                 break;
             }
@@ -413,6 +413,15 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
     }
 }
 
+bool BlockReader::_get_next_row_same() {
+    if (_next_row.is_same) {
+        return true;
+    } else {
+        auto block = _next_row.block.get();
+        return block->get_same_bit(_next_row.row_pos);
+    }
+}
+
 ColumnPredicate* BlockReader::_parse_to_predicate(const FunctionFilter& function_filter) {
     int32_t index = _tablet_schema->field_index(function_filter._col_name);
     if (index < 0) {
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 18e3e0e232..f727930022 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -79,6 +79,8 @@ private:
 
     void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
 
+    bool _get_next_row_same();
+
     VCollectIterator _vcollect_iter;
     IteratorRowRef _next_row {{}, -1, false};
 
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 7997a3d43d..2d6d54f762 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -124,6 +124,7 @@ bool VMergeIteratorContext::compare(const VMergeIteratorContext& rhs) const {
     if (_is_unique) {
         result ? set_skip(true) : rhs.set_skip(true);
     }
+    result ? set_same(true) : rhs.set_same(true);
     return result;
 }
 
@@ -148,6 +149,8 @@ void VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
 
         d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
     }
+    const auto& tmp_pre_ctx_same_bit = get_pre_ctx_same();
+    dst.set_same_bit(tmp_pre_ctx_same_bit.begin(), tmp_pre_ctx_same_bit.begin() + _cur_batch_num);
     _cur_batch_num = 0;
 }
 
@@ -158,8 +161,9 @@ void VMergeIteratorContext::copy_rows(BlockView* view, bool advanced) {
     size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
     DCHECK(start >= 0);
 
+    const auto& tmp_pre_ctx_same_bit = get_pre_ctx_same();
     for (size_t i = 0; i < _cur_batch_num; ++i) {
-        view->push_back({_block, static_cast<int>(start + i), false});
+        view->push_back({_block, static_cast<int>(start + i), tmp_pre_ctx_same_bit[i]});
     }
 
     _cur_batch_num = 0;
@@ -255,11 +259,14 @@ Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
     if (valid()) {
         RETURN_IF_ERROR(advance());
     }
+    _pre_ctx_same_bit.reserve(_block_row_max);
+    _pre_ctx_same_bit.assign(_block_row_max, false);
     return Status::OK();
 }
 
 Status VMergeIteratorContext::advance() {
     _skip = false;
+    _same = false;
     // NOTE: we increase _index_in_block directly to valid one check
     do {
         _index_in_block++;
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index 210f47022a..e5bb36d0d3 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -120,6 +120,21 @@ public:
 
     void set_skip(bool skip) const { _skip = skip; }
 
+    bool is_same() const { return _same; }
+
+    void set_same(bool same) const { _same = same; }
+
+    const std::vector<bool>& get_pre_ctx_same() const { return _pre_ctx_same_bit; }
+
+    void set_pre_ctx_same(VMergeIteratorContext* ctx) const {
+        int64_t index = ctx->get_cur_batch() - 1;
+        DCHECK(index >= 0);
+        DCHECK_LT(index, _pre_ctx_same_bit.size());
+        _pre_ctx_same_bit[index] = ctx->is_same();
+    }
+
+    size_t get_cur_batch() const { return _cur_batch_num; }
+
     void add_cur_batch() { _cur_batch_num++; }
 
     void reset_cur_batch() { _cur_batch_num = 0; }
@@ -137,6 +152,7 @@ private:
     bool _is_reverse = false;
     bool _valid = false;
     mutable bool _skip = false;
+    mutable bool _same = false;
     size_t _index_in_block = -1;
     // 4096 minus 16 + 16 bytes padding that in padding pod array
     int _block_row_max = 4064;
@@ -151,6 +167,7 @@ private:
     std::shared_ptr<Block> _block;
     // used to store data still on block view
     std::list<std::shared_ptr<Block>> _block_list;
+    mutable std::vector<bool> _pre_ctx_same_bit;
 };
 
 class VMergeIterator : public RowwiseIterator {
@@ -221,6 +238,7 @@ private:
                     }
                     pre_ctx = ctx;
                 }
+                pre_ctx->set_pre_ctx_same(ctx);
                 if (UNLIKELY(_record_rowids)) {
                     _block_row_locations[row_idx] = ctx->current_row_location();
                 }
diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp
index 5ac39db339..78fff2dd6e 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -310,6 +310,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
             }
             s = rowset_writer->flush();
             EXPECT_EQ(Status::OK(), s);
+            sleep(1);
         }
         num_segments = 1;
         rows_per_segment = 6400;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org