You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ya...@apache.org on 2022/05/07 02:27:33 UTC
[incubator-doris] branch master updated: [enhancement](load) optimize load string data and dict page write (#9123)
This is an automated email from the ASF dual-hosted git repository.
yangzhg pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 2ccaa6338c [enhancement](load) optimize load string data and dict page write (#9123)
2ccaa6338c is described below
commit 2ccaa6338c272a953c93aaf9827779f9c8543251
Author: Zhengguo Yang <ya...@gmail.com>
AuthorDate: Sat May 7 10:27:27 2022 +0800
[enhancement](load) optimize load string data and dict page write (#9123)
* [enhancement](load) optimize load string data and dict page write
---
be/src/olap/memtable.cpp | 87 ++++++++++++----------
be/src/olap/memtable.h | 12 ++-
be/src/olap/rowset/segment_v2/binary_dict_page.cpp | 34 ++++-----
be/src/olap/rowset/segment_v2/binary_plain_page.h | 18 +++--
be/src/olap/rowset/segment_v2/bitshuffle_page.h | 37 ++++++++-
5 files changed, 116 insertions(+), 72 deletions(-)
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 7e8d2fa041..7b9521e775 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -51,18 +51,28 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
_skip_list = nullptr;
_vec_row_comparator = std::make_shared<RowInBlockComparator>(_schema);
// TODO: Support ZOrderComparator in the future
- _vec_skip_list = new VecTable(_vec_row_comparator.get(), _table_mem_pool.get(),
- _keys_type == KeysType::DUP_KEYS);
+ _vec_skip_list = std::make_unique<VecTable>(
+ _vec_row_comparator.get(), _table_mem_pool.get(), _keys_type == KeysType::DUP_KEYS);
} else {
_vec_skip_list = nullptr;
+ if (_keys_type == KeysType::DUP_KEYS) {
+ _insert_fn = &MemTable::_insert_dup;
+ } else {
+ _insert_fn = &MemTable::_insert_agg;
+ }
+ if (_tablet_schema->has_sequence_col()) {
+ _aggregate_two_row_fn = &MemTable::_aggregate_two_row_with_sequence;
+ } else {
+ _aggregate_two_row_fn = &MemTable::_aggregate_two_row;
+ }
if (tablet_schema->sort_type() == SortType::ZORDER) {
_row_comparator = std::make_shared<TupleRowZOrderComparator>(
_schema, tablet_schema->sort_col_num());
} else {
_row_comparator = std::make_shared<RowCursorComparator>(_schema);
}
- _skip_list = new Table(_row_comparator.get(), _table_mem_pool.get(),
- _keys_type == KeysType::DUP_KEYS);
+ _skip_list = std::make_unique<Table>(_row_comparator.get(), _table_mem_pool.get(),
+ _keys_type == KeysType::DUP_KEYS);
}
}
@@ -86,9 +96,6 @@ void MemTable::_init_agg_functions(const vectorized::Block* block) {
}
MemTable::~MemTable() {
- delete _skip_list;
- delete _vec_skip_list;
-
std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
_mem_tracker->release(_mem_usage);
}
@@ -158,37 +165,25 @@ void MemTable::_insert_one_row_from_block(RowInBlock* row_in_block) {
}
}
-void MemTable::insert(const Tuple* tuple) {
+// For non-DUP models, for the data rows passed from the upper layer, when copying the data,
+// we first allocate from _buffer_mem_pool, and then check whether it already exists in
+// _skiplist. If it exists, we aggregate the new row into the row in skiplist.
+// otherwise, we need to copy it into _table_mem_pool before we can insert it.
+void MemTable::_insert_agg(const Tuple* tuple) {
_rows++;
- bool overwritten = false;
- uint8_t* _tuple_buf = nullptr;
- if (_keys_type == KeysType::DUP_KEYS) {
- // Will insert directly, so use memory from _table_mem_pool
- _tuple_buf = _table_mem_pool->allocate(_schema_size);
- ContiguousRow row(_schema, _tuple_buf);
- _tuple_to_row(tuple, &row, _table_mem_pool.get());
- _skip_list->Insert((TableKey)_tuple_buf, &overwritten);
- DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
- return;
- }
-
- // For non-DUP models, for the data rows passed from the upper layer, when copying the data,
- // we first allocate from _buffer_mem_pool, and then check whether it already exists in
- // _skiplist. If it exists, we aggregate the new row into the row in skiplist.
- // otherwise, we need to copy it into _table_mem_pool before we can insert it.
- _tuple_buf = _buffer_mem_pool->allocate(_schema_size);
- ContiguousRow src_row(_schema, _tuple_buf);
+ uint8_t* tuple_buf = _buffer_mem_pool->allocate(_schema_size);
+ ContiguousRow src_row(_schema, tuple_buf);
_tuple_to_row(tuple, &src_row, _buffer_mem_pool.get());
- bool is_exist = _skip_list->Find((TableKey)_tuple_buf, &_hint);
+ bool is_exist = _skip_list->Find((TableKey)tuple_buf, &_hint);
if (is_exist) {
- _aggregate_two_row(src_row, _hint.curr->key);
+ (this->*_aggregate_two_row_fn)(src_row, _hint.curr->key);
} else {
- _tuple_buf = _table_mem_pool->allocate(_schema_size);
- ContiguousRow dst_row(_schema, _tuple_buf);
+ tuple_buf = _table_mem_pool->allocate(_schema_size);
+ ContiguousRow dst_row(_schema, tuple_buf);
_agg_object_pool.acquire_data(&_agg_buffer_pool);
copy_row_in_memtable(&dst_row, src_row, _table_mem_pool.get());
- _skip_list->InsertWithHint((TableKey)_tuple_buf, is_exist, &_hint);
+ _skip_list->InsertWithHint((TableKey)tuple_buf, is_exist, &_hint);
}
// Make MemPool to be reusable, but does not free its memory
@@ -196,6 +191,16 @@ void MemTable::insert(const Tuple* tuple) {
_agg_buffer_pool.clear();
}
+void MemTable::_insert_dup(const Tuple* tuple) {
+ _rows++;
+ bool overwritten = false;
+ uint8_t* tuple_buf = _table_mem_pool->allocate(_schema_size);
+ ContiguousRow row(_schema, tuple_buf);
+ _tuple_to_row(tuple, &row, _table_mem_pool.get());
+ _skip_list->Insert((TableKey)tuple_buf, &overwritten);
+ DCHECK(!overwritten) << "Duplicate key model meet overwrite in SkipList";
+}
+
void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool) {
for (size_t i = 0; i < _slot_descs->size(); ++i) {
auto cell = row->cell(i);
@@ -209,12 +214,14 @@ void MemTable::_tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* me
void MemTable::_aggregate_two_row(const ContiguousRow& src_row, TableKey row_in_skiplist) {
ContiguousRow dst_row(_schema, row_in_skiplist);
- if (_tablet_schema->has_sequence_col()) {
- agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
- _table_mem_pool.get());
- } else {
- agg_update_row(&dst_row, src_row, _table_mem_pool.get());
- }
+ agg_update_row(&dst_row, src_row, _table_mem_pool.get());
+}
+
+void MemTable::_aggregate_two_row_with_sequence(const ContiguousRow& src_row,
+ TableKey row_in_skiplist) {
+ ContiguousRow dst_row(_schema, row_in_skiplist);
+ agg_update_row_with_sequence(&dst_row, src_row, _tablet_schema->sequence_col_idx(),
+ _table_mem_pool.get());
}
void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist) {
@@ -236,7 +243,7 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
}
}
vectorized::Block MemTable::_collect_vskiplist_results() {
- VecTable::Iterator it(_vec_skip_list);
+ VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
// TODO: should try to insert data by column, not by row, to optimize the code
if (_keys_type == KeysType::DUP_KEYS) {
@@ -282,7 +289,7 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
if (st == Status::OLAPInternalError(OLAP_ERR_FUNC_NOT_IMPLEMENTED)) {
// For alpha rowset, we do not implement "flush_single_memtable".
// Flush the memtable like the old way.
- Table::Iterator it(_skip_list);
+ Table::Iterator it(_skip_list.get());
for (it.SeekToFirst(); it.Valid(); it.Next()) {
char* row = (char*)it.key();
ContiguousRow dst_row(_schema, row);
@@ -307,7 +314,7 @@ Status MemTable::close() {
}
MemTable::Iterator::Iterator(MemTable* memtable)
- : _mem_table(memtable), _it(memtable->_skip_list) {}
+ : _mem_table(memtable), _it(memtable->_skip_list.get()) {}
void MemTable::Iterator::seek_to_first() {
_it.SeekToFirst();
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index ed3cbc5fa3..4922bb62cd 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -50,7 +50,7 @@ public:
size_t memory_usage() const { return _mem_tracker->consumption(); }
std::shared_ptr<MemTracker>& mem_tracker() { return _mem_tracker; }
- void insert(const Tuple* tuple);
+ inline void insert(const Tuple* tuple) { (this->*_insert_fn)(tuple); }
// insert tuple from (row_pos) to (row_pos+num_rows)
void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
@@ -140,6 +140,9 @@ public:
private:
void _tuple_to_row(const Tuple* tuple, ContiguousRow* row, MemPool* mem_pool);
void _aggregate_two_row(const ContiguousRow& new_row, TableKey row_in_skiplist);
+ void _aggregate_two_row_with_sequence(const ContiguousRow& new_row, TableKey row_in_skiplist);
+ void _insert_dup(const Tuple* tuple);
+ void _insert_agg(const Tuple* tuple);
// for vectorized
void _insert_one_row_from_block(RowInBlock* row_in_block);
void _aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_in_skiplist);
@@ -171,10 +174,10 @@ private:
ObjectPool _agg_object_pool;
size_t _schema_size;
- Table* _skip_list;
+ std::unique_ptr<Table> _skip_list;
Table::Hint _hint;
- VecTable* _vec_skip_list;
+ std::unique_ptr<VecTable> _vec_skip_list;
VecTable::Hint _vec_hint;
RowsetWriter* _rowset_writer;
@@ -185,6 +188,9 @@ private:
// This is not the rows in this memtable, because rows may be merged
// in unique or aggregate key model.
int64_t _rows = 0;
+ void (MemTable::*_insert_fn)(const Tuple* tuple) = nullptr;
+ void (MemTable::*_aggregate_two_row_fn)(const ContiguousRow& new_row,
+ TableKey row_in_skiplist) = nullptr;
//for vectorized
vectorized::MutableBlock _input_mutable_block;
diff --git a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
index a52dde719a..f8c67e5e21 100644
--- a/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
+++ b/be/src/olap/rowset/segment_v2/binary_dict_page.cpp
@@ -23,9 +23,9 @@
#include "util/slice.h" // for Slice
#include "vec/columns/column.h"
#include "vec/columns/column_dictionary.h"
-#include "vec/columns/column_vector.h"
-#include "vec/columns/column_string.h"
#include "vec/columns/column_nullable.h"
+#include "vec/columns/column_string.h"
+#include "vec/columns/column_vector.h"
#include "vec/columns/predicate_column.h"
namespace doris {
@@ -66,6 +66,8 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
const Slice* src = reinterpret_cast<const Slice*>(vals);
size_t num_added = 0;
uint32_t value_code = -1;
+ auto* actual_builder =
+ down_cast<BitshufflePageBuilder<OLAP_FIELD_TYPE_INT>*>(_data_page_builder.get());
if (_data_page_builder->count() == 0) {
_first_value.assign_copy(reinterpret_cast<const uint8_t*>(src->get_data()),
@@ -90,13 +92,18 @@ Status BinaryDictPageBuilder::add(const uint8_t* vals, size_t* count) {
dict_item.relocate(item_mem);
}
value_code = _dictionary.size();
+ size_t add_count = 1;
+ RETURN_IF_ERROR(_dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item),
+ &add_count));
+ if (add_count == 0) {
+ // current dict page is full, stop processing remaining inputs
+ break;
+ }
_dictionary.emplace(dict_item, value_code);
- _dict_items.push_back(dict_item);
- _dict_builder->update_prepared_size(dict_item.size);
}
size_t add_count = 1;
- RETURN_IF_ERROR(_data_page_builder->add(reinterpret_cast<const uint8_t*>(&value_code),
- &add_count));
+ RETURN_IF_ERROR(actual_builder->single_add(
+ reinterpret_cast<const uint8_t*>(&value_code), &add_count));
if (add_count == 0) {
// current data page is full, stop processing remaining inputs
break;
@@ -144,17 +151,7 @@ uint64_t BinaryDictPageBuilder::size() const {
}
Status BinaryDictPageBuilder::get_dictionary_page(OwnedSlice* dictionary_page) {
- _dictionary.clear();
- _dict_builder->reset();
- size_t add_count = 1;
- // here do not check is_page_full of dict_builder
- // because it is checked in add
- for (auto& dict_item : _dict_items) {
- RETURN_IF_ERROR(
- _dict_builder->add(reinterpret_cast<const uint8_t*>(&dict_item), &add_count));
- }
*dictionary_page = _dict_builder->finish();
- _dict_items.clear();
return Status::OK();
}
@@ -180,10 +177,7 @@ Status BinaryDictPageBuilder::get_last_value(void* value) const {
}
uint32_t value_code;
RETURN_IF_ERROR(_data_page_builder->get_last_value(&value_code));
- // TODO _dict_items is cleared in get_dictionary_page, which could cause
- // get_last_value to fail when it's called after get_dictionary_page.
- // the solution is to read last value from _dict_builder instead of _dict_items
- *reinterpret_cast<Slice*>(value) = _dict_items[value_code];
+ *reinterpret_cast<Slice*>(value) = _dict_builder->get(value_code);
return Status::OK();
}
diff --git a/be/src/olap/rowset/segment_v2/binary_plain_page.h b/be/src/olap/rowset/segment_v2/binary_plain_page.h
index e655baca9c..e8c8f2a18e 100644
--- a/be/src/olap/rowset/segment_v2/binary_plain_page.h
+++ b/be/src/olap/rowset/segment_v2/binary_plain_page.h
@@ -47,14 +47,13 @@ namespace segment_v2 {
class BinaryPlainPageBuilder : public PageBuilder {
public:
BinaryPlainPageBuilder(const PageBuilderOptions& options)
- : _size_estimate(0), _prepared_size(0), _options(options) {
+ : _size_estimate(0), _options(options) {
reset();
}
bool is_page_full() override {
// data_page_size is 0, do not limit the page size
- return _options.data_page_size != 0 && (_size_estimate > _options.data_page_size ||
- _prepared_size > _options.data_page_size);
+ return _options.data_page_size != 0 && _size_estimate > _options.data_page_size;
}
Status add(const uint8_t* vals, size_t* count) override {
@@ -101,7 +100,6 @@ public:
_buffer.clear();
_buffer.reserve(_options.data_page_size == 0 ? 1024 : _options.data_page_size);
_size_estimate = sizeof(uint32_t);
- _prepared_size = sizeof(uint32_t);
_finished = false;
_last_value_size = 0;
}
@@ -127,11 +125,16 @@ public:
return Status::OK();
}
- void update_prepared_size(size_t added_size) {
- _prepared_size += added_size;
- _prepared_size += sizeof(uint32_t);
+ inline Slice operator[](size_t idx) const {
+ DCHECK(!_finished);
+ DCHECK_LT(idx, _offsets.size());
+ size_t value_size =
+ (idx < _offsets.size() - 1) ? _offsets[idx + 1] - _offsets[idx] : _last_value_size;
+ return Slice(&_buffer[_offsets[idx]], value_size);
}
+ inline Slice get(std::size_t idx) const { return (*this)[idx]; }
+
private:
void _copy_value_at(size_t idx, faststring* value) const {
size_t value_size =
@@ -141,7 +144,6 @@ private:
faststring _buffer;
size_t _size_estimate;
- size_t _prepared_size;
// Offsets of each entry, relative to the start of the page
std::vector<uint32_t> _offsets;
bool _finished;
diff --git a/be/src/olap/rowset/segment_v2/bitshuffle_page.h b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
index ceadcf6579..50edb30c2a 100644
--- a/be/src/olap/rowset/segment_v2/bitshuffle_page.h
+++ b/be/src/olap/rowset/segment_v2/bitshuffle_page.h
@@ -93,13 +93,48 @@ public:
bool is_page_full() override { return _remain_element_capacity == 0; }
Status add(const uint8_t* vals, size_t* count) override {
+ return add_internal<false>(vals, count);
+ }
+
+ Status single_add(const uint8_t* vals, size_t* count) {
+ return add_internal<true>(vals, count);
+ }
+
+ template <bool single>
+ inline Status add_internal(const uint8_t* vals, size_t* count) {
DCHECK(!_finished);
+ if (_remain_element_capacity <= 0) {
+ *count = 0;
+ return Status::RuntimeError("page is full.");
+ }
int to_add = std::min<int>(_remain_element_capacity, *count);
- _data.append(vals, to_add * SIZE_OF_TYPE);
+ int to_add_size = to_add * SIZE_OF_TYPE;
+ size_t orig_size = _data.size();
+ _data.resize(orig_size + to_add_size);
_count += to_add;
_remain_element_capacity -= to_add;
// return added number through count
*count = to_add;
+ if constexpr (single) {
+ if constexpr (SIZE_OF_TYPE == 1) {
+ *reinterpret_cast<uint8_t*>(&_data[orig_size]) = *vals;
+ return Status::OK();
+ } else if constexpr (SIZE_OF_TYPE == 2) {
+ *reinterpret_cast<uint16_t*>(&_data[orig_size]) =
+ *reinterpret_cast<const uint16_t*>(vals);
+ return Status::OK();
+ } else if constexpr (SIZE_OF_TYPE == 4) {
+ *reinterpret_cast<uint32_t*>(&_data[orig_size]) =
+ *reinterpret_cast<const uint32_t*>(vals);
+ return Status::OK();
+ } else if constexpr (SIZE_OF_TYPE == 8) {
+ *reinterpret_cast<uint64_t*>(&_data[orig_size]) =
+ *reinterpret_cast<const uint64_t*>(vals);
+ return Status::OK();
+ }
+ }
+ // when single is true and SIZE_OF_TYPE > 8 or single is false
+ memcpy(&_data[orig_size], vals, to_add_size);
return Status::OK();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org