You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/06/01 15:29:12 UTC

[incubator-doris] 17/22: [Bug][Fix] One Rowset have same key output in unique table (#9858)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit f5386cb8e7b38471d6f19c56b3e51779dd6d3260
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue May 31 12:29:16 2022 +0800

    [Bug][Fix] One Rowset have same key output in unique table (#9858)
    
    Co-authored-by: lihaopeng <li...@baidu.com>
---
 be/src/olap/collect_iterator.cpp             |  2 +-
 be/src/olap/generic_iterators.cpp            | 52 ++++++++++++++++--------
 be/src/olap/generic_iterators.h              |  2 +-
 be/src/olap/reader.cpp                       |  1 +
 be/src/olap/rowset/beta_rowset_reader.cpp    |  4 +-
 be/src/olap/rowset/rowset_reader_context.h   |  1 +
 be/src/olap/schema_change.cpp                |  2 +
 be/src/olap/tuple_reader.cpp                 | 25 ++++++++----
 be/src/vec/olap/vcollect_iterator.cpp        |  8 +---
 be/src/vec/olap/vgeneric_iterators.cpp       | 44 +++++++++++++-------
 be/src/vec/olap/vgeneric_iterators.h         |  2 +-
 be/test/olap/generic_iterators_test.cpp      | 38 ++++++++++++++++-
 be/test/vec/exec/vgeneric_iterators_test.cpp | 61 ++++++++++++++++++++++------
 13 files changed, 175 insertions(+), 67 deletions(-)

diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp
index 404da7e35d..df1f3231a8 100644
--- a/be/src/olap/collect_iterator.cpp
+++ b/be/src/olap/collect_iterator.cpp
@@ -117,7 +117,7 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a
         return cmp_res > 0;
     }
 
-    // Second: If sequence_id_idx != 0 means we need to compare sequence. sequence only use
+    // Second: If _sequence_id_idx != -1 means we need to compare sequence. sequence only use
     // in unique key. so keep reverse order here
     if (_sequence_id_idx != -1) {
         auto seq_first_cell = first->cell(_sequence_id_idx);
diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp
index 1b8f176637..ba4cc5559d 100644
--- a/be/src/olap/generic_iterators.cpp
+++ b/be/src/olap/generic_iterators.cpp
@@ -150,6 +150,10 @@ public:
 
     uint64_t data_id() const { return _iter->data_id(); }
 
+    bool need_skip() const { return _skip; }
+
+    void set_skip(bool skip) const { _skip = skip; }
+
 private:
     // Load next block into _block
     Status _load_next_block();
@@ -161,6 +165,7 @@ private:
     RowBlockV2 _block;
 
     bool _valid = false;
+    mutable bool _skip = false;
     size_t _index_in_block = -1;
 };
 
@@ -174,6 +179,7 @@ Status MergeIteratorContext::init(const StorageReadOptions& opts) {
 }
 
 Status MergeIteratorContext::advance() {
+    _skip = false;
     // NOTE: we increase _index_in_block directly to valid one check
     do {
         _index_in_block++;
@@ -207,8 +213,8 @@ Status MergeIteratorContext::_load_next_block() {
 class MergeIterator : public RowwiseIterator {
 public:
     // MergeIterator takes the ownership of input iterators
-    MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx)
-        : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _merge_heap(MergeContextComparator(_sequence_id_idx)) {
+    MergeIterator(std::vector<RowwiseIterator*> iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique)
+        : _origin_iters(std::move(iters)), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique), _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) {
         // use for count the mem use of Block use in Merge
         _mem_tracker = MemTracker::CreateTracker(-1, "MergeIterator", std::move(parent), false);
     }
@@ -232,11 +238,13 @@ private:
     std::vector<RowwiseIterator*> _origin_iters;
 
     int _sequence_id_idx;
+    bool _is_unique;
 
     std::unique_ptr<Schema> _schema;
 
     struct MergeContextComparator {
-        explicit MergeContextComparator(int idx) : sequence_id_idx(idx) {};
+        MergeContextComparator(int idx, bool is_unique)
+                : _sequence_id_idx(idx), _is_unique(is_unique) {};
 
         bool operator()(const MergeIteratorContext* lhs, const MergeIteratorContext* rhs) const {
             auto lhs_row = lhs->current_row();
@@ -245,23 +253,29 @@ private:
             if (cmp_res != 0) {
                 return cmp_res > 0;
             }
-            
-            // Second: If sequence_id_idx != 0 means we need to compare sequence. sequence only use
+            auto res = 0;
+            // Second: If _sequence_id_idx != -1 means we need to compare sequence. sequence only use
             // in unique key. so keep reverse order of sequence id here
-            if (sequence_id_idx != -1) {
-                auto l_cell = lhs_row.cell(sequence_id_idx);
-                auto r_cell = rhs_row.cell(sequence_id_idx);
-                auto res = lhs_row.schema()->column(sequence_id_idx)->compare_cell(l_cell, r_cell);
-                if (res != 0) return res < 0;
+            if (_sequence_id_idx != -1) {
+                auto l_cell = lhs_row.cell(_sequence_id_idx);
+                auto r_cell = rhs_row.cell(_sequence_id_idx);
+                res = lhs_row.schema()->column(_sequence_id_idx)->compare_cell(l_cell, r_cell);
             }
+
             // if row cursors equal, compare segment id.
             // here we sort segment id in reverse order, because of the row order in AGG_KEYS
             // dose no matter, but in UNIQUE_KEYS table we only read the latest is one, so we
             // return the row in reverse order of segment id
-            return lhs->data_id() < rhs->data_id();
+            bool result = res == 0 ? lhs->data_id() < rhs->data_id() : res < 0;
+            if (_is_unique) {
+                result ? lhs->set_skip(true) : rhs->set_skip(true);
+            }
+
+            return result;
         }
 
-        int sequence_id_idx;
+        int _sequence_id_idx;
+        bool _is_unique;
     };
 
     using MergeHeap = std::priority_queue<MergeIteratorContext*, 
@@ -292,13 +306,15 @@ Status MergeIterator::init(const StorageReadOptions& opts) {
 
 Status MergeIterator::next_batch(RowBlockV2* block) {
     size_t row_idx = 0;
-    for (; row_idx < block->capacity() && !_merge_heap.empty(); ++row_idx) {
+    for (; row_idx < block->capacity() && !_merge_heap.empty();) {
         auto ctx = _merge_heap.top();
         _merge_heap.pop();
 
-        RowBlockRow dst_row = block->row(row_idx);
-        // copy current row to block
-        copy_row(&dst_row, ctx->current_row(), block->pool());
+        if (!ctx->need_skip()) {
+            RowBlockRow dst_row = block->row(row_idx++);
+            // copy current row to block
+            copy_row(&dst_row, ctx->current_row(), block->pool());
+        }
 
         RETURN_IF_ERROR(ctx->advance());
         if (ctx->valid()) {
@@ -374,11 +390,11 @@ Status UnionIterator::next_batch(RowBlockV2* block) {
     return Status::EndOfFile("End of UnionIterator");
 }
 
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
-    return new MergeIterator(std::move(inputs), parent, sequence_id_idx);
+    return new MergeIterator(std::move(inputs), parent, sequence_id_idx, is_unique);
 }
 
 RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) {
diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h
index e8f4528885..1a6ca90030 100644
--- a/be/src/olap/generic_iterators.h
+++ b/be/src/olap/generic_iterators.h
@@ -25,7 +25,7 @@ namespace doris {
 //
 // Inputs iterators' ownership is taken by created merge iterator. And client
 // should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique);
 
 // Create a union iterator for input iterators. Union iterator will read
 // input iterators one by one.
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 4542ae3f65..819e84a9a1 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -223,6 +223,7 @@ OLAPStatus TabletReader::_capture_rs_readers(const ReaderParams& read_params,
     _reader_context.use_page_cache = read_params.use_page_cache;
     _reader_context.sequence_id_idx = _sequence_col_idx;
     _reader_context.batch_size = _batch_size;
+    _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
 
     *valid_rs_readers = *rs_readers;
 
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 4e475fa9b1..3148656dd6 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -120,13 +120,13 @@ OLAPStatus BetaRowsetReader::init(RowsetReaderContext* read_context) {
     RowwiseIterator* final_iterator;
     if (config::enable_storage_vectorization && read_context->is_vec) {
         if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) {
-            final_iterator = vectorized::new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx);
+            final_iterator = vectorized::new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique);
         } else {
             final_iterator = vectorized::new_union_iterator(iterators, _parent_tracker);
         }
     } else {
         if (read_context->need_ordered_result && _rowset->rowset_meta()->is_segments_overlapping()) {
-            final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx);
+            final_iterator = new_merge_iterator(iterators, _parent_tracker, read_context->sequence_id_idx, read_context->is_unique);
         } else {
             final_iterator = new_union_iterator(iterators, _parent_tracker);
         }
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 07d9340fdf..0ae42f6cf4 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -63,6 +63,7 @@ struct RowsetReaderContext {
     int sequence_id_idx = -1;
     int batch_size = 1024;
     bool is_vec = false;
+    bool is_unique = false;
 };
 
 } // namespace doris
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index 0cd9ae7567..b27fea5f71 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1505,6 +1505,7 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
         // for schema change, seek_columns is the same to return_columns
         reader_context.seek_columns = &return_columns;
         reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+        reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
 
         auto mem_tracker = MemTracker::CreateTracker(-1, "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-"
                                                          + std::to_string(new_tablet->tablet_id()), _mem_tracker, true, false, MemTrackerLevel::TASK);
@@ -1724,6 +1725,7 @@ OLAPStatus SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tabl
     reader_context.return_columns = &return_columns;
     reader_context.seek_columns = &return_columns;
     reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+    reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
 
     RowsetReaderSharedPtr rowset_reader;
     RETURN_NOT_OK((*base_rowset)->create_reader(_mem_tracker, &rowset_reader));
diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp
index 5c15c2b42f..33640777f5 100644
--- a/be/src/olap/tuple_reader.cpp
+++ b/be/src/olap/tuple_reader.cpp
@@ -185,15 +185,24 @@ OLAPStatus TupleReader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem
         // in UNIQUE_KEY highest version is the final result, there is no need to
         // merge the lower versions
         direct_copy_row(row_cursor, *_next_key);
-        // skip the lower version rows;
-        auto res = _collect_iter.next(&_next_key, &_next_delete_flag);
-        if (LIKELY(res != OLAP_ERR_DATA_EOF)) {
-            if (UNLIKELY(res != OLAP_SUCCESS)) {
-                LOG(WARNING) << "next failed: " << res;
-                return res;
+        while (_next_key) {
+            // skip the lower version rows;
+            auto res = _collect_iter.next(&_next_key, &_next_delete_flag);
+            if (LIKELY(res != OLAP_ERR_DATA_EOF)) {
+                if (UNLIKELY(res != OLAP_SUCCESS)) {
+                    LOG(WARNING) << "next failed: " << res;
+                    return res;
+                }
+
+                if (!equal_row(_key_cids, *row_cursor, *_next_key)) {
+                    agg_finalize_row(_value_cids, row_cursor, mem_pool);
+                    break;
+                }
+                _merged_rows++;
+                cur_delete_flag = _next_delete_flag;
+            } else {
+                break;
             }
-            agg_finalize_row(_value_cids, row_cursor, mem_pool);
-            cur_delete_flag = _next_delete_flag;
         }
 
         // if reader needs to filter delete row and current delete_flag is true,
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 411c96f4e5..0d351a55fb 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -142,14 +142,8 @@ bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, L
     // for UNIQUE_KEYS just read the highest version and no need agg_update.
     // for AGG_KEYS if a version is deleted, the lower version no need to agg_update
     bool lower = (cmp_res != 0) ? (cmp_res < 0) : (lhs->version() < rhs->version());
+    lower ? lhs->set_same(true) : rhs->set_same(true);
 
-    // if lhs or rhs set same is true, means some same value already output, so need to
-    // set another is same
-    if (lower) {
-        lhs->is_same() ? rhs->set_same(true) : lhs->set_same(true);
-    } else {
-        rhs->is_same() ? lhs->set_same(true) : rhs->set_same(true);
-    }
     return lower;
 }
 
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index 19e7be78b0..2ffb4e224f 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -120,7 +120,7 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) {
 //      }
 class VMergeIteratorContext {
 public:
-    VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx) : _iter(iter), _sequence_id_idx(sequence_id_idx) {}
+    VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique) : _iter(iter), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique) {}
     VMergeIteratorContext(const VMergeIteratorContext&) = delete;
     VMergeIteratorContext(VMergeIteratorContext&&) = delete;
     VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
@@ -165,14 +165,18 @@ public:
         if (cmp_res != 0) {
             return cmp_res > 0;
         }
-        
+
+        auto col_cmp_res = 0;
         if (_sequence_id_idx != -1) {
-            int col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block, _sequence_id_idx, rhs._block, -1);
-            if (col_cmp_res != 0) {
-                return col_cmp_res < 0;
-            }
+            col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block,
+                                                         _sequence_id_idx, rhs._block, -1);
         }
-        return this->data_id() < rhs.data_id();
+        auto result = col_cmp_res == 0 ? this->data_id() < rhs.data_id() : col_cmp_res < 0;
+
+        if (_is_unique) {
+            result ? this->set_skip(true) : rhs.set_skip(true);
+        }
+        return result;
     }
 
     void copy_row(vectorized::Block* block) {
@@ -203,6 +207,10 @@ public:
 
     uint64_t data_id() const { return _iter->data_id(); }
 
+    bool need_skip() const { return _skip; }
+
+    void set_skip(bool skip) const { _skip = skip; }
+
 private:
     // Load next block into _block
     Status _load_next_block();
@@ -212,10 +220,12 @@ private:
     // used to store data load from iterator->next_batch(Vectorized::Block*)
     vectorized::Block _block;
 
+    int _sequence_id_idx = -1;
+    bool _is_unique = false;
     bool _valid = false;
+    mutable bool _skip = false;
     size_t _index_in_block = -1;
     int _block_row_max = 4096;
-    int _sequence_id_idx = -1;
 };
 
 Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -230,6 +240,7 @@ Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
 }
 
 Status VMergeIteratorContext::advance() {
+    _skip = false;
     // NOTE: we increase _index_in_block directly to valid one check
     do {
         _index_in_block++;
@@ -263,8 +274,8 @@ Status VMergeIteratorContext::_load_next_block() {
 class VMergeIterator : public RowwiseIterator {
 public:
     // VMergeIterator takes the ownership of input iterators
-    VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx) : 
-        _origin_iters(iters),_sequence_id_idx(sequence_id_idx) {
+    VMergeIterator(std::vector<RowwiseIterator*>& iters, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) : 
+        _origin_iters(iters),_sequence_id_idx(sequence_id_idx), _is_unique(is_unique) {
         // use for count the mem use of Block use in Merge
         _mem_tracker = MemTracker::CreateTracker(-1, "VMergeIterator", parent, false);
     }
@@ -303,6 +314,7 @@ private:
 
     int block_row_max = 0;
     int _sequence_id_idx = -1;
+    bool _is_unique = false;
 };
 
 Status VMergeIterator::init(const StorageReadOptions& opts) {
@@ -312,7 +324,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
     _schema = &(*_origin_iters.begin())->schema();
 
     for (auto iter : _origin_iters) {
-        auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx);
+        auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx, _is_unique);
         RETURN_IF_ERROR(ctx->init(opts));
         if (!ctx->valid()) {
             continue;
@@ -335,8 +347,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
         auto ctx = _merge_heap.top();
         _merge_heap.pop();
 
-        // copy current row to block
-        ctx->copy_row(block);
+        if (!ctx->need_skip()) {
+            // copy current row to block
+            ctx->copy_row(block);
+        }
 
         RETURN_IF_ERROR(ctx->advance());
         if (ctx->valid()) {
@@ -412,11 +426,11 @@ Status VUnionIterator::next_batch(vectorized::Block* block) {
 }
 
 
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique) {
     if (inputs.size() == 1) {
         return *(inputs.begin());
     }
-    return new VMergeIterator(inputs, parent, sequence_id_idx);
+    return new VMergeIterator(inputs, parent, sequence_id_idx, is_unique);
 }
 
 RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent) {
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index af9733bf44..063d07da51 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -27,7 +27,7 @@ namespace vectorized {
 //
 // Inputs iterators' ownership is taken by created merge iterator. And client
 // should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, std::shared_ptr<MemTracker> parent, int sequence_id_idx, bool is_unique);
 
 // Create a union iterator for input iterators. Union iterator will read
 // input iterators one by one.
diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp
index b73ad0271e..de25b071bb 100644
--- a/be/test/olap/generic_iterators_test.cpp
+++ b/be/test/olap/generic_iterators_test.cpp
@@ -115,7 +115,7 @@ TEST(GenericIteratorsTest, Union) {
     delete iter;
 }
 
-TEST(GenericIteratorsTest, Merge) {
+TEST(GenericIteratorsTest, MergeAgg) {
     auto schema = create_schema();
     std::vector<RowwiseIterator*> inputs;
 
@@ -124,7 +124,7 @@ TEST(GenericIteratorsTest, Merge) {
     inputs.push_back(new_auto_increment_iterator(schema, 300));
 
     auto iter = new_merge_iterator(
-            std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1);
+            std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1, false);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     ASSERT_TRUE(st.ok());
@@ -158,6 +158,40 @@ TEST(GenericIteratorsTest, Merge) {
     delete iter;
 }
 
+TEST(GenericIteratorsTest, MergeUnique) {
+    auto schema = create_schema();
+    std::vector<RowwiseIterator*> inputs;
+    // Unique-key merge (is_unique = true): the three inputs below must yield 300 rows in total, row i = (i, i+1, i+2).
+    inputs.push_back(new_auto_increment_iterator(schema, 100));
+    inputs.push_back(new_auto_increment_iterator(schema, 200));
+    inputs.push_back(new_auto_increment_iterator(schema, 300));
+    // new_merge_iterator takes (inputs, parent mem tracker, sequence_id_idx, is_unique); the tracker has no default.
+    auto iter = new_merge_iterator(std::move(inputs), MemTracker::CreateTracker(-1, "MergeIterator", nullptr, false), -1, true);
+    StorageReadOptions opts;
+    auto st = iter->init(opts);
+    EXPECT_TRUE(st.ok());
+
+    RowBlockV2 block(schema, 128);
+
+    size_t row_count = 0;
+    do {
+        block.clear();
+        st = iter->next_batch(&block);
+        for (int i = 0; i < block.num_rows(); ++i) {
+            size_t base_value = row_count;
+            auto row = block.row(i);
+            EXPECT_EQ(base_value, *(int16_t*)row.cell_ptr(0));
+            EXPECT_EQ(base_value + 1, *(int32_t*)row.cell_ptr(1));
+            EXPECT_EQ(base_value + 2, *(int64_t*)row.cell_ptr(2));
+            row_count++;
+        }
+    } while (st.ok());
+    EXPECT_TRUE(st.is_end_of_file());
+    EXPECT_EQ(300, row_count);
+
+    delete iter;
+}
+
 } // namespace doris
 
 int main(int argc, char** argv) {
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 405c9a9103..6189fdd821 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -140,8 +140,8 @@ TEST(VGenericIteratorsTest, Union) {
     delete iter;
 }
 
-TEST(VGenericIteratorsTest, Merge) {
-    ASSERT_TRUE(1);
+TEST(VGenericIteratorsTest, MergeAgg) {
+    EXPECT_TRUE(1);
     auto schema = create_schema();
     std::vector<RowwiseIterator*> inputs;
 
@@ -149,7 +149,7 @@ TEST(VGenericIteratorsTest, Merge) {
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
     inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
 
-    auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1);
+    auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1, false);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     ASSERT_TRUE(st.ok());
@@ -189,6 +189,47 @@ TEST(VGenericIteratorsTest, Merge) {
     delete iter;
 }
 
+TEST(VGenericIteratorsTest, MergeUnique) {
+    EXPECT_TRUE(1);
+    auto schema = create_schema();
+    std::vector<RowwiseIterator*> inputs;
+    // Unique-key merge (is_unique = true): the three inputs below must yield 300 rows in total, row i = (i, i+1, i+2).
+    inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
+    inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
+    inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
+    // new_merge_iterator takes (inputs, parent mem tracker, sequence_id_idx, is_unique); the tracker has no default.
+    auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), -1, true);
+    StorageReadOptions opts;
+    auto st = iter->init(opts);
+    EXPECT_TRUE(st.ok());
+
+    vectorized::Block block;
+    create_block(schema, block);
+    // Drain the iterator; the same block is passed to every next_batch call and inspected afterwards.
+    do {
+        st = iter->next_batch(&block);
+    } while (st.ok());
+
+    EXPECT_TRUE(st.is_end_of_file());
+    EXPECT_EQ(block.rows(), 300);
+
+    auto c0 = block.get_by_position(0).column;
+    auto c1 = block.get_by_position(1).column;
+    auto c2 = block.get_by_position(2).column;
+
+    size_t row_count = 0;
+    for (size_t i = 0; i < block.rows(); ++i) {
+        size_t base_value = row_count;
+
+        EXPECT_EQ(base_value, (*c0)[i].get<int>());
+        EXPECT_EQ(base_value + 1, (*c1)[i].get<int>());
+        EXPECT_EQ(base_value + 2, (*c2)[i].get<int>());
+        row_count++;
+    }
+
+    delete iter;
+}
+
 // only used for Seq Column UT
 class SeqColumnUtIterator : public RowwiseIterator {
 public:
@@ -276,7 +317,7 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
         inputs.push_back(new SeqColumnUtIterator(schema, num_rows, rows_begin, seq_column_id, seq_id_in_every_file));
     }
 
-    auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), seq_column_id);
+    auto iter = vectorized::new_merge_iterator(inputs, MemTracker::CreateTracker(-1, "VMergeIterator", nullptr, false), seq_column_id, true);
     StorageReadOptions opts;
     auto st = iter->init(opts);
     ASSERT_TRUE(st.ok());
@@ -288,18 +329,14 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
         st = iter->next_batch(&block);
     } while (st.ok());
 
-    ASSERT_TRUE(st.is_end_of_file());
-    ASSERT_EQ(block.rows(), seg_iter_num);
+    EXPECT_TRUE(st.is_end_of_file());
+    EXPECT_EQ(block.rows(), 1);
 
     auto col0 = block.get_by_position(0).column;
     auto col1 = block.get_by_position(1).column;
     auto seq_col = block.get_by_position(seq_column_id).column;
-
-    for (size_t i = 0; i < seg_iter_num; i++) {
-        size_t expected_value = seg_iter_num - i - 1; // in Descending 
-        size_t actual_value = (*seq_col)[i].get<int>();
-        ASSERT_EQ(expected_value, actual_value);
-    }
+    size_t actual_value = (*seq_col)[0].get<int>();
+    EXPECT_EQ(seg_iter_num - 1, actual_value);
 
     delete iter;
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org