You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/01/09 00:42:04 UTC

[doris] branch master updated: [refactor] delete non vec load from memtable (#15667)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new ba54634d55 [refactor] delete non vec load from memtable (#15667)
ba54634d55 is described below

commit ba54634d555feab3a2b8d1bc5c45c1c6f5b71668
Author: zbtzbtzbt <35...@users.noreply.github.com>
AuthorDate: Mon Jan 9 08:41:58 2023 +0800

    [refactor] delete non vec load from memtable (#15667)
    
    * [refactor] delete non vec load from memtable
    delete non vec load from memtable totally.
    
    remove function keys_type() in memtable.
    
    Co-authored-by: zhoubintao <12...@qq.com>
---
 be/src/olap/memtable.cpp                  | 160 +++---------------------------
 be/src/olap/memtable.h                    |  45 +--------
 be/src/olap/rowset/beta_rowset_writer.cpp |  44 --------
 be/src/olap/rowset/beta_rowset_writer.h   |   1 -
 be/src/olap/rowset/rowset_writer.h        |   3 -
 5 files changed, 16 insertions(+), 237 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index f4e01fd6c9..6bf477dd4d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -40,9 +40,9 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
                    const std::shared_ptr<MemTracker>& insert_mem_tracker,
                    const std::shared_ptr<MemTracker>& flush_mem_tracker)
         : _tablet(std::move(tablet)),
+          _keys_type(_tablet->keys_type()),
           _schema(schema),
           _tablet_schema(tablet_schema),
-          _slot_descs(slot_descs),
           _insert_mem_tracker(insert_mem_tracker),
           _flush_mem_tracker(flush_mem_tracker),
           _schema_size(_schema->schema_size()),
@@ -65,11 +65,10 @@ MemTable::MemTable(TabletSharedPtr tablet, Schema* schema, const TabletSchema* t
 #endif
     _buffer_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
     _table_mem_pool = std::make_unique<MemPool>(_insert_mem_tracker.get());
-    _skip_list = nullptr;
     _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
     // TODO: Support ZOrderComparator in the future
     _vec_skip_list = std::make_unique<VecTable>(_vec_row_comparator.get(), _table_mem_pool.get(),
-                                                keys_type() == KeysType::DUP_KEYS);
+                                                _keys_type == KeysType::DUP_KEYS);
     _init_columns_offset_by_slot_descs(slot_descs, tuple_desc);
 }
 void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescriptor*>* slot_descs,
@@ -88,8 +87,7 @@ void MemTable::_init_columns_offset_by_slot_descs(const std::vector<SlotDescript
 void MemTable::_init_agg_functions(const vectorized::Block* block) {
     for (uint32_t cid = _schema->num_key_columns(); cid < _schema->num_columns(); ++cid) {
         vectorized::AggregateFunctionPtr function;
-        if (_tablet->keys_type() == KeysType::UNIQUE_KEYS &&
-            _tablet->enable_unique_key_merge_on_write()) {
+        if (_keys_type == KeysType::UNIQUE_KEYS && _tablet->enable_unique_key_merge_on_write()) {
             // In such table, non-key column's aggregation type is NONE, so we need to construct
             // the aggregate function manually.
             function = vectorized::AggregateFunctionSimpleFactory::instance().get(
@@ -122,7 +120,7 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 }
 
 MemTable::~MemTable() {
-    if (_vec_skip_list != nullptr && keys_type() != KeysType::DUP_KEYS) {
+    if (_vec_skip_list != nullptr && _keys_type != KeysType::DUP_KEYS) {
         VecTable::Iterator it(_vec_skip_list.get());
         for (it.SeekToFirst(); it.Valid(); it.Next()) {
             // We should release agg_places here, because they are not released when a
@@ -146,14 +144,6 @@ MemTable::~MemTable() {
     DCHECK_EQ(_flush_mem_tracker->consumption(), 0);
 }
 
-MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
-
-int MemTable::RowCursorComparator::operator()(const char* left, const char* right) const {
-    ContiguousRow lhs_row(_schema, left);
-    ContiguousRow rhs_row(_schema, right);
-    return compare_row(lhs_row, rhs_row);
-}
-
 int MemTable::RowInBlockComparator::operator()(const RowInBlock* left,
                                                const RowInBlock* right) const {
     return _pblock->compare_at(left->_row_pos, right->_row_pos, _schema->num_key_columns(),
@@ -169,10 +159,11 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
         _input_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
         _vec_row_comparator->set_block(&_input_mutable_block);
         _output_mutable_block = vectorized::MutableBlock::build_mutable_block(&cloneBlock);
-        if (keys_type() != KeysType::DUP_KEYS) {
+        if (_keys_type != KeysType::DUP_KEYS) {
             _init_agg_functions(&target_block);
         }
     }
+
     auto num_rows = row_idxs.size();
     size_t cursor_in_mutableblock = _input_mutable_block.rows();
     _input_mutable_block.add_rows(&target_block, row_idxs.data(), row_idxs.data() + num_rows);
@@ -188,7 +179,7 @@ void MemTable::insert(const vectorized::Block* input_block, const std::vector<in
 void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     _rows++;
     bool overwritten = false;
-    if (keys_type() == KeysType::DUP_KEYS) {
+    if (_keys_type == KeysType::DUP_KEYS) {
         // TODO: dup keys only need sort opertaion. Rethink skiplist is the beat way to sort columns?
         _vec_skip_list->Insert(row_in_block, &overwritten);
         DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
@@ -215,87 +206,6 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     }
 }
 
-// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
-// we first allocate from _buffer_mem_pool, and then check whether it already exists in
-// _skiplist.  If it exists, we aggregate the new row into the row in skiplist.
-// otherwise, we need to copy it into _table_mem_pool before we can insert it.
-void MemTable::_insert_agg(const Tuple* tuple) {
-    _rows++;
-    uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
-    ContiguousRow src_row(_schema, tuple_buf);
-    _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());
-
-    bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
-    if (is_exist) {
-        _merged_rows++;
-        (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
-    } else {
-        tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow dst_row(_schema, tuple_buf);
-        _agg_object_pool.acquire_data(&_agg_buffer_pool);
-        copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
-        _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
-    }
-
-    // Make MemPool to be reusable, but does not free its memory
-    _buffer_mem_pool->clear();
-    _agg_buffer_pool.clear();
-}
-
-void MemTable::_insert_dup(const Tuple* tuple) {
-    _rows++;
-    bool overwritten = false;
-    uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
-    ContiguousRow row(_schema, tuple_buf);
-    _tuple_to_row(tuple, &row, _table_mem_pool.get());
-    _skip_list->Insert((TableKey)tuple_buf, &overwritten);
-    DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-}
-
-void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
-    for (size_t i = 0; i < _slot_descs->size(); ++i) {
-        auto cell = row->cell(i);
-        const SlotDescriptor* slot = (*_slot_descs)[i];
-
-        bool is_null = tuple->is_null(slot->null_indicator_offset());
-        const auto* value = (const char*)tuple->get_slot(slot->tuple_offset());
-        _schema->column(i)->consume(&cell, value, is_null, mem_pool, &_agg_buffer_pool);
-    }
-}
-
-void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
-    ContiguousRow dst_row(_schema, row_in_skiplist);
-    if (_tablet_schema->has_sequence_col()) {
-        return agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
-                                            _table_mem_pool.get());
-    }
-    agg_update_row(&dst_row, src_row, _table_mem_pool.get());
-}
-
-// In the Unique Key table with primary key index, the non-key column's aggregation
-// type is NONE, to replace the data in duplicate row, we should copy the data manually.
-void MemTable::_replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
-    ContiguousRow dst_row(_schema, row_in_skiplist);
-    if (_tablet_schema->has_sequence_col()) {
-        const int32_t sequence_idx = _tablet_schema->sequence_col_idx();
-        auto seq_dst_cell = dst_row.cell(sequence_idx);
-        auto seq_src_cell = src_row.cell(sequence_idx);
-        auto res = _schema->column(sequence_idx)->compare_cell(seq_dst_cell, seq_src_cell);
-        // dst sequence column larger than src, don't need to replace
-        if (res > 0) {
-            return;
-        }
-    }
-    // do replace
-    for (uint32_t cid = dst_row.schema()->num_key_columns(); cid < dst_row.schema()->num_columns();
-         ++cid) {
-        auto dst_cell = dst_row.cell(cid);
-        auto src_cell = src_row.cell(cid);
-        auto column = _schema->column(cid);
-        column->deep_copy(&dst_cell, src_cell, _table_mem_pool.get());
-    }
-}
-
 void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
     if (_tablet_schema->has_sequence_col()) {
         auto sequence_idx = _tablet_schema->sequence_col_idx();
@@ -318,7 +228,7 @@ template <bool is_final>
 void MemTable::_collect_vskiplist_results() {
     VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
-    if (keys_type() == KeysType::DUP_KEYS) {
+    if (_keys_type == KeysType::DUP_KEYS) {
         std::vector<int> row_pos_vec;
         DCHECK(in_block.rows() <= std::numeric_limits<int>::max());
         row_pos_vec.reserve(in_block.rows());
@@ -380,7 +290,7 @@ void MemTable::_collect_vskiplist_results() {
 
 void MemTable::shrink_memtable_by_agg() {
     SCOPED_CONSUME_MEM_TRACKER(_insert_mem_tracker_use_hook.get());
-    if (keys_type() == KeysType::DUP_KEYS) {
+    if (_keys_type == KeysType::DUP_KEYS) {
         return;
     }
     _collect_vskiplist_results<false>();
@@ -391,8 +301,8 @@ bool MemTable::is_flush() const {
 }
 
 bool MemTable::need_to_agg() {
-    return keys_type() == KeysType::DUP_KEYS ? is_flush()
-                                             : memory_usage() >= config::memtable_max_buffer_size;
+    return _keys_type == KeysType::DUP_KEYS ? is_flush()
+                                            : memory_usage() >= config::memtable_max_buffer_size;
 }
 
 Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
@@ -444,28 +354,10 @@ Status MemTable::flush() {
 
 Status MemTable::_do_flush(int64_t& duration_ns) {
     SCOPED_RAW_TIMER(&duration_ns);
-    if (_skip_list) {
-        Status st = _rowset_writer->flush_single_memtable(this, &_flush_size);
-        if (st.is<NOT_IMPLEMENTED_ERROR>()) {
-            // For alpha rowset, we do not implement "flush_single_memtable".
-            // Flush the memtable like the old way.
-            Table::Iterator it(_skip_list.get());
-            for (it.SeekToFirst(); it.Valid(); it.Next()) {
-                char* row = (char*)it.key();
-                ContiguousRow dst_row(_schema, row);
-                agg_finalize_row(&dst_row, _table_mem_pool.get());
-                RETURN_NOT_OK(_rowset_writer->add_row(dst_row));
-            }
-            RETURN_NOT_OK(_rowset_writer->flush());
-        } else {
-            RETURN_NOT_OK(st);
-        }
-    } else {
-        _collect_vskiplist_results<true>();
-        vectorized::Block block = _output_mutable_block.to_block();
-        RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
-        _flush_size = block.allocated_bytes();
-    }
+    _collect_vskiplist_results<true>();
+    vectorized::Block block = _output_mutable_block.to_block();
+    RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
+    _flush_size = block.allocated_bytes();
     return Status::OK();
 }
 
@@ -473,26 +365,4 @@ Status MemTable::close() {
     return flush();
 }
 
-MemTable::Iterator::Iterator(MemTable* memtable)
-        : _mem_table(memtable), _it(memtable->_skip_list.get()) {}
-
-void MemTable::Iterator::seek_to_first() {
-    _it.SeekToFirst();
-}
-
-bool MemTable::Iterator::valid() {
-    return _it.Valid();
-}
-
-void MemTable::Iterator::next() {
-    _it.Next();
-}
-
-ContiguousRow MemTable::Iterator::get_current_row() {
-    char* row = (char*)_it.key();
-    ContiguousRow dst_row(_mem_table->_schema, row);
-    agg_finalize_row(&dst_row, _mem_table->_table_mem_pool.get());
-    return dst_row;
-}
-
 } // namespace doris
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 9b2f141f5b..a7a8af3efa 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -50,12 +50,9 @@ public:
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet->tablet_id(); }
-    KeysType keys_type() const { return _tablet->keys_type(); }
     size_t memory_usage() const {
         return _insert_mem_tracker->consumption() + _flush_mem_tracker->consumption();
     }
-
-    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, const std::vector<int>& row_idxs);
 
@@ -75,15 +72,6 @@ public:
 private:
     Status _do_flush(int64_t& duration_ns);
 
-    class RowCursorComparator : public RowComparator {
-    public:
-        RowCursorComparator(const Schema* schema);
-        int operator()(const char* left, const char* right) const override;
-
-    private:
-        const Schema* _schema;
-    };
-
     // row pos in _input_mutable_block
     struct RowInBlock {
         size_t _row_pos;
@@ -115,34 +103,9 @@ private:
     };
 
 private:
-    using Table = SkipList<char*, RowComparator>;
-    using TableKey = Table::key_type;
     using VecTable = SkipList<RowInBlock*, RowInBlockComparator>;
 
-public:
-    /// The iterator of memtable, so that the data in this memtable
-    /// can be visited outside.
-    class Iterator {
-    public:
-        Iterator(MemTable* mem_table);
-        ~Iterator() = default;
-
-        void seek_to_first();
-        bool valid();
-        void next();
-        ContiguousRow get_current_row();
-
-    private:
-        MemTable* _mem_table;
-        Table::Iterator _it;
-    };
-
 private:
-    void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
-    void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
-    void _replace_row(const ContiguousRow& src_row, TableKey row_in_skiplist);
-    void _insert_dup(const Tuple* tuple);
-    void _insert_agg(const Tuple* tuple);
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
@@ -152,10 +115,9 @@ private:
 
 private:
     TabletSharedPtr _tablet;
+    const KeysType _keys_type;
     Schema* _schema;
     const TabletSchema* _tablet_schema;
-    // the slot in _slot_descs are in order of tablet's schema
-    const std::vector<SlotDescriptor*>* _slot_descs;
 
     // TODO: change to unique_ptr of comparator
     std::shared_ptr<RowComparator> _row_comparator;
@@ -186,8 +148,6 @@ private:
     ObjectPool _agg_object_pool;
 
     size_t _schema_size;
-    std::unique_ptr<Table> _skip_list;
-    Table::Hint _hint;
 
     std::unique_ptr<VecTable> _vec_skip_list;
     VecTable::Hint _vec_hint;
@@ -204,9 +164,6 @@ private:
     // in unique or aggregate key model.
     int64_t _rows = 0;
     int64_t _merged_rows = 0;
-    void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
-    void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
-                                            TableKey row_in_skiplist) = nullptr;
 
     //for vectorized
     vectorized::MutableBlock _input_mutable_block;
diff --git a/be/src/olap/rowset/beta_rowset_writer.cpp b/be/src/olap/rowset/beta_rowset_writer.cpp
index 73acd4e18d..9710b95e57 100644
--- a/be/src/olap/rowset/beta_rowset_writer.cpp
+++ b/be/src/olap/rowset/beta_rowset_writer.cpp
@@ -637,50 +637,6 @@ Status BetaRowsetWriter::flush() {
     return Status::OK();
 }
 
-Status BetaRowsetWriter::flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
-    int64_t size = 0;
-    int64_t sum_size = 0;
-    // Create segment writer for each memtable, so that
-    // all memtables can be flushed in parallel.
-    std::unique_ptr<segment_v2::SegmentWriter> writer;
-
-    MemTable::Iterator it(memtable);
-    for (it.seek_to_first(); it.valid(); it.next()) {
-        if (PREDICT_FALSE(writer == nullptr)) {
-            RETURN_NOT_OK(_segcompaction_if_necessary());
-            RETURN_NOT_OK(_create_segment_writer(&writer));
-        }
-        ContiguousRow dst_row = it.get_current_row();
-        auto s = writer->append_row(dst_row);
-        _raw_num_rows_written++;
-        if (PREDICT_FALSE(!s.ok())) {
-            LOG(WARNING) << "failed to append row: " << s.to_string();
-            return Status::Error<WRITER_DATA_WRITE_ERROR>();
-        }
-
-        if (PREDICT_FALSE(writer->estimate_segment_size() >= MAX_SEGMENT_SIZE ||
-                          writer->num_rows_written() >= _context.max_rows_per_segment)) {
-            auto s = _flush_segment_writer(&writer, &size);
-            sum_size += size;
-            if (OLAP_UNLIKELY(!s.ok())) {
-                *flush_size = sum_size;
-                return s;
-            }
-        }
-    }
-
-    if (writer != nullptr) {
-        auto s = _flush_segment_writer(&writer, &size);
-        sum_size += size;
-        *flush_size = sum_size;
-        if (OLAP_UNLIKELY(!s.ok())) {
-            return s;
-        }
-    }
-
-    return Status::OK();
-}
-
 Status BetaRowsetWriter::flush_single_memtable(const vectorized::Block* block) {
     if (block->rows() == 0) {
         return Status::OK();
diff --git a/be/src/olap/rowset/beta_rowset_writer.h b/be/src/olap/rowset/beta_rowset_writer.h
index 50f78dcf5a..6fc5cfd5ee 100644
--- a/be/src/olap/rowset/beta_rowset_writer.h
+++ b/be/src/olap/rowset/beta_rowset_writer.h
@@ -56,7 +56,6 @@ public:
 
     // Return the file size flushed to disk in "flush_size"
     // This method is thread-safe.
-    Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) override;
     Status flush_single_memtable(const vectorized::Block* block) override;
 
     RowsetSharedPtr build() override;
diff --git a/be/src/olap/rowset/rowset_writer.h b/be/src/olap/rowset/rowset_writer.h
index 7164620d58..6ca0af44d5 100644
--- a/be/src/olap/rowset/rowset_writer.h
+++ b/be/src/olap/rowset/rowset_writer.h
@@ -64,9 +64,6 @@ public:
     }
     virtual Status final_flush() { return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>(); }
 
-    virtual Status flush_single_memtable(MemTable* memtable, int64_t* flush_size) {
-        return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
-    }
     virtual Status flush_single_memtable(const vectorized::Block* block) {
         return Status::Error<ErrorCode::NOT_IMPLEMENTED_ERROR>();
     }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org