You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/18 06:40:42 UTC
[doris] branch dev-1.1.1 updated: [Bugfix](compaction) fix uniq key compaction bug (#10946)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
new fbd1f1b9f7 [Bugfix](compaction) fix uniq key compaction bug (#10946)
fbd1f1b9f7 is described below
commit fbd1f1b9f747b2777a156e15b43cd1a8d1616e98
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Mon Jul 18 14:40:37 2022 +0800
[Bugfix](compaction) fix uniq key compaction bug (#10946)
When a single rowset has multiple segments in a unique-key compaction, the segments' rows
are merged in generic_iterator, but merged_rows is not incremented.
Compaction then fails in check_correctness, leaving the tablet with
too many versions, which leads to the -235 load error.
Co-authored-by: yixiutt <yi...@selectdb.com>
---
be/src/olap/generic_iterators.cpp | 11 +++++++----
be/src/olap/generic_iterators.h | 2 +-
be/src/olap/reader.cpp | 1 +
be/src/olap/rowset/beta_rowset_reader.cpp | 6 +++---
be/src/olap/rowset/rowset_reader_context.h | 1 +
be/src/vec/olap/vgeneric_iterators.cpp | 8 +++++++-
be/src/vec/olap/vgeneric_iterators.h | 1 +
7 files changed, 21 insertions(+), 9 deletions(-)
diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp
index ba4cc5559d..2765096cb1 100644
--- a/be/src/olap/generic_iterators.cpp
+++ b/be/src/olap/generic_iterators.cpp
@@ -213,8 +213,8 @@ Status MergeIteratorContext::_load_next_block() {
class MergeIterator : public RowwiseIterator {
public:
// MergeIterator takes the ownership of input iterators
- MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique)
- : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) {
+ MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows)
+ : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merged_rows(merged_rows), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) {
// use for count the mem use of Block use in Merge
_mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", std::move(parent), false);
}
@@ -241,6 +241,7 @@ private:
bool _is_unique;
std::unique_ptr<Schema> _schema;
+ uint64_t* _merged_rows;
struct MergeContextComparator {
MergeContextComparator(int idx, bool is_unique)
@@ -314,6 +315,8 @@ Status MergeIterator::next_batch(RowBlockV2* block) {
RowBlockRow dst_row = block->row(row_idx++);
// copy current row to block
copy_row(&dst_row, ctx->current_row(), block->pool());
+ } else if (_merged_rows != nullptr) {
+ (*_merged_rows)++;
}
RETURN_IF_ERROR(ctx->advance());
@@ -390,11 +393,11 @@ Status UnionIterator::next_batch(RowBlockV2* block) {
return Status::EndOfFile("End of UnionIterator");
}
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
- return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique);
+ return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique, merged_rows);
}
RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) {
diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h
index 1a6ca90030..9cf40ad2fb 100644
--- a/be/src/olap/generic_iterators.h
+++ b/be/src/olap/generic_iterators.h
@@ -25,7 +25,7 @@ namespace doris {
//
// Inputs iterators' ownership is taken by created merge iterator. And client
// should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 819e84a9a1..9b0b7ceec0 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -226,6 +226,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
*valid_rs_readers = *rs_readers;
+ _reader_context.merged_rows = &_merged_rows;
return OLAP_SUCCESS;
}
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 2d8f79a8da..69127afe7a 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -120,14 +120,14 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
final_iterator = vectorized::new_merge_iterator(
- iterators, _parent_tracker, read_context->sequence_id_idx,
- read_context->is_unique, read_context->tablet_columns_convert_to_null_set);
+ iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique,
+ read_context->merged_rows, read_context->tablet_columns_convert_to_null_set);
} else {
final_iterator = vectorized::new_union_iterator(iterators, _parent_tracker);
}
} else {
if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) {
- final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique);
+ final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique, read_context->merged_rows);
} else {
final_iterator = new_union_iterator(iterators, _parent_tracker);
}
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 0bdb903e2e..374e5da857 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -64,6 +64,7 @@ struct RowsetReaderContext {
int batch_size = 1024;
bool is_vec = false;
bool is_unique = false;
+ uint64_t* merged_rows = nullptr;
// need pass this info to VMergeIterator
std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 13b4acf30c..82faf8c14e 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -294,10 +294,12 @@ public:
// VMergeIterator takes the ownership of input iterators
VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent,
int sequence_id_idx, bool is_unique,
+ uint64_t* merged_rows,
const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set)
: _origin_iters(iters),
_sequence_id_idx(sequence_id_idx),
_is_unique(is_unique),
+ _merged_rows(merged_rows),
_tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {
// use for count the mem use of Block use in Merge
_mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, false);
@@ -338,6 +340,7 @@ private:
int block_row_max = 0;
int _sequence_id_idx = -1;
bool _is_unique = false;
+ uint64_t* _merged_rows;
const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr;
};
@@ -374,6 +377,8 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
if (!ctx->need_skip()) {
// copy current row to block
ctx->copy_row(block);
+ } else if(_merged_rows != nullptr){
+ (*_merged_rows)++;
}
RETURN_IF_ERROR(ctx->advance());
@@ -453,11 +458,12 @@ Status VUnionIterator::next_batch(vectorized::Block* block) {
RowwiseIterator* new_merge_iterator(
std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent,
int sequence_id_idx, bool is_unique,
+ uint64_t* merged_rows,
const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
- return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique,
+ return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique, merged_rows,
tablet_columns_convert_to_null_set);
}
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index 0b9af85ef5..d1ebc0cd25 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -29,6 +29,7 @@ namespace vectorized {
// should delete returned iterator after usage.
RowwiseIterator* new_merge_iterator(
std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique,
+ uint64_t* merged_rows,
const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr);
// Create a union iterator for input iterators. Union iterator will read
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org