You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/05/31 04:29:21 UTC
[incubator-doris] branch master updated: [Bug][Fix] One Rowset have same key output in unique table (#9858)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 0cba6b7d95 [Bug][Fix] One Rowset have same key output in unique table (#9858)
0cba6b7d95 is described below
commit 0cba6b7d95fcf8e9aeb86ce3c62f36fd2cc47a78
Author: HappenLee <ha...@hotmail.com>
AuthorDate: Tue May 31 12:29:16 2022 +0800
[Bug][Fix] One Rowset have same key output in unique table (#9858)
Co-authored-by: lihaopeng <li...@baidu.com>
---
be/src/olap/collect_iterator.cpp | 2 +-
be/src/olap/generic_iterators.cpp | 53 +++++++++++++++++---------
be/src/olap/generic_iterators.h | 3 +-
be/src/olap/reader.cpp | 1 +
be/src/olap/rowset/beta_rowset_reader.cpp | 7 ++--
be/src/olap/rowset/rowset_reader_context.h | 1 +
be/src/olap/schema_change.cpp | 2 +
be/src/olap/storage_migration_v2.cpp | 1 +
be/src/olap/tuple_reader.cpp | 25 ++++++++----
be/src/vec/olap/vcollect_iterator.cpp | 8 +---
be/src/vec/olap/vgeneric_iterators.cpp | 46 ++++++++++++++--------
be/src/vec/olap/vgeneric_iterators.h | 3 +-
be/test/olap/generic_iterators_test.cpp | 38 ++++++++++++++++++-
be/test/vec/exec/vgeneric_iterators_test.cpp | 57 +++++++++++++++++++++++-----
14 files changed, 181 insertions(+), 66 deletions(-)
diff --git a/be/src/olap/collect_iterator.cpp b/be/src/olap/collect_iterator.cpp
index 1ce67b6f12..cf7b079004 100644
--- a/be/src/olap/collect_iterator.cpp
+++ b/be/src/olap/collect_iterator.cpp
@@ -120,7 +120,7 @@ bool CollectIterator::LevelIteratorComparator::operator()(const LevelIterator* a
return cmp_res > 0;
}
- // Second: If sequence_id_idx != 0 means we need to compare sequence. sequence only use
+ // Second: If _sequence_id_idx != -1, we need to compare sequence. Sequence is only used
// in unique key. so keep reverse order here
if (_sequence_id_idx != -1) {
auto seq_first_cell = first->cell(_sequence_id_idx);
diff --git a/be/src/olap/generic_iterators.cpp b/be/src/olap/generic_iterators.cpp
index 47330907d5..3fb3f7a1e7 100644
--- a/be/src/olap/generic_iterators.cpp
+++ b/be/src/olap/generic_iterators.cpp
@@ -149,6 +149,10 @@ public:
uint64_t data_id() const { return _iter->data_id(); }
+ bool need_skip() const { return _skip; }
+
+ void set_skip(bool skip) const { _skip = skip; }
+
private:
// Load next block into _block
Status _load_next_block();
@@ -160,6 +164,7 @@ private:
RowBlockV2 _block;
bool _valid = false;
+ mutable bool _skip = false;
size_t _index_in_block = -1;
};
@@ -173,6 +178,7 @@ Status MergeIteratorContext::init(const StorageReadOptions& opts) {
}
Status MergeIteratorContext::advance() {
+ _skip = false;
// NOTE: we increase _index_in_block directly to valid one check
do {
_index_in_block++;
@@ -206,10 +212,11 @@ Status MergeIteratorContext::_load_next_block() {
class MergeIterator : public RowwiseIterator {
public:
// MergeIterator takes the ownership of input iterators
- MergeIterator(std::vector<RowwiseIterator*> iters, int sequence_id_idx)
+ MergeIterator(std::vector<RowwiseIterator*> iters, int sequence_id_idx, bool is_unique)
: _origin_iters(std::move(iters)),
_sequence_id_idx(sequence_id_idx),
- _merge_heap(MergeContextComparator(_sequence_id_idx)) {}
+ _is_unique(is_unique),
+ _merge_heap(MergeContextComparator(_sequence_id_idx, is_unique)) {}
~MergeIterator() override {
while (!_merge_heap.empty()) {
@@ -230,11 +237,13 @@ private:
std::vector<RowwiseIterator*> _origin_iters;
int _sequence_id_idx;
+ bool _is_unique;
std::unique_ptr<Schema> _schema;
struct MergeContextComparator {
- explicit MergeContextComparator(int idx) : sequence_id_idx(idx) {};
+ MergeContextComparator(int idx, bool is_unique)
+ : _sequence_id_idx(idx), _is_unique(is_unique) {};
bool operator()(const MergeIteratorContext* lhs, const MergeIteratorContext* rhs) const {
auto lhs_row = lhs->current_row();
@@ -244,22 +253,29 @@ private:
return cmp_res > 0;
}
- // Second: If sequence_id_idx != 0 means we need to compare sequence. sequence only use
+ auto res = 0;
+ // Second: If _sequence_id_idx != -1, we need to compare sequence. Sequence is only used
// in unique key. so keep reverse order of sequence id here
- if (sequence_id_idx != -1) {
- auto l_cell = lhs_row.cell(sequence_id_idx);
- auto r_cell = rhs_row.cell(sequence_id_idx);
- auto res = lhs_row.schema()->column(sequence_id_idx)->compare_cell(l_cell, r_cell);
- if (res != 0) return res < 0;
+ if (_sequence_id_idx != -1) {
+ auto l_cell = lhs_row.cell(_sequence_id_idx);
+ auto r_cell = rhs_row.cell(_sequence_id_idx);
+ res = lhs_row.schema()->column(_sequence_id_idx)->compare_cell(l_cell, r_cell);
}
+
// if row cursors equal, compare segment id.
// here we sort segment id in reverse order, because of the row order in AGG_KEYS
// dose no matter, but in UNIQUE_KEYS table we only read the latest is one, so we
// return the row in reverse order of segment id
- return lhs->data_id() < rhs->data_id();
+ bool result = res == 0 ? lhs->data_id() < rhs->data_id() : res < 0;
+ if (_is_unique) {
+ result ? lhs->set_skip(true) : rhs->set_skip(true);
+ }
+
+ return result;
}
- int sequence_id_idx;
+ int _sequence_id_idx;
+ bool _is_unique;
};
using MergeHeap = std::priority_queue<MergeIteratorContext*, std::vector<MergeIteratorContext*>,
@@ -289,13 +305,15 @@ Status MergeIterator::init(const StorageReadOptions& opts) {
Status MergeIterator::next_batch(RowBlockV2* block) {
size_t row_idx = 0;
- for (; row_idx < block->capacity() && !_merge_heap.empty(); ++row_idx) {
+ for (; row_idx < block->capacity() && !_merge_heap.empty();) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
- RowBlockRow dst_row = block->row(row_idx);
- // copy current row to block
- copy_row(&dst_row, ctx->current_row(), block->pool());
+ if (!ctx->need_skip()) {
+ RowBlockRow dst_row = block->row(row_idx++);
+ // copy current row to block
+ copy_row(&dst_row, ctx->current_row(), block->pool());
+ }
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
@@ -369,11 +387,12 @@ Status UnionIterator::next_batch(RowBlockV2* block) {
return Status::EndOfFile("End of UnionIterator");
}
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, int sequence_id_idx) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, int sequence_id_idx,
+ bool is_unique) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
- return new MergeIterator(std::move(inputs), sequence_id_idx);
+ return new MergeIterator(std::move(inputs), sequence_id_idx, is_unique);
}
RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs) {
diff --git a/be/src/olap/generic_iterators.h b/be/src/olap/generic_iterators.h
index 11e024fa0f..2b849a888b 100644
--- a/be/src/olap/generic_iterators.h
+++ b/be/src/olap/generic_iterators.h
@@ -27,7 +27,8 @@ namespace doris {
//
// Inputs iterators' ownership is taken by created merge iterator. And client
// should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, int sequence_id_idx);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*> inputs, int sequence_id_idx,
+ bool is_unique);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
diff --git a/be/src/olap/reader.cpp b/be/src/olap/reader.cpp
index 4496b9f978..b6e9fddd0f 100644
--- a/be/src/olap/reader.cpp
+++ b/be/src/olap/reader.cpp
@@ -225,6 +225,7 @@ Status TabletReader::_capture_rs_readers(const ReaderParams& read_params,
_reader_context.use_page_cache = read_params.use_page_cache;
_reader_context.sequence_id_idx = _sequence_col_idx;
_reader_context.batch_size = _batch_size;
+ _reader_context.is_unique = tablet()->keys_type() == UNIQUE_KEYS;
*valid_rs_readers = *rs_readers;
diff --git a/be/src/olap/rowset/beta_rowset_reader.cpp b/be/src/olap/rowset/beta_rowset_reader.cpp
index 9ab47159a8..a532823cee 100644
--- a/be/src/olap/rowset/beta_rowset_reader.cpp
+++ b/be/src/olap/rowset/beta_rowset_reader.cpp
@@ -114,15 +114,16 @@ Status BetaRowsetReader::init(RowsetReaderContext* read_context) {
if (config::enable_storage_vectorization && read_context->is_vec) {
if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
- final_iterator =
- vectorized::new_merge_iterator(iterators, read_context->sequence_id_idx);
+ final_iterator = vectorized::new_merge_iterator(
+ iterators, read_context->sequence_id_idx, read_context->is_unique);
} else {
final_iterator = vectorized::new_union_iterator(iterators);
}
} else {
if (read_context->need_ordered_result &&
_rowset->rowset_meta()->is_segments_overlapping()) {
- final_iterator = new_merge_iterator(iterators, read_context->sequence_id_idx);
+ final_iterator = new_merge_iterator(iterators, read_context->sequence_id_idx,
+ read_context->is_unique);
} else {
final_iterator = new_union_iterator(iterators);
}
diff --git a/be/src/olap/rowset/rowset_reader_context.h b/be/src/olap/rowset/rowset_reader_context.h
index 07d9340fdf..0ae42f6cf4 100644
--- a/be/src/olap/rowset/rowset_reader_context.h
+++ b/be/src/olap/rowset/rowset_reader_context.h
@@ -63,6 +63,7 @@ struct RowsetReaderContext {
int sequence_id_idx = -1;
int batch_size = 1024;
bool is_vec = false;
+ bool is_unique = false;
};
} // namespace doris
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index b382c862ea..08646bd01e 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1456,6 +1456,7 @@ Status SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletReqV2&
// for schema change, seek_columns is the same to return_columns
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+ reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
do {
// get history data to be converted and it will check if there is hold in base tablet
@@ -1669,6 +1670,7 @@ Status SchemaChangeHandler::schema_version_convert(TabletSharedPtr base_tablet,
reader_context.return_columns = &return_columns;
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+ reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
RowsetReaderSharedPtr rowset_reader;
RETURN_NOT_OK((*base_rowset)->create_reader(&rowset_reader));
diff --git a/be/src/olap/storage_migration_v2.cpp b/be/src/olap/storage_migration_v2.cpp
index aed8fc9621..1b3f56a68d 100644
--- a/be/src/olap/storage_migration_v2.cpp
+++ b/be/src/olap/storage_migration_v2.cpp
@@ -169,6 +169,7 @@ Status StorageMigrationV2Handler::_do_process_storage_migration_v2(
// for schema change, seek_columns is the same to return_columns
reader_context.seek_columns = &return_columns;
reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
+ reader_context.is_unique = base_tablet->keys_type() == UNIQUE_KEYS;
do {
// get history data to be converted and it will check if there is hold in base tablet
diff --git a/be/src/olap/tuple_reader.cpp b/be/src/olap/tuple_reader.cpp
index f4e5e1d16a..c7a9a2188c 100644
--- a/be/src/olap/tuple_reader.cpp
+++ b/be/src/olap/tuple_reader.cpp
@@ -186,15 +186,24 @@ Status TupleReader::_unique_key_next_row(RowCursor* row_cursor, MemPool* mem_poo
// in UNIQUE_KEY highest version is the final result, there is no need to
// merge the lower versions
direct_copy_row(row_cursor, *_next_key);
- // skip the lower version rows;
- auto res = _collect_iter.next(&_next_key, &_next_delete_flag);
- if (LIKELY(res.precise_code() != OLAP_ERR_DATA_EOF)) {
- if (UNLIKELY(!res.ok())) {
- LOG(WARNING) << "next failed: " << res;
- return res;
+ while (_next_key) {
+ // skip the lower version rows;
+ auto res = _collect_iter.next(&_next_key, &_next_delete_flag);
+ if (LIKELY(res.precise_code() != OLAP_ERR_DATA_EOF)) {
+ if (UNLIKELY(!res.ok())) {
+ LOG(WARNING) << "next failed: " << res;
+ return res;
+ }
+
+ if (!equal_row(_key_cids, *row_cursor, *_next_key)) {
+ agg_finalize_row(_value_cids, row_cursor, mem_pool);
+ break;
+ }
+ _merged_rows++;
+ cur_delete_flag = _next_delete_flag;
+ } else {
+ break;
}
- agg_finalize_row(_value_cids, row_cursor, mem_pool);
- cur_delete_flag = _next_delete_flag;
}
// if reader needs to filter delete row and current delete_flag is true,
diff --git a/be/src/vec/olap/vcollect_iterator.cpp b/be/src/vec/olap/vcollect_iterator.cpp
index 59df40bac2..3d73ab8860 100644
--- a/be/src/vec/olap/vcollect_iterator.cpp
+++ b/be/src/vec/olap/vcollect_iterator.cpp
@@ -143,14 +143,8 @@ bool VCollectIterator::LevelIteratorComparator::operator()(LevelIterator* lhs, L
// for UNIQUE_KEYS just read the highest version and no need agg_update.
// for AGG_KEYS if a version is deleted, the lower version no need to agg_update
bool lower = (cmp_res != 0) ? (cmp_res < 0) : (lhs->version() < rhs->version());
+ lower ? lhs->set_same(true) : rhs->set_same(true);
- // if lhs or rhs set same is true, means some same value already output, so need to
- // set another is same
- if (lower) {
- lhs->is_same() ? rhs->set_same(true) : lhs->set_same(true);
- } else {
- rhs->is_same() ? lhs->set_same(true) : rhs->set_same(true);
- }
return lower;
}
diff --git a/be/src/vec/olap/vgeneric_iterators.cpp b/be/src/vec/olap/vgeneric_iterators.cpp
index bd3df6adbb..363bd1c31d 100644
--- a/be/src/vec/olap/vgeneric_iterators.cpp
+++ b/be/src/vec/olap/vgeneric_iterators.cpp
@@ -119,8 +119,8 @@ Status VAutoIncrementIterator::init(const StorageReadOptions& opts) {
// }
class VMergeIteratorContext {
public:
- VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx)
- : _iter(iter), _sequence_id_idx(sequence_id_idx) {}
+ VMergeIteratorContext(RowwiseIterator* iter, int sequence_id_idx, bool is_unique)
+ : _iter(iter), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique) {}
VMergeIteratorContext(const VMergeIteratorContext&) = delete;
VMergeIteratorContext(VMergeIteratorContext&&) = delete;
VMergeIteratorContext& operator=(const VMergeIteratorContext&) = delete;
@@ -164,14 +164,17 @@ public:
return cmp_res > 0;
}
+ auto col_cmp_res = 0;
if (_sequence_id_idx != -1) {
- int col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block,
- _sequence_id_idx, rhs._block, -1);
- if (col_cmp_res != 0) {
- return col_cmp_res < 0;
- }
+ col_cmp_res = this->_block.compare_column_at(_index_in_block, rhs._index_in_block,
+ _sequence_id_idx, rhs._block, -1);
+ }
+ auto result = col_cmp_res == 0 ? this->data_id() < rhs.data_id() : col_cmp_res < 0;
+
+ if (_is_unique) {
+ result ? this->set_skip(true) : rhs.set_skip(true);
}
- return this->data_id() < rhs.data_id();
+ return result;
}
void copy_row(vectorized::Block* block) {
@@ -202,6 +205,10 @@ public:
uint64_t data_id() const { return _iter->data_id(); }
+ bool need_skip() const { return _skip; }
+
+ void set_skip(bool skip) const { _skip = skip; }
+
private:
// Load next block into _block
Status _load_next_block();
@@ -211,10 +218,12 @@ private:
// used to store data load from iterator->next_batch(Vectorized::Block*)
vectorized::Block _block;
+ int _sequence_id_idx = -1;
+ bool _is_unique = false;
bool _valid = false;
+ mutable bool _skip = false;
size_t _index_in_block = -1;
int _block_row_max = 4096;
- int _sequence_id_idx = -1;
};
Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
@@ -229,6 +238,7 @@ Status VMergeIteratorContext::init(const StorageReadOptions& opts) {
}
Status VMergeIteratorContext::advance() {
+ _skip = false;
// NOTE: we increase _index_in_block directly to valid one check
do {
_index_in_block++;
@@ -262,8 +272,8 @@ Status VMergeIteratorContext::_load_next_block() {
class VMergeIterator : public RowwiseIterator {
public:
// VMergeIterator takes the ownership of input iterators
- VMergeIterator(std::vector<RowwiseIterator*>& iters, int sequence_id_idx)
- : _origin_iters(iters), _sequence_id_idx(sequence_id_idx) {}
+ VMergeIterator(std::vector<RowwiseIterator*>& iters, int sequence_id_idx, bool is_unique)
+ : _origin_iters(iters), _sequence_id_idx(sequence_id_idx), _is_unique(is_unique) {}
~VMergeIterator() override {
while (!_merge_heap.empty()) {
@@ -299,6 +309,7 @@ private:
int block_row_max = 0;
int _sequence_id_idx = -1;
+ bool _is_unique = false;
};
Status VMergeIterator::init(const StorageReadOptions& opts) {
@@ -308,7 +319,7 @@ Status VMergeIterator::init(const StorageReadOptions& opts) {
_schema = &(*_origin_iters.begin())->schema();
for (auto iter : _origin_iters) {
- auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx);
+ auto ctx = std::make_unique<VMergeIteratorContext>(iter, _sequence_id_idx, _is_unique);
RETURN_IF_ERROR(ctx->init(opts));
if (!ctx->valid()) {
continue;
@@ -330,8 +341,10 @@ Status VMergeIterator::next_batch(vectorized::Block* block) {
auto ctx = _merge_heap.top();
_merge_heap.pop();
- // copy current row to block
- ctx->copy_row(block);
+ if (!ctx->need_skip()) {
+ // copy current row to block
+ ctx->copy_row(block);
+ }
RETURN_IF_ERROR(ctx->advance());
if (ctx->valid()) {
@@ -404,11 +417,12 @@ Status VUnionIterator::next_batch(vectorized::Block* block) {
return Status::EndOfFile("End of VUnionIterator");
}
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, int sequence_id_idx) {
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, int sequence_id_idx,
+ bool is_unique) {
if (inputs.size() == 1) {
return *(inputs.begin());
}
- return new VMergeIterator(inputs, sequence_id_idx);
+ return new VMergeIterator(inputs, sequence_id_idx, is_unique);
}
RowwiseIterator* new_union_iterator(std::vector<RowwiseIterator*>& inputs) {
diff --git a/be/src/vec/olap/vgeneric_iterators.h b/be/src/vec/olap/vgeneric_iterators.h
index 79c35c97e3..dce0479875 100644
--- a/be/src/vec/olap/vgeneric_iterators.h
+++ b/be/src/vec/olap/vgeneric_iterators.h
@@ -27,7 +27,8 @@ namespace vectorized {
//
// Inputs iterators' ownership is taken by created merge iterator. And client
// should delete returned iterator after usage.
-RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, int sequence_id_idx);
+RowwiseIterator* new_merge_iterator(std::vector<RowwiseIterator*>& inputs, int sequence_id_idx,
+ bool is_unique);
// Create a union iterator for input iterators. Union iterator will read
// input iterators one by one.
diff --git a/be/test/olap/generic_iterators_test.cpp b/be/test/olap/generic_iterators_test.cpp
index 2b581ce387..340446a252 100644
--- a/be/test/olap/generic_iterators_test.cpp
+++ b/be/test/olap/generic_iterators_test.cpp
@@ -114,7 +114,7 @@ TEST(GenericIteratorsTest, Union) {
delete iter;
}
-TEST(GenericIteratorsTest, Merge) {
+TEST(GenericIteratorsTest, MergeAgg) {
auto schema = create_schema();
std::vector<RowwiseIterator*> inputs;
@@ -122,7 +122,7 @@ TEST(GenericIteratorsTest, Merge) {
inputs.push_back(new_auto_increment_iterator(schema, 200));
inputs.push_back(new_auto_increment_iterator(schema, 300));
- auto iter = new_merge_iterator(std::move(inputs), -1);
+ auto iter = new_merge_iterator(std::move(inputs), -1, false);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@@ -156,4 +156,38 @@ TEST(GenericIteratorsTest, Merge) {
delete iter;
}
+TEST(GenericIteratorsTest, MergeUnique) {
+ auto schema = create_schema();
+ std::vector<RowwiseIterator*> inputs;
+
+ inputs.push_back(new_auto_increment_iterator(schema, 100));
+ inputs.push_back(new_auto_increment_iterator(schema, 200));
+ inputs.push_back(new_auto_increment_iterator(schema, 300));
+
+ auto iter = new_merge_iterator(std::move(inputs), -1, true);
+ StorageReadOptions opts;
+ auto st = iter->init(opts);
+ EXPECT_TRUE(st.ok());
+
+ RowBlockV2 block(schema, 128);
+
+ size_t row_count = 0;
+ do {
+ block.clear();
+ st = iter->next_batch(&block);
+ for (int i = 0; i < block.num_rows(); ++i) {
+ size_t base_value = row_count;
+ auto row = block.row(i);
+ EXPECT_EQ(base_value, *(int16_t*)row.cell_ptr(0));
+ EXPECT_EQ(base_value + 1, *(int32_t*)row.cell_ptr(1));
+ EXPECT_EQ(base_value + 2, *(int64_t*)row.cell_ptr(2));
+ row_count++;
+ }
+ } while (st.ok());
+ EXPECT_TRUE(st.is_end_of_file());
+ EXPECT_EQ(300, row_count);
+
+ delete iter;
+}
+
} // namespace doris
diff --git a/be/test/vec/exec/vgeneric_iterators_test.cpp b/be/test/vec/exec/vgeneric_iterators_test.cpp
index 2a66c2556d..72fa96080c 100644
--- a/be/test/vec/exec/vgeneric_iterators_test.cpp
+++ b/be/test/vec/exec/vgeneric_iterators_test.cpp
@@ -136,7 +136,7 @@ TEST(VGenericIteratorsTest, Union) {
delete iter;
}
-TEST(VGenericIteratorsTest, Merge) {
+TEST(VGenericIteratorsTest, MergeAgg) {
EXPECT_TRUE(1);
auto schema = create_schema();
std::vector<RowwiseIterator*> inputs;
@@ -145,7 +145,7 @@ TEST(VGenericIteratorsTest, Merge) {
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
- auto iter = vectorized::new_merge_iterator(inputs, -1);
+ auto iter = vectorized::new_merge_iterator(inputs, -1, false);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@@ -185,6 +185,47 @@ TEST(VGenericIteratorsTest, Merge) {
delete iter;
}
+TEST(VGenericIteratorsTest, MergeUnique) {
+ EXPECT_TRUE(1);
+ auto schema = create_schema();
+ std::vector<RowwiseIterator*> inputs;
+
+ inputs.push_back(vectorized::new_auto_increment_iterator(schema, 100));
+ inputs.push_back(vectorized::new_auto_increment_iterator(schema, 200));
+ inputs.push_back(vectorized::new_auto_increment_iterator(schema, 300));
+
+ auto iter = vectorized::new_merge_iterator(inputs, -1, true);
+ StorageReadOptions opts;
+ auto st = iter->init(opts);
+ EXPECT_TRUE(st.ok());
+
+ vectorized::Block block;
+ create_block(schema, block);
+
+ do {
+ st = iter->next_batch(&block);
+ } while (st.ok());
+
+ EXPECT_TRUE(st.is_end_of_file());
+ EXPECT_EQ(block.rows(), 300);
+
+ auto c0 = block.get_by_position(0).column;
+ auto c1 = block.get_by_position(1).column;
+ auto c2 = block.get_by_position(2).column;
+
+ size_t row_count = 0;
+ for (size_t i = 0; i < block.rows(); ++i) {
+ size_t base_value = row_count;
+
+ EXPECT_EQ(base_value, (*c0)[i].get<int>());
+ EXPECT_EQ(base_value + 1, (*c1)[i].get<int>());
+ EXPECT_EQ(base_value + 2, (*c2)[i].get<int>());
+ row_count++;
+ }
+
+ delete iter;
+}
+
// only used for Seq Column UT
class SeqColumnUtIterator : public RowwiseIterator {
public:
@@ -275,7 +316,7 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
seq_id_in_every_file));
}
- auto iter = vectorized::new_merge_iterator(inputs, seq_column_id);
+ auto iter = vectorized::new_merge_iterator(inputs, seq_column_id, true);
StorageReadOptions opts;
auto st = iter->init(opts);
EXPECT_TRUE(st.ok());
@@ -288,17 +329,13 @@ TEST(VGenericIteratorsTest, MergeWithSeqColumn) {
} while (st.ok());
EXPECT_TRUE(st.is_end_of_file());
- EXPECT_EQ(block.rows(), seg_iter_num);
+ EXPECT_EQ(block.rows(), 1);
auto col0 = block.get_by_position(0).column;
auto col1 = block.get_by_position(1).column;
auto seq_col = block.get_by_position(seq_column_id).column;
-
- for (size_t i = 0; i < seg_iter_num; i++) {
- size_t expected_value = seg_iter_num - i - 1; // in Descending
- size_t actual_value = (*seq_col)[i].get<int>();
- EXPECT_EQ(expected_value, actual_value);
- }
+ size_t actual_value = (*seq_col)[0].get<int>();
+ EXPECT_EQ(seg_iter_num - 1, actual_value);
delete iter;
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org