You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/05/07 02:27:33 UTC

[incubator-doris] branch master updated: [enhancement](load) optimize load string data and dict page write (#9123)

This is an automated email from the ASF dual-hosted git repository.

yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 2ccaa6338c [enhancement](load) optimize load string data and dict page write (#9123)
2ccaa6338c is described below

commit 2ccaa6338c272a953c93aaf9827779f9c8543251
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Sat May 7 10:27:27 2022 +0800

    [enhancement](load) optimize load string data and dict page write (#9123)
    
    * [enhancement](load) optimize load string data and dict page write
---
 be/src/olap/memtable.cpp                           | 87 ++++++++++++----------
 be/src/olap/memtable.h                             | 12 ++-
 be/src/olap/rowset/segment_v2/binary_dict_page.cpp | 34 ++++-----
 be/src/olap/rowset/segment_v2/binary_plain_page.h  | 18 +++--
 be/src/olap/rowset/segment_v2/bitshuffle_page.h    | 37 ++++++++-
 5 files changed, 116 insertions(+), 72 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 7e8d2fa041..7b9521e775 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -51,18 +51,28 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
         _skip_list = nullptr;
         _vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
         // TODO: Support ZOrderComparator in the future
-        _vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(),
-                                      _keys_type == KeysType::DUP_KEYS);
+        _vec_skip_list = std::make_unique<VecTable>(
+                _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
     } else {
         _vec_skip_list = nullptr;
+        if (_keys_type == KeysType::DUP_KEYS) {
+            _insert_fn = &MemTable::_insert_dup;
+        } else {
+            _insert_fn = &MemTable::_insert_agg;
+        }
+        if (_tablet_schema->has_sequence_col()) {
+            _aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence;
+        } else {
+            _aggregate_two_row_fn = &MemTable::_aggregate_two_row;
+        }
         if (tablet_schema->sort_type() == SortType::ZORDER) {
             _row_comparator = std::make_shared<TupleRowZOrderComparator>(
                     _schema, tablet_schema->sort_col_num());
         } else {
             _row_comparator = std::make_shared<RowCursorComparator>(_schema);
         }
-        _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(),
-                               _keys_type == KeysType::DUP_KEYS);
+        _skip_list = std::make_unique<Table>(_row_comparator.get(), _table_mem_pool.get(),
+                                             _keys_type == KeysType::DUP_KEYS);
     }
 }
 
@@ -86,9 +96,6 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
 }
 
 MemTable::~MemTable() {
-    delete _skip_list;
-    delete _vec_skip_list;
-
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
     _mem_tracker->release(_mem_usage);
 }
@@ -158,37 +165,25 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
     }
 }
 
-void MemTable::insert(const Tuple* tuple) {
+// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
+// we first allocate from _buffer_mem_pool, and then check whether it already exists in
+// _skiplist.  If it exists, we aggregate the new row into the row in skiplist.
+// otherwise, we need to copy it into _table_mem_pool before we can insert it.
+void MemTable::_insert_agg(const Tuple* tuple) {
     _rows++;
-    bool overwritten = false;
-    uint8_t* _tuple_buf = nullptr;
-    if (_keys_type == KeysType::DUP_KEYS) {
-        // Will insert directly, so use memory from _table_mem_pool
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow row(_schema, _tuple_buf);
-        _tuple_to_row(tuple, &row, _table_mem_pool.get());
-        _skip_list->Insert((TableKey)_tuple_buf, &overwritten);
-        DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-        return;
-    }
-
-    // For non-DUP models, for the data rows passed from the upper layer, when copying the data,
-    // we first allocate from _buffer_mem_pool, and then check whether it already exists in
-    // _skiplist.  If it exists, we aggregate the new row into the row in skiplist.
-    // otherwise, we need to copy it into _table_mem_pool before we can insert it.
-    _tuple_buf = _buffer_mem_pool->allocate(_schema_size);
-    ContiguousRow src_row(_schema, _tuple_buf);
+    uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
+    ContiguousRow src_row(_schema, tuple_buf);
     _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());
 
-    bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
+    bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
     if (is_exist) {
-        _aggregate_two_row(src_row, _hint.curr->key);
+        (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
     } else {
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow dst_row(_schema, _tuple_buf);
+        tuple_buf = _table_mem_pool->allocate(_schema_size);
+        ContiguousRow dst_row(_schema, tuple_buf);
         _agg_object_pool.acquire_data(&_agg_buffer_pool);
         copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
-        _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
+        _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
     }
 
     // Make MemPool to be reusable, but does not free its memory
@@ -196,6 +191,16 @@ void MemTable::insert(const Tuple* tuple) {
     _agg_buffer_pool.clear();
 }
 
+void MemTable::_insert_dup(const Tuple* tuple) {
+    _rows++;
+    bool overwritten = false;
+    uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
+    ContiguousRow row(_schema, tuple_buf);
+    _tuple_to_row(tuple, &row, _table_mem_pool.get());
+    _skip_list->Insert((TableKey)tuple_buf, &overwritten);
+    DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
+}
+
 void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
     for (size_t i = 0; i < _slot_descs->size(); ++i) {
         auto cell = row->cell(i);
@@ -209,12 +214,14 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me
 
 void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
     ContiguousRow dst_row(_schema, row_in_skiplist);
-    if (_tablet_schema->has_sequence_col()) {
-        agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
-                                     _table_mem_pool.get());
-    } else {
-        agg_update_row(&dst_row, src_row, _table_mem_pool.get());
-    }
+    agg_update_row(&dst_row, src_row, _table_mem_pool.get());
+}
+
+void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
+                                                TableKey row_in_skiplist) {
+    ContiguousRow dst_row(_schema, row_in_skiplist);
+    agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
+                                 _table_mem_pool.get());
 }
 
 void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
@@ -236,7 +243,7 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
     }
 }
 vectorized::Block MemTable::_collect_vskiplist_results() {
-    VecTable::Iterator it(_vec_skip_list);
+    VecTable::Iterator it(_vec_skip_list.get());
     vectorized::Block in_block = _input_mutable_block.to_block();
     // TODO: should try to insert data by column, not by row. to opt the the code
     if (_keys_type == KeysType::DUP_KEYS) {
@@ -282,7 +289,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
         if (st == Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED)) {
             // For alpha rowset, we do not implement "flush_single_memtable".
             // Flush the memtable like the old way.
-            Table::Iterator it(_skip_list);
+            Table::Iterator it(_skip_list.get());
             for (it.SeekToFirst(); it.Valid(); it.Next()) {
                 char* row = (char*)it.key();
                 ContiguousRow dst_row(_schema, row);
@@ -307,7 +314,7 @@ Status MemTable::close() {
 }
 
 MemTable::Iterator::Iterator(MemTable* memtable)
-        : _mem_table(memtable), _it(memtable->_skip_list) {}
+        : _mem_table(memtable), _it(memtable->_skip_list.get()) {}
 
 void MemTable::Iterator::seek_to_first() {
     _it.SeekToFirst();
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index ed3cbc5fa3..4922bb62cd 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -50,7 +50,7 @@ public:
     size_t memory_usage() const { return _mem_tracker->consumption(); }
     std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
 
-    void insert(const Tuple* tuple);
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
     // insert tuple from (row_pos) to (row_pos+num_rows)
     void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
 
@@ -140,6 +140,9 @@ public:
 private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _insert_dup(const Tuple* tuple);
+    void _insert_agg(const Tuple* tuple);
     // for vectorized
     void _insert_one_row_from_block(RowInBlock* row_in_block);
     void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
@@ -171,10 +174,10 @@ private:
     ObjectPool _agg_object_pool;
 
     size_t _schema_size;
-    Table* _skip_list;
+    std::unique_ptr<Table> _skip_list;
     Table::Hint _hint;
 
-    VecTable* _vec_skip_list;
+    std::unique_ptr<VecTable> _vec_skip_list;
     VecTable::Hint _vec_hint;
 
     RowsetWriter* _rowset_writer;
@@ -185,6 +188,9 @@ private:
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggragate key model.
     int64_t _rows = 0;
+    void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
+    void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
+                                            TableKey row_in_skiplist) = nullptr;
 
     //for vectorized
     vectorized::MutableBlock _input_mutable_block;
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index a52dde719a..f8c67e5e21 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -23,9 +23,9 @@
 #include "util/slice.h" // for Slice
 #include "vec/columns/column.h"
 #include "vec/columns/column_dictionary.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/column_string.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
 #include "vec/columns/predicate_column.h"
 
 namespace doris {
@@ -66,6 +66,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
         const Slice* src = reinterpret_cast<const Slice*>(vals);
         size_t num_added = 0;
         uint32_t value_code = -1;
+        auto* actual_builder =
+                down_cast<BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>*>(_data_page_builder.get());
 
         if (_data_page_builder->count() == 0) {
             _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
@@ -90,13 +92,18 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
                     dict_item.relocate(item_mem);
                 }
                 value_code = _dictionary.size();
+                size_t add_count = 1;
+                RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item),
+                                                   &add_count));
+                if (add_count == 0) {
+                    // current dict page is full, stop processing remaining inputs
+                    break;
+                }
                 _dictionary.emplace(dict_item, value_code);
-                _dict_items.push_back(dict_item);
-                _dict_builder->update_prepared_size(dict_item.size);
             }
             size_t add_count = 1;
-            RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code),
-                                                    &add_count));
+            RETURN_IF_ERROR(actual_builder->single_add(
+                    reinterpret_cast<const uint8_t*>(&value_code), &add_count));
             if (add_count == 0) {
                 // current data page is full, stop processing remaining inputs
                 break;
@@ -144,17 +151,7 @@ uint64_t BinaryDictPageBuilder::size() const {
 }
 
 Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
-    _dictionary.clear();
-    _dict_builder->reset();
-    size_t add_count = 1;
-    // here do not check is_page_full of dict_builder
-    // because it is checked in add
-    for (auto& dict_item : _dict_items) {
-        RETURN_IF_ERROR(
-                _dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count));
-    }
     *dictionary_page = _dict_builder->finish();
-    _dict_items.clear();
     return Status::OK();
 }
 
@@ -180,10 +177,7 @@ Status BinaryDictPageBuilder::get_last_value(void* value) const {
     }
     uint32_t value_code;
     RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code));
-    // TODO _dict_items is cleared in get_dictionary_page, which could cause
-    // get_last_value to fail when it's called after get_dictionary_page.
-    // the solution is to read last value from _dict_builder instead of _dict_items
-    *reinterpret_cast<Slice*>(value) = _dict_items[value_code];
+    *reinterpret_cast<Slice*>(value) = _dict_builder->get(value_code);
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h
index e655baca9c..e8c8f2a18e 100644
--- a/be/src/olap/rowset/segment_v2/binary_plain_page.h
+++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h
@@ -47,14 +47,13 @@ namespace segment_v2 {
 class BinaryPlainPageBuilder : public PageBuilder {
 public:
     BinaryPlainPageBuilder(const PageBuilderOptions& options)
-            : _size_estimate(0), _prepared_size(0), _options(options) {
+            : _size_estimate(0), _options(options) {
         reset();
     }
 
     bool is_page_full() override {
         // data_page_size is 0, do not limit the page size
-        return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size ||
-                                                _prepared_size > _options.data_page_size);
+        return _options.data_page_size != 0 && _size_estimate > _options.data_page_size;
     }
 
     Status add(const uint8_t* vals, size_t* count) override {
@@ -101,7 +100,6 @@ public:
         _buffer.clear();
         _buffer.reserve(_options.data_page_size == 0 ? 1024 : _options.data_page_size);
         _size_estimate = sizeof(uint32_t);
-        _prepared_size = sizeof(uint32_t);
         _finished = false;
         _last_value_size = 0;
     }
@@ -127,11 +125,16 @@ public:
         return Status::OK();
     }
 
-    void update_prepared_size(size_t added_size) {
-        _prepared_size += added_size;
-        _prepared_size += sizeof(uint32_t);
+    inline Slice operator[](size_t idx) const {
+        DCHECK(!_finished);
+        DCHECK_LT(idx, _offsets.size());
+        size_t value_size =
+                (idx < _offsets.size() - 1) ? _offsets[idx + 1] - _offsets[idx] : _last_value_size;
+        return Slice(&_buffer[_offsets[idx]], value_size);
     }
 
+    inline Slice get(std::size_t idx) const { return (*this)[idx]; }
+
 private:
     void _copy_value_at(size_t idx, faststring* value) const {
         size_t value_size =
@@ -141,7 +144,6 @@ private:
 
     faststring _buffer;
     size_t _size_estimate;
-    size_t _prepared_size;
     // Offsets of each entry, relative to the start of the page
     std::vector<uint32_t> _offsets;
     bool _finished;
diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
index ceadcf6579..50edb30c2a 100644
--- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h
+++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -93,13 +93,48 @@ public:
     bool is_page_full() override { return _remain_element_capacity == 0; }
 
     Status add(const uint8_t* vals, size_t* count) override {
+        return add_internal<false>(vals, count);
+    }
+
+    Status single_add(const uint8_t* vals, size_t* count) {
+        return add_internal<true>(vals, count);
+    }
+
+    template <bool single>
+    inline Status add_internal(const uint8_t* vals, size_t* count) {
         DCHECK(!_finished);
+        if (_remain_element_capacity <= 0) {
+            *count = 0;
+            return Status::RuntimeError("page is full.");
+        }
         int to_add = std::min<int>(_remain_element_capacity, *count);
-        _data.append(vals, to_add * SIZE_OF_TYPE);
+        int to_add_size = to_add * SIZE_OF_TYPE;
+        size_t orig_size = _data.size();
+        _data.resize(orig_size + to_add_size);
         _count += to_add;
         _remain_element_capacity -= to_add;
         // return added number through count
         *count = to_add;
+        if constexpr (single) {
+            if constexpr (SIZE_OF_TYPE == 1) {
+                *reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 2) {
+                *reinterpret_cast<uint16_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint16_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 4) {
+                *reinterpret_cast<uint32_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint32_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 8) {
+                *reinterpret_cast<uint64_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint64_t*>(vals);
+                return Status::OK();
+            }
+        }
+        // when single is true and SIZE_OF_TYPE > 8 or single is false
+        memcpy(&_data[orig_size], vals, to_add_size);
         return Status::OK();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org