You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/18 06:40:42 UTC

[doris] branch dev-1.1.1 updated: [Bugfix](compaction) fix uniq key compaction bug (#10946)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch dev-1.1.1
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/dev-1.1.1 by this push:
     new fbd1f1b9f7 [Bugfix](compaction) fix uniq key compaction bug (#10946)
fbd1f1b9f7 is described below

commit fbd1f1b9f747b2777a156e15b43cd1a8d1616e98
Author: yixiutt <10...@users.noreply.github.com>
AuthorDate: Mon Jul 18 14:40:37 2022 +0800

    [Bugfix](compaction) fix uniq key compaction bug (#10946)
    
    When one rowset has multiple segments in a uniq key compaction, the segments'
    rows are merged in generic_iterator but merged_rows is not increased.
    Compaction then fails in check_correctness, leaving the tablet with
    too many versions, which leads to the -235 load error.
    
    Co-authored-by: yixiutt <yi...@selectdb.com>
---
 be/src/olap/generic_iterators.cpp          | 11 +++++++----
 be/src/olap/generic_iterators.h            |  2 +-
 be/src/olap/reader.cpp                     |  1 +
 be/src/olap/rowset/beta_rowset_reader.cpp  |  6 +++---
 be/src/olap/rowset/rowset_reader_context.h |  1 +
 be/src/vec/olap/vgeneric_iterators.cpp     |  8 +++++++-
 be/src/vec/olap/vgeneric_iterators.h       |  1 +
 7 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp
index ba4cc5559d..2765096cb1 100644
--- a/be/src/olap/generic_iterators.cpp
+++ b/be/src/olap/generic_iterators.cpp
@@ -213,8 +213,8 @@ Status MergeIteratorContext::_load_next_block() {
 class MergeIterator : public RowwiseIterator {
 public:
     // MergeIterator takes the ownership of input iterators
-    MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique)
-        : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) {
+    MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows)
+        : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merged_rows(merged_rows), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) {
         // use for count the mem use of Block use in Merge
         _mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", std::move(parent), false);
     }
@@ -241,6 +241,7 @@ private:
     bool _is_unique;
 
     std::unique_ptr<Schema> _schema;
+    uint64_t* _merged_rows;
 
     struct MergeContextComparator {
         MergeContextComparator(int idx, bool is_unique)
@@ -314,6 +315,8 @@ Status MergeIterator::next_batch(RowBlockV2* block) {
             RowBlockRow dst_row = block->row(row_idx++);
             // copy current row to block
             copy_row(&dst_row, ctx->current_row(), block->pool());
+        } else if (_merged_rows != nullptr) {
+            (*_merged_rows)++;
         }
 
         RETURN_IF_ERROR(ctx->advance());
@@ -390,11 +393,11 @@ Status UnionIterator::next_batch(RowBlockV2* block) {
     return Status::EndOfFile("End of UnionIterator");
 }
 
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
-    return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique);
+    return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique, merged_rows);
 }
 
 RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) {
diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h
index 1a6ca90030..9cf40ad2fb 100644
--- a/be/src/olap/generic_iterators.h
+++ b/be/src/olap/generic_iterators.h
@@ -25,7 +25,7 @@ namespace doris {
 //
 // Inputs iterators' ownership is taken by created merge iterator. And client
 // should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique, uint64_t* merged_rows);
 
 // Create a union iterator for input iterators. Union iterator will read
 // input iterators one by one.
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 819e84a9a1..9b0b7ceec0 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -226,6 +226,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params,
     _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
 
     *valid_rs_readers = *rs_readers;
+    _reader_context.merged_rows = &_merged_rows;
 
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 2d8f79a8da..69127afe7a 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -120,14 +120,14 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
         if (read_context->need_ordered_result &&
             _rowset->rowset_meta()->is_segments_overlapping()) {
             final_iterator = vectorized::new_merge_iterator(
-                    iterators, _parent_tracker, read_context->sequence_id_idx,
-                    read_context->is_unique, read_context->tablet_columns_convert_to_null_set);
+                    iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique,
+                    read_context->merged_rows, read_context->tablet_columns_convert_to_null_set);
         } else {
             final_iterator = vectorized::new_union_iterator(iterators, _parent_tracker);
         }
     } else {
         if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) {
-            final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique);
+            final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique, read_context->merged_rows);
         } else {
             final_iterator = new_union_iterator(iterators, _parent_tracker);
         }
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 0bdb903e2e..374e5da857 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -64,6 +64,7 @@ struct RowsetReaderContext {
     int batch_size = 1024;
     bool is_vec = false;
     bool is_unique = false;
+    uint64_t* merged_rows = nullptr;
 
     // need pass this info to VMergeIterator
     std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr;
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 13b4acf30c..82faf8c14e 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -294,10 +294,12 @@ public:
     // VMergeIterator takes the ownership of input iterators
     VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent,
                    int sequence_id_idx, bool is_unique,
+                   uint64_t* merged_rows,
                    const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set)
             : _origin_iters(iters),
               _sequence_id_idx(sequence_id_idx),
               _is_unique(is_unique),
+              _merged_rows(merged_rows),
               _tablet_columns_convert_to_null_set(tablet_columns_convert_to_null_set) {
         // use for count the mem use of Block use in Merge
         _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, false);
@@ -338,6 +340,7 @@ private:
     int block_row_max = 0;
     int _sequence_id_idx = -1;
     bool _is_unique = false;
+    uint64_t* _merged_rows;
     const std::unordered_set<uint32_t>* _tablet_columns_convert_to_null_set = nullptr;
 };
 
@@ -374,6 +377,8 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
         if (!ctx->need_skip()) {
             // copy current row to block
             ctx->copy_row(block);
+        } else if(_merged_rows != nullptr){
+            (*_merged_rows)++;
         }
 
         RETURN_IF_ERROR(ctx->advance());
@@ -453,11 +458,12 @@ Status VUnionIterator::next_batch(vectorized::Block* block) {
 RowwiseIterator* new_merge_iterator(
         std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent,
         int sequence_id_idx, bool is_unique,
+        uint64_t* merged_rows,
         const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
-    return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique,
+    return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique, merged_rows,
                               tablet_columns_convert_to_null_set);
 }
 
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index 0b9af85ef5..d1ebc0cd25 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -29,6 +29,7 @@ namespace vectorized {
 // should delete returned iterator after usage.
 RowwiseIterator* new_merge_iterator(
         std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique,
+        uint64_t* merged_rows,
         const std::unordered_set<uint32_t>* tablet_columns_convert_to_null_set = nullptr);
 
 // Create a union iterator for input iterators. Union iterator will read


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org