You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/05/15 17:08:19 UTC

[incubator-doris] 04/05: [enhancement](load) optimize load string data and dict page write (#9123)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch dev-1.0.1
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git

commit 77932f066c1d857a0cc251547ab53c1a572c77ca
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Sat May 7 10:27:27 2022 +0800

    [enhancement](load) optimize load string data and dict page write (#9123)
    
    * [enhancement](load) optimize load string data and dict page write
---
 be/src/olap/memtable.cpp                           | 83 ++++++++++++----------
 be/src/olap/memtable.h                             | 10 ++-
 be/src/olap/rowset/segment_v2/binary_dict_page.cpp | 34 ++++-----
 be/src/olap/rowset/segment_v2/binary_plain_page.h  | 18 ++---
 be/src/olap/rowset/segment_v2/bitshuffle_page.h    | 37 +++++++++-
 5 files changed, 114 insertions(+), 68 deletions(-)

diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 460638d836..a7d6728ebe 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -43,18 +43,27 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
           _table_mem_pool(new MemPool(_mem_tracker.get())),
           _schema_size(_schema->schema_size()),
           _rowset_writer(rowset_writer) {
+    if (_keys_type == KeysType::DUP_KEYS) {
+        _insert_fn = &MemTable::_insert_dup;
+    } else {
+        _insert_fn = &MemTable::_insert_agg;
+    }
+    if (_tablet_schema->has_sequence_col()) {
+        _aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence;
+    } else {
+        _aggregate_two_row_fn = &MemTable::_aggregate_two_row;
+    }
     if (tablet_schema->sort_type() == SortType::ZORDER) {
-        _row_comparator =
-                std::make_shared<TupleRowZOrderComparator>(_schema, tablet_schema->sort_col_num());
+        _row_comparator = std::make_shared<TupleRowZOrderComparator>(
+                _schema, tablet_schema->sort_col_num());
     } else {
         _row_comparator = std::make_shared<RowCursorComparator>(_schema);
     }
-    _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(),
-                           _keys_type == KeysType::DUP_KEYS);
+    _skip_list = std::make_unique<Table>(_row_comparator.get(), _table_mem_pool.get(),
+                                         _keys_type == KeysType::DUP_KEYS);
 }
 
 MemTable::~MemTable() {
-    delete _skip_list;
 }
 
 MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
@@ -65,37 +74,25 @@ int MemTable::RowCursorComparator::operator()(const char* left, const char* righ
     return compare_row(lhs_row, rhs_row);
 }
 
-void MemTable::insert(const Tuple* tuple) {
+// For non-DUP models, each row passed from the upper layer is first copied into memory
+// allocated from _buffer_mem_pool, and then looked up in _skip_list. If the key already
+// exists, the new row is aggregated into the row in the skip list; otherwise the row is
+// copied into _table_mem_pool before being inserted.
+void MemTable::_insert_agg(const Tuple* tuple) {
     _rows++;
-    bool overwritten = false;
-    uint8_t* _tuple_buf = nullptr;
-    if (_keys_type == KeysType::DUP_KEYS) {
-        // Will insert directly, so use memory from _table_mem_pool
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow row(_schema, _tuple_buf);
-        _tuple_to_row(tuple, &row, _table_mem_pool.get());
-        _skip_list->Insert((TableKey)_tuple_buf, &overwritten);
-        DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
-        return;
-    }
-
-    // For non-DUP models, for the data rows passed from the upper layer, when copying the data,
-    // we first allocate from _buffer_mem_pool, and then check whether it already exists in
-    // _skiplist.  If it exists, we aggregate the new row into the row in skiplist.
-    // otherwise, we need to copy it into _table_mem_pool before we can insert it.
-    _tuple_buf = _buffer_mem_pool->allocate(_schema_size);
-    ContiguousRow src_row(_schema, _tuple_buf);
+    uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
+    ContiguousRow src_row(_schema, tuple_buf);
     _tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());
 
-    bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
+    bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
     if (is_exist) {
-        _aggregate_two_row(src_row, _hint.curr->key);
+        (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
     } else {
-        _tuple_buf = _table_mem_pool->allocate(_schema_size);
-        ContiguousRow dst_row(_schema, _tuple_buf);
+        tuple_buf = _table_mem_pool->allocate(_schema_size);
+        ContiguousRow dst_row(_schema, tuple_buf);
         _agg_object_pool.acquire_data(&_agg_buffer_pool);
         copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
-        _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
+        _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
     }
 
     // Make MemPool to be reusable, but does not free its memory
@@ -103,6 +100,16 @@ void MemTable::insert(const Tuple* tuple) {
     _agg_buffer_pool.clear();
 }
 
+void MemTable::_insert_dup(const Tuple* tuple) {
+    _rows++;
+    bool overwritten = false;
+    uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
+    ContiguousRow row(_schema, tuple_buf);
+    _tuple_to_row(tuple, &row, _table_mem_pool.get());
+    _skip_list->Insert((TableKey)tuple_buf, &overwritten);
+    DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
+}
+
 void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
     for (size_t i = 0; i < _slot_descs->size(); ++i) {
         auto cell = row->cell(i);
@@ -116,12 +123,14 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me
 
 void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
     ContiguousRow dst_row(_schema, row_in_skiplist);
-    if (_tablet_schema->has_sequence_col()) {
-        agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
-                                     _table_mem_pool.get());
-    } else {
-        agg_update_row(&dst_row, src_row, _table_mem_pool.get());
-    }
+    agg_update_row(&dst_row, src_row, _table_mem_pool.get());
+}
+
+void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
+                                                TableKey row_in_skiplist) {
+    ContiguousRow dst_row(_schema, row_in_skiplist);
+    agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
+                                 _table_mem_pool.get());
 }
 
 OLAPStatus MemTable::flush() {
@@ -134,7 +143,7 @@ OLAPStatus MemTable::flush() {
         if (st == OLAP_ERR_FUNC_NOT_IMPLEMENTED) {
             // For alpha rowset, we do not implement "flush_single_memtable".
             // Flush the memtable like the old way.
-            Table::Iterator it(_skip_list);
+            Table::Iterator it(_skip_list.get());
             for (it.SeekToFirst(); it.Valid(); it.Next()) {
                 char* row = (char*)it.key();
                 ContiguousRow dst_row(_schema, row);
@@ -158,7 +167,7 @@ OLAPStatus MemTable::close() {
 }
 
 MemTable::Iterator::Iterator(MemTable* memtable)
-        : _mem_table(memtable), _it(memtable->_skip_list) {}
+        : _mem_table(memtable), _it(memtable->_skip_list.get()) {}
 
 void MemTable::Iterator::seek_to_first() {
     _it.SeekToFirst();
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 4282ba5c06..d400b74903 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -46,7 +46,7 @@ public:
 
     int64_t tablet_id() const { return _tablet_id; }
     size_t memory_usage() const { return _mem_tracker->consumption(); }
-    void insert(const Tuple* tuple);
+    inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
     /// Flush
     OLAPStatus flush();
     OLAPStatus close();
@@ -88,6 +88,9 @@ public:
 private:
     void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
     void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist);
+    void _insert_dup(const Tuple* tuple);
+    void _insert_agg(const Tuple* tuple);
 
     int64_t _tablet_id;
     Schema* _schema;
@@ -112,7 +115,7 @@ private:
     ObjectPool _agg_object_pool;
 
     size_t _schema_size;
-    Table* _skip_list;
+    std::unique_ptr<Table> _skip_list;
     Table::Hint _hint;
 
     RowsetWriter* _rowset_writer;
@@ -123,6 +126,9 @@ private:
     // This is not the rows in this memtable, because rows may be merged
     // in unique or aggragate key model.
     int64_t _rows = 0;
+    void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
+    void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
+                                            TableKey row_in_skiplist) = nullptr;
 
 }; // class MemTable
 
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index 95a9fd5c7b..9cee35f997 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -23,9 +23,9 @@
 #include "util/slice.h" // for Slice
 #include "vec/columns/column.h"
 #include "vec/columns/column_dictionary.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/column_string.h"
 #include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
 #include "vec/columns/predicate_column.h"
 
 namespace doris {
@@ -67,6 +67,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
         const Slice* src = reinterpret_cast<const Slice*>(vals);
         size_t num_added = 0;
         uint32_t value_code = -1;
+        auto* actual_builder =
+                down_cast<BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>*>(_data_page_builder.get());
 
         if (_data_page_builder->count() == 0) {
             _first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
@@ -91,13 +93,18 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
                     dict_item.relocate(item_mem);
                 }
                 value_code = _dictionary.size();
+                size_t add_count = 1;
+                RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item),
+                                                   &add_count));
+                if (add_count == 0) {
+                    // current dict page is full, stop processing remaining inputs
+                    break;
+                }
                 _dictionary.emplace(dict_item, value_code);
-                _dict_items.push_back(dict_item);
-                _dict_builder->update_prepared_size(dict_item.size);
             }
             size_t add_count = 1;
-            RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code),
-                                                    &add_count));
+            RETURN_IF_ERROR(actual_builder->single_add(
+                    reinterpret_cast<const uint8_t*>(&value_code), &add_count));
             if (add_count == 0) {
                 // current data page is full, stop processing remaining inputs
                 break;
@@ -145,17 +152,7 @@ uint64_t BinaryDictPageBuilder::size() const {
 }
 
 Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
-    _dictionary.clear();
-    _dict_builder->reset();
-    size_t add_count = 1;
-    // here do not check is_page_full of dict_builder
-    // because it is checked in add
-    for (auto& dict_item : _dict_items) {
-        RETURN_IF_ERROR(
-                _dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count));
-    }
     *dictionary_page = _dict_builder->finish();
-    _dict_items.clear();
     return Status::OK();
 }
 
@@ -181,10 +178,7 @@ Status BinaryDictPageBuilder::get_last_value(void* value) const {
     }
     uint32_t value_code;
     RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code));
-    // TODO _dict_items is cleared in get_dictionary_page, which could cause
-    // get_last_value to fail when it's called after get_dictionary_page.
-    // the solution is to read last value from _dict_builder instead of _dict_items
-    *reinterpret_cast<Slice*>(value) = _dict_items[value_code];
+    *reinterpret_cast<Slice*>(value) = _dict_builder->get(value_code);
     return Status::OK();
 }
 
diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h
index 061884fb2b..5658d1cb77 100644
--- a/be/src/olap/rowset/segment_v2/binary_plain_page.h
+++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h
@@ -47,14 +47,13 @@ namespace segment_v2 {
 class BinaryPlainPageBuilder : public PageBuilder {
 public:
     BinaryPlainPageBuilder(const PageBuilderOptions& options)
-            : _size_estimate(0), _prepared_size(0), _options(options) {
+            : _size_estimate(0), _options(options) {
         reset();
     }
 
     bool is_page_full() override {
         // data_page_size is 0, do not limit the page size
-        return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size ||
-                                                _prepared_size > _options.data_page_size);
+        return _options.data_page_size != 0 && _size_estimate > _options.data_page_size;
     }
 
     Status add(const uint8_t* vals, size_t* count) override {
@@ -101,7 +100,6 @@ public:
         _buffer.clear();
         _buffer.reserve(_options.data_page_size == 0 ? 1024 : _options.data_page_size);
         _size_estimate = sizeof(uint32_t);
-        _prepared_size = sizeof(uint32_t);
         _finished = false;
         _last_value_size = 0;
     }
@@ -127,11 +125,16 @@ public:
         return Status::OK();
     }
 
-    void update_prepared_size(size_t added_size) {
-        _prepared_size += added_size;
-        _prepared_size += sizeof(uint32_t);
+    inline Slice operator[](size_t idx) const {
+        DCHECK(!_finished);
+        DCHECK_LT(idx, _offsets.size());
+        size_t value_size =
+                (idx < _offsets.size() - 1) ? _offsets[idx + 1] - _offsets[idx] : _last_value_size;
+        return Slice(&_buffer[_offsets[idx]], value_size);
     }
 
+    inline Slice get(std::size_t idx) const { return (*this)[idx]; }
+
 private:
     void _copy_value_at(size_t idx, faststring* value) const {
         size_t value_size =
@@ -141,7 +144,6 @@ private:
 
     faststring _buffer;
     size_t _size_estimate;
-    size_t _prepared_size;
     // Offsets of each entry, relative to the start of the page
     std::vector<uint32_t> _offsets;
     bool _finished;
diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
index fe7ba566fa..9bada3f764 100644
--- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h
+++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -93,13 +93,48 @@ public:
     bool is_page_full() override { return _remain_element_capacity == 0; }
 
     Status add(const uint8_t* vals, size_t* count) override {
+        return add_internal<false>(vals, count);
+    }
+
+    Status single_add(const uint8_t* vals, size_t* count) {
+        return add_internal<true>(vals, count);
+    }
+
+    template <bool single>
+    inline Status add_internal(const uint8_t* vals, size_t* count) {
         DCHECK(!_finished);
+        if (_remain_element_capacity <= 0) {
+            *count = 0;
+            return Status::RuntimeError("page is full.");
+        }
         int to_add = std::min<int>(_remain_element_capacity, *count);
-        _data.append(vals, to_add * SIZE_OF_TYPE);
+        int to_add_size = to_add * SIZE_OF_TYPE;
+        size_t orig_size = _data.size();
+        _data.resize(orig_size + to_add_size);
         _count += to_add;
         _remain_element_capacity -= to_add;
         // return added number through count
         *count = to_add;
+        if constexpr (single) {
+            if constexpr (SIZE_OF_TYPE == 1) {
+                *reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 2) {
+                *reinterpret_cast<uint16_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint16_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 4) {
+                *reinterpret_cast<uint32_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint32_t*>(vals);
+                return Status::OK();
+            } else if constexpr (SIZE_OF_TYPE == 8) {
+                *reinterpret_cast<uint64_t*>(&_data[orig_size]) =
+                        *reinterpret_cast<const uint64_t*>(vals);
+                return Status::OK();
+            }
+        }
+        // generic path: single is false, or SIZE_OF_TYPE is not 1, 2, 4 or 8
+        memcpy(&_data[orig_size], vals, to_add_size);
         return Status::OK();
     }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org