You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2023/01/02 14:54:27 UTC
[doris] branch master updated: [fix](vec) VMergeIterator add key same label for agg table (#14722)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 40c53931e5 [fix](vec) VMergeIterator add key same label for agg table (#14722)
40c53931e5 is described below
commit 40c53931e57be494a71591bc1d32fef2450207fd
Author: xueweizhang <zx...@163.com>
AuthorDate: Mon Jan 2 22:54:21 2023 +0800
[fix](vec) VMergeIterator add key same label for agg table (#14722)
---
be/src/olap/olap_define.h | 2 +-
be/src/vec/core/block.cpp | 13 +++++++++++++
be/src/vec/core/block.h | 15 +++++++++++++++
be/src/vec/olap/block_reader.cpp | 11 ++++++++++-
be/src/vec/olap/block_reader.h | 2 ++
be/src/vec/olap/vgeneric_iterators.cpp | 9 ++++++++-
be/src/vec/olap/vgeneric_iterators.h | 18 ++++++++++++++++++
be/test/olap/segcompaction_test.cpp | 1 +
8 files changed, 68 insertions(+), 3 deletions(-)
diff --git a/be/src/olap/olap_define.h b/be/src/olap/olap_define.h
index 40a6dd9dee..a9c92c69e2 100644
--- a/be/src/olap/olap_define.h
+++ b/be/src/olap/olap_define.h
@@ -212,4 +212,4 @@ private: \
#define BUILD_VERSION "Unknown"
#endif
-} // namespace doris
\ No newline at end of file
+} // namespace doris
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 571a391eb0..817a2ac643 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -339,6 +339,9 @@ void Block::set_num_rows(size_t length) {
elem.column = elem.column->cut(0, length);
}
}
+ if (length < row_same_bit.size()) {
+ row_same_bit.resize(length);
+ }
}
}
@@ -353,6 +356,12 @@ void Block::skip_num_rows(int64_t& length) {
elem.column = elem.column->cut(length, origin_rows - length);
}
}
+ // Drop flags of the skipped leading rows (vector::assign from its own iterators is UB).
+ if (static_cast<size_t>(length) < row_same_bit.size()) {
+ row_same_bit.erase(row_same_bit.begin(), row_same_bit.begin() + length);
+ } else {
+ row_same_bit.clear();
+ }
}
}
@@ -593,6 +602,7 @@ DataTypes Block::get_data_types() const {
void Block::clear() {
data.clear();
index_by_name.clear();
+ row_same_bit.clear();
}
void Block::clear_column_data(int column_size) noexcept {
@@ -607,17 +617,20 @@ void Block::clear_column_data(int column_size) noexcept {
DCHECK_EQ(d.column->use_count(), 1);
(*std::move(d.column)).assume_mutable()->clear();
}
+ row_same_bit.clear();
}
void Block::swap(Block& other) noexcept {
data.swap(other.data);
index_by_name.swap(other.index_by_name);
+ row_same_bit.swap(other.row_same_bit);
}
void Block::swap(Block&& other) noexcept {
clear();
data = std::move(other.data);
initialize_index_by_name();
+ row_same_bit = std::move(other.row_same_bit);
}
void Block::update_hash(SipHash& hash) const {
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 803fb82f97..b4a73a896f 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -62,6 +62,7 @@ private:
Container data;
IndexByName index_by_name;
+ std::vector<bool> row_same_bit; // per-row "same key" flags, indexed by row (see set_same_bit)
int64_t _decompress_time_ns = 0;
int64_t _decompressed_bytes = 0;
@@ -349,6 +350,20 @@ public:
int64_t get_decompressed_bytes() const { return _decompressed_bytes; }
int64_t get_compress_time() const { return _compress_time_ns; }
+ // Append per-row "same key" flags; the flag count must stay equal to rows().
+ void set_same_bit(std::vector<bool>::const_iterator begin,
+ std::vector<bool>::const_iterator end) {
+ row_same_bit.insert(row_same_bit.end(), begin, end);
+ DCHECK_EQ(row_same_bit.size(), rows());
+ }
+
+ // Flag of row `position`; rows without a recorded flag report false.
+ bool get_same_bit(size_t position) const {
+ if (position >= row_same_bit.size()) {
+ return false;
+ }
+ return row_same_bit[position];
+ }
private:
void erase_impl(size_t position);
void initialize_index_by_name();
diff --git a/be/src/vec/olap/block_reader.cpp b/be/src/vec/olap/block_reader.cpp
index be064d26ce..43336b1861 100644
--- a/be/src/vec/olap/block_reader.cpp
+++ b/be/src/vec/olap/block_reader.cpp
@@ -213,7 +213,7 @@ Status BlockReader::_agg_key_next_block(Block* block, MemPool* mem_pool, ObjectP
return res;
}
- if (!_next_row.is_same) {
+ if (!_get_next_row_same()) {
if (target_block_row == _batch_size) {
break;
}
@@ -413,6 +413,15 @@ void BlockReader::_update_agg_value(MutableColumns& columns, int begin, int end,
}
}
+bool BlockReader::_get_next_row_same() {
+ // The next row continues the current key if the merge iterator flagged it
+ // directly, or if its source block recorded a same-bit for that row.
+ if (_next_row.is_same) {
+ return true;
+ }
+ return _next_row.block->get_same_bit(_next_row.row_pos);
+}
+
ColumnPredicate* BlockReader::_parse_to_predicate(const FunctionFilter& function_filter) {
int32_t index = _tablet_schema->field_index(function_filter._col_name);
if (index < 0) {
diff --git a/be/src/vec/olap/block_reader.h b/be/src/vec/olap/block_reader.h
index 18e3e0e232..f727930022 100644
--- a/be/src/vec/olap/block_reader.h
+++ b/be/src/vec/olap/block_reader.h
@@ -79,6 +79,8 @@ private:
void _update_agg_value(MutableColumns& columns, int begin, int end, bool is_close = true);
+ bool _get_next_row_same();
+
VCollectIterator _vcollect_iter;
IteratorRowRef _next_row {{}, -1, false};
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 7997a3d43d..2d6d54f762 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -124,6 +124,7 @@ bool VMergeIteratorContext::compare(const VMergeIteratorContext& rhs) const {
if (_is_unique) {
result ? set_skip(true) : rhs.set_skip(true);
}
+ result ? set_same(true) : rhs.set_same(true);
return result;
}
@@ -148,6 +149,8 @@ void VMergeIteratorContext::copy_rows(Block* block, bool advanced) {
d_cp->assume_mutable()->insert_range_from(*s_cp, start, _cur_batch_num);
}
+ const auto& tmp_pre_ctx_same_bit = get_pre_ctx_same();
+ dst.set_same_bit(tmp_pre_ctx_same_bit.begin(), tmp_pre_ctx_same_bit.begin() + _cur_batch_num);
_cur_batch_num = 0;
}
@@ -158,8 +161,9 @@ void VMergeIteratorContext::copy_rows(BlockView* view, bool advanced) {
size_t start = _index_in_block - _cur_batch_num + 1 - advanced;
DCHECK(start >= 0);
+ const auto& tmp_pre_ctx_same_bit = get_pre_ctx_same();
for (size_t i = 0; i < _cur_batch_num; ++i) {
- view->push_back({_block, static_cast<int>(start + i), false});
+ view->push_back({_block, static_cast<int>(start + i), tmp_pre_ctx_same_bit[i]});
}
_cur_batch_num = 0;
@@ -255,11 +259,14 @@ Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
if (valid()) {
RETURN_IF_ERROR(advance());
}
+ // One "same key" flag per row of a batch (assign sizes the buffer); all start out false.
+ _pre_ctx_same_bit.assign(_block_row_max, false);
return Status::OK();
}
Status VMergeIteratorContext::advance() {
_skip = false;
+ _same = false;
// NOTE: we increase _index_in_block directly to valid one check
do {
_index_in_block++;
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index 210f47022a..e5bb36d0d3 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -120,6 +120,21 @@ public:
void set_skip(bool skip) const { _skip = skip; }
+ bool is_same() const { return _same; }
+
+ void set_same(bool same) const { _same = same; }
+
+ const std::vector<bool>& get_pre_ctx_same() const { return _pre_ctx_same_bit; }
+ // Record ctx's same flag at the last row counted in ctx's current batch.
+ void set_pre_ctx_same(const VMergeIteratorContext* ctx) const {
+ const int64_t index = static_cast<int64_t>(ctx->get_cur_batch()) - 1;
+ DCHECK(index >= 0);
+ DCHECK_LT(static_cast<size_t>(index), _pre_ctx_same_bit.size());
+ _pre_ctx_same_bit[index] = ctx->is_same();
+ }
+
+ size_t get_cur_batch() const { return _cur_batch_num; }
+
void add_cur_batch() { _cur_batch_num++; }
void reset_cur_batch() { _cur_batch_num = 0; }
@@ -137,6 +152,7 @@ private:
bool _is_reverse = false;
bool _valid = false;
mutable bool _skip = false;
+ mutable bool _same = false;
size_t _index_in_block = -1;
// 4096 minus 16 + 16 bytes padding that in padding pod array
int _block_row_max = 4064;
@@ -151,6 +167,7 @@ private:
std::shared_ptr<Block> _block;
// used to store data still on block view
std::list<std::shared_ptr<Block>> _block_list;
+ mutable std::vector<bool> _pre_ctx_same_bit;
};
class VMergeIterator : public RowwiseIterator {
@@ -221,6 +238,7 @@ private:
}
pre_ctx = ctx;
}
+ pre_ctx->set_pre_ctx_same(ctx);
if (UNLIKELY(_record_rowids)) {
_block_row_locations[row_idx] = ctx->current_row_location();
}
diff --git a/be/test/olap/segcompaction_test.cpp b/be/test/olap/segcompaction_test.cpp
index 5ac39db339..78fff2dd6e 100644
--- a/be/test/olap/segcompaction_test.cpp
+++ b/be/test/olap/segcompaction_test.cpp
@@ -310,6 +310,7 @@ TEST_F(SegCompactionTest, SegCompactionInterleaveWithBig_ooooOOoOooooooooO) {
}
s = rowset_writer->flush();
EXPECT_EQ(Status::OK(), s);
+ sleep(1);
}
num_segments = 1;
rows_per_segment = 6400;
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org