You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/09 00:42:04 UTC
[doris] branch master updated: [refactor] delete non vec load from memtable (#15667)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ba54634d55 [refactor] delete non vec load from memtable (#15667)
ba54634d55 is described below
commit ba54634d555feab3a2b8d1bc5c45c1c6f5b71668
Author: zbtzbtzbt <35...@users.noreply.github.com>
AuthorDate: Mon Jan 9 08:41:58 2023 +0800
[refactor] delete non vec load from memtable (#15667)
* [refactor] delete non vec load from memtable
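Delete the non-vectorized (Tuple-based) load path from MemTable entirely.
Remove the MemTable::keys_type() accessor; the keys type is now cached in the const member _keys_type, initialized in the constructor.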
Co-authored-by: zhoubintao <12...@qq.com>
---
be/src/olap/memtable.cpp | 160 +++---------------------------
be/src/olap/memtable.h | 45 +--------
be/src/olap/rowset/beta_rowset_writer.cpp | 44 --------
be/src/olap/rowset/beta_rowset_writer.h | 1 -
be/src/olap/rowset/rowset_writer.h | 3 -
5 files changed, 16 insertions(+), 237 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index f4e01fd6c9..6bf477dd4d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -40,9 +40,9 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
const std::shared_ptr<MemTracker>& insert_mem_tracker,
const std::shared_ptr<MemTracker>& flush_mem_tracker)
: _tablet(std::move(tablet)),
+ _keys_type(_tablet->keys_type()),
_schema(schema),
_tablet_schema(tablet_schema),
- _slot_descs(slot_descs),
_insert_mem_tracker(insert_mem_tracker),
_flush_mem_tracker(flush_mem_tracker),
_schema_size(_schema->schema_size()),
@@ -65,11 +65,10 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
#endif
_buffer_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
_table_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
- _skip_list = nullptr;
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
// TODO: Support ZOrderComparator in the future
_vec_skip_list = std::make_unique<VecTable>(_vec_row_comparator.get(), _table_mem_pool.get(),
- keys_type() == KeysType::DUP_KEYS);
+ _keys_type == KeysType::DUP_KEYS);
_init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
}
void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
@@ -88,8 +87,7 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescript
void MemTable::_init_agg_functions(const vectorized::Block* block) {
for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
vectorized::AggregateFunctionPtr function;
- if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
- _tablet->enable_unique_key_merge_on_write()) {
+ if (_keys_type == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) {
// In such table, non-key column's aggregation type is NONE, so we need to construct
// the aggregate function manually.
function = vectorized::AggregateFunctionSimpleFactory::instance().get(
@@ -122,7 +120,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}
MemTable::~MemTable() {
- if (_vec_skip_list != nullptr && keys_type() != KeysType::DUP_KEYS) {
+ if (_vec_skip_list != nullptr && _keys_type != KeysType::DUP_KEYS) {
VecTable::Iterator it(_vec_skip_list.get());
for (it.SeekToFirst(); it.Valid(); it.Next()) {
// We should release agg_places here, because they are not released when a
@@ -146,14 +144,6 @@ MemTable::~MemTable() {
DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
}
-MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
-
-int MemTable::RowCursorComparator::operator()(const char* left, const char* right) const {
- ContiguousRow lhs_row(_schema, left);
- ContiguousRow rhs_row(_schema, right);
- return compare_row(lhs_row, rhs_row);
-}
-
int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
const RowInBlock* right) const {
return _pblock->compare_at(left->_row_pos, right->_row_pos, _schema->num_key_columns(),
@@ -169,10 +159,11 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
_input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
_vec_row_comparator->set_block(&_input_mutable_block);
_output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
- if (keys_type() != KeysType::DUP_KEYS) {
+ if (_keys_type != KeysType::DUP_KEYS) {
_init_agg_functions(&target_block);
}
}
+
auto num_rows = row_idxs.size();
size_t cursor_in_mutableblock = _input_mutable_block.rows();
_input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
@@ -188,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
_rows++;
bool overwritten = false;
- if (keys_type() == KeysType::DUP_KEYS) {
+ if (_keys_type == KeysType::DUP_KEYS) {
// TODO: dup keys only need a sort operation. Rethink whether a skiplist is the best way to sort columns?
_vec_skip_list->Insert(row_in_block, &overwritten);
DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
@@ -215,87 +206,6 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
}
}
-// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
-// we first allocate from _buffer_mem_pool, and then check whether it already exists in
-// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
-// otherwise, we need to copy it into _table_mem_pool before we can insert it.
-void MemTable::_insert_agg(const Tuple* tuple) {
- _rows++;
- uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
- ContiguousRow src_row(_schema, tuple_buf);
- _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());
-
- bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
- if (is_exist) {
- _merged_rows++;
- (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
- } else {
- tuple_buf = _table_mem_pool->allocate(_schema_size);
- ContiguousRow dst_row(_schema, tuple_buf);
- _agg_object_pool.acquire_data(&_agg_buffer_pool);
- copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
- _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
- }
-
- // Make MemPool to be reusable, but does not free its memory
- _buffer_mem_pool->clear();
- _agg_buffer_pool.clear();
-}
-
-void MemTable::_insert_dup(const Tuple* tuple) {
- _rows++;
- bool overwritten = false;
- uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
- ContiguousRow row(_schema, tuple_buf);
- _tuple_to_row(tuple, &row, _table_mem_pool.get());
- _skip_list->Insert((TableKey)tuple_buf, &overwritten);
- DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-}
-
-void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
- for (size_t i = 0; i < _slot_descs->size(); ++i) {
- auto cell = row->cell(i);
- const SlotDescriptor* slot = (*_slot_descs)[i];
-
- bool is_null = tuple->is_null(slot->null_indicator_offset());
- const auto* value = (const char*)tuple->get_slot(slot->tuple_offset());
- _schema->column(i)->consume(&cell, value, is_null, mem_pool, &_agg_buffer_pool);
- }
-}
-
-void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
- ContiguousRow dst_row(_schema, row_in_skiplist);
- if (_tablet_schema->has_sequence_col()) {
- return agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
- _table_mem_pool.get());
- }
- agg_update_row(&dst_row, src_row, _table_mem_pool.get());
-}
-
-// In the Unique Key table with primary key index, the non-key column's aggregation
-// type is NONE, to replace the data in duplicate row, we should copy the data manually.
-void MemTable::_replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
- ContiguousRow dst_row(_schema, row_in_skiplist);
- if (_tablet_schema->has_sequence_col()) {
- const int32_t sequence_idx = _tablet_schema->sequence_col_idx();
- auto seq_dst_cell = dst_row.cell(sequence_idx);
- auto seq_src_cell = src_row.cell(sequence_idx);
- auto res = _schema->column(sequence_idx)->compare_cell(seq_dst_cell, seq_src_cell);
- // dst sequence column larger than src, don't need to replace
- if (res > 0) {
- return;
- }
- }
- // do replace
- for (uint32_t cid = dst_row.schema()->num_key_columns(); cid < dst_row.schema()->num_columns();
- ++cid) {
- auto dst_cell = dst_row.cell(cid);
- auto src_cell = src_row.cell(cid);
- auto column = _schema->column(cid);
- column->deep_copy(&dst_cell, src_cell, _table_mem_pool.get());
- }
-}
-
void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
if (_tablet_schema->has_sequence_col()) {
auto sequence_idx = _tablet_schema->sequence_col_idx();
@@ -318,7 +228,7 @@ template <bool is_final>
void MemTable::_collect_vskiplist_results() {
VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
- if (keys_type() == KeysType::DUP_KEYS) {
+ if (_keys_type == KeysType::DUP_KEYS) {
std::vector<int> row_pos_vec;
DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
row_pos_vec.reserve(in_block.rows());
@@ -380,7 +290,7 @@ void MemTable::_collect_vskiplist_results() {
void MemTable::shrink_memtable_by_agg() {
SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
- if (keys_type() == KeysType::DUP_KEYS) {
+ if (_keys_type == KeysType::DUP_KEYS) {
return;
}
_collect_vskiplist_results<false>();
@@ -391,8 +301,8 @@ bool MemTable::is_flush() const {
}
bool MemTable::need_to_agg() {
- return keys_type() == KeysType::DUP_KEYS ? is_flush()
- : memory_usage() >= config::memtable_max_buffer_size;
+ return _keys_type == KeysType::DUP_KEYS ? is_flush()
+ : memory_usage() >= config::memtable_max_buffer_size;
}
Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
@@ -444,28 +354,10 @@ Status MemTable::flush() {
Status MemTable::_do_flush(int64_t& duration_ns) {
SCOPED_RAW_TIMER(&duration_ns);
- if (_skip_list) {
- Status st = _rowset_writer->flush_single_memtable(this, &_flush_size);
- if (st.is<NOT_IMPLEMENTED_ERROR>()) {
- // For alpha rowset, we do not implement "flush_single_memtable".
- // Flush the memtable like the old way.
- Table::Iterator it(_skip_list.get());
- for (it.SeekToFirst(); it.Valid(); it.Next()) {
- char* row = (char*)it.key();
- ContiguousRow dst_row(_schema, row);
- agg_finalize_row(&dst_row, _table_mem_pool.get());
- RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
- }
- RETURN_NOT_OK(_rowset_writer->flush());
- } else {
- RETURN_NOT_OK(st);
- }
- } else {
- _collect_vskiplist_results<true>();
- vectorized::Block block = _output_mutable_block.to_block();
- RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
- _flush_size = block.allocated_bytes();
- }
+ _collect_vskiplist_results<true>();
+ vectorized::Block block = _output_mutable_block.to_block();
+ RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
+ _flush_size = block.allocated_bytes();
return Status::OK();
}
@@ -473,26 +365,4 @@ Status MemTable::close() {
return flush();
}
-MemTable::Iterator::Iterator(MemTable* memtable)
- : _mem_table(memtable), _it(memtable->_skip_list.get()) {}
-
-void MemTable::Iterator::seek_to_first() {
- _it.SeekToFirst();
-}
-
-bool MemTable::Iterator::valid() {
- return _it.Valid();
-}
-
-void MemTable::Iterator::next() {
- _it.Next();
-}
-
-ContiguousRow MemTable::Iterator::get_current_row() {
- char* row = (char*)_it.key();
- ContiguousRow dst_row(_mem_table->_schema, row);
- agg_finalize_row(&dst_row, _mem_table->_table_mem_pool.get());
- return dst_row;
-}
-
} // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9b2f141f5b..a7a8af3efa 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -50,12 +50,9 @@ public:
~MemTable();
int64_t tablet_id() const { return _tablet->tablet_id(); }
- KeysType keys_type() const { return _tablet->keys_type(); }
size_t memory_usage() const {
return _insert_mem_tracker->consumption() + _flush_mem_tracker->consumption();
}
-
- inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
// insert tuple from (row_pos) to (row_pos+num_rows)
void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
@@ -75,15 +72,6 @@ public:
private:
Status _do_flush(int64_t& duration_ns);
- class RowCursorComparator : public RowComparator {
- public:
- RowCursorComparator(const Schema* schema);
- int operator()(const char* left, const char* right) const override;
-
- private:
- const Schema* _schema;
- };
-
// row pos in _input_mutable_block
struct RowInBlock {
size_t _row_pos;
@@ -115,34 +103,9 @@ private:
};
private:
- using Table = SkipList<char*, RowComparator>;
- using TableKey = Table::key_type;
using VecTable = SkipList<RowInBlock*, RowInBlockComparator>;
-public:
- /// The iterator of memtable, so that the data in this memtable
- /// can be visited outside.
- class Iterator {
- public:
- Iterator(MemTable* mem_table);
- ~Iterator() = default;
-
- void seek_to_first();
- bool valid();
- void next();
- ContiguousRow get_current_row();
-
- private:
- MemTable* _mem_table;
- Table::Iterator _it;
- };
-
private:
- void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
- void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
- void _replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist);
- void _insert_dup(const Tuple* tuple);
- void _insert_agg(const Tuple* tuple);
// for vectorized
void _insert_one_row_from_block(RowInBlock* row_in_block);
void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
@@ -152,10 +115,9 @@ private:
private:
TabletSharedPtr _tablet;
+ const KeysType _keys_type;
Schema* _schema;
const TabletSchema* _tablet_schema;
- // the slot in _slot_descs are in order of tablet's schema
- const std::vector<SlotDescriptor*>* _slot_descs;
// TODO: change to unique_ptr of comparator
std::shared_ptr<RowComparator> _row_comparator;
@@ -186,8 +148,6 @@ private:
ObjectPool _agg_object_pool;
size_t _schema_size;
- std::unique_ptr<Table> _skip_list;
- Table::Hint _hint;
std::unique_ptr<VecTable> _vec_skip_list;
VecTable::Hint _vec_hint;
@@ -204,9 +164,6 @@ private:
// in unique or aggregate key model.
int64_t _rows = 0;
int64_t _merged_rows = 0;
- void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
- void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
- TableKey row_in_skiplist) = nullptr;
//for vectorized
vectorized::MutableBlock _input_mutable_block;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 73acd4e18d..9710b95e57 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -637,50 +637,6 @@ Status BetaRowsetWriter::flush() {
return Status::OK();
}
-Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
- int64_t size = 0;
- int64_t sum_size = 0;
- // Create segment writer for each memtable, so that
- // all memtables can be flushed in parallel.
- std::unique_ptr<segment_v2::SegmentWriter> writer;
-
- MemTable::Iterator it(memtable);
- for (it.seek_to_first(); it.valid(); it.next()) {
- if (PREDICT_FALSE(writer == nullptr)) {
- RETURN_NOT_OK(_segcompaction_if_necessary());
- RETURN_NOT_OK(_create_segment_writer(&writer));
- }
- ContiguousRow dst_row = it.get_current_row();
- auto s = writer->append_row(dst_row);
- _raw_num_rows_written++;
- if (PREDICT_FALSE(!s.ok())) {
- LOG(WARNING) << "failed to append row: " << s.to_string();
- return Status::Error<WRITER_DATA_WRITE_ERROR>();
- }
-
- if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
- writer->num_rows_written() >= _context.max_rows_per_segment)) {
- auto s = _flush_segment_writer(&writer, &size);
- sum_size += size;
- if (OLAP_UNLIKELY(!s.ok())) {
- *flush_size = sum_size;
- return s;
- }
- }
- }
-
- if (writer != nullptr) {
- auto s = _flush_segment_writer(&writer, &size);
- sum_size += size;
- *flush_size = sum_size;
- if (OLAP_UNLIKELY(!s.ok())) {
- return s;
- }
- }
-
- return Status::OK();
-}
-
Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
if (block->rows() == 0) {
return Status::OK();
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 50f78dcf5a..6fc5cfd5ee 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -56,7 +56,6 @@ public:
// Return the file size flushed to disk in "flush_size"
// This method is thread-safe.
- Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
Status flush_single_memtable(const vectorized::Block* block) override;
RowsetSharedPtr build() override;
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 7164620d58..6ca0af44d5 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -64,9 +64,6 @@ public:
}
virtual Status final_flush() { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(); }
- virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
- return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
- }
virtual Status flush_single_memtable(const vectorized::Block* block) {
return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org