You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by li...@apache.org on 2022/05/25 13:12:31 UTC
[incubator-doris] branch master updated: [stream-load-vec]: memtable flush only if necessary after aggregated (#9459)
This is an automated email from the ASF dual-hosted git repository.
lihaopeng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git
The following commit(s) were added to refs/heads/master by this push:
new 73e31a2179 [stream-load-vec]: memtable flush only if necessary after aggregated (#9459)
73e31a2179 is described below
commit 73e31a21799c15abde903502ad5b73138b01c336
Author: spaces-x <we...@gmail.com>
AuthorDate: Wed May 25 21:12:24 2022 +0800
[stream-load-vec]: memtable flush only if necessary after aggregated (#9459)
Co-authored-by: weixiang <we...@meituan.com>
---
be/src/common/config.h | 3 +++
be/src/olap/delta_writer.cpp | 9 ++++---
be/src/olap/memtable.cpp | 56 +++++++++++++++++++++++++++++++++++---------
be/src/olap/memtable.h | 10 +++++++-
be/src/vec/core/block.cpp | 19 +++++++++++++++
be/src/vec/core/block.h | 7 ++++++
6 files changed, 89 insertions(+), 15 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index fcfc2e3f78..a73efcf24d 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -464,6 +464,9 @@ CONF_Int32(memory_max_alignment, "16");
// write buffer size before flush
CONF_mInt64(write_buffer_size, "209715200");
+// max buffer size of a memtable for aggregated tables; once reached, the memtable is shrunk by aggregation
+CONF_mInt64(memtable_max_buffer_size, "419430400");
+
// following 2 configs limit the memory consumption of load process on a Backend.
// eg: memory limit to 80% of mem limit config but up to 100GB(default)
// NOTICE(cmy): set these default values very large because we don't want to
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 11ee242449..fe0ffeddbb 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -203,9 +203,12 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
}
}
- if (_mem_table->memory_usage() >= config::write_buffer_size) {
- RETURN_NOT_OK(_flush_memtable_async());
- _reset_mem_table();
+ if (_mem_table->need_to_agg()) {
+ _mem_table->shrink_memtable_by_agg();
+ if (_mem_table->is_flush()) {
+ RETURN_NOT_OK(_flush_memtable_async());
+ _reset_mem_table();
+ }
}
return Status::OK();
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index c7f94ead4f..cf58f7e4ba 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -126,11 +126,10 @@ void MemTable::insert(const vectorized::Block* block, size_t row_pos, size_t num
}
}
size_t cursor_in_mutableblock = _input_mutable_block.rows();
- size_t oldsize = _input_mutable_block.allocated_bytes();
_input_mutable_block.add_rows(block, row_pos, num_rows);
- size_t newsize = _input_mutable_block.allocated_bytes();
- _mem_usage += newsize - oldsize;
- _mem_tracker->consume(newsize - oldsize);
+ size_t input_size = block->allocated_bytes() * num_rows / block->rows();
+ _mem_usage += input_size;
+ _mem_tracker->consume(input_size);
for (int i = 0; i < num_rows; i++) {
_row_in_blocks.emplace_back(new RowInBlock {cursor_in_mutableblock + i});
@@ -242,7 +241,8 @@ void MemTable::_aggregate_two_row_in_block(RowInBlock* new_row, RowInBlock* row_
new_row->_row_pos, nullptr);
}
}
-vectorized::Block MemTable::_collect_vskiplist_results() {
+template <bool is_final>
+void MemTable::_collect_vskiplist_results() {
VecTable::Iterator it(_vec_skip_list.get());
vectorized::Block in_block = _input_mutable_block.to_block();
// TODO: insert data by column instead of by row to optimize this code
@@ -251,6 +251,7 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
_output_mutable_block.add_row(&in_block, it.key()->_row_pos);
}
} else {
+ size_t idx = 0;
for (it.SeekToFirst(); it.Valid(); it.Next()) {
auto& block_data = in_block.get_columns_with_type_and_name();
// move key columns
@@ -263,11 +264,46 @@ vectorized::Block MemTable::_collect_vskiplist_results() {
auto function = _agg_functions[i];
function->insert_result_into(it.key()->_agg_places[i],
*(_output_mutable_block.get_column_by_position(i)));
- function->destroy(it.key()->_agg_places[i]);
+ if constexpr (is_final) {
+ function->destroy(it.key()->_agg_places[i]);
+ }
}
+ if constexpr (!is_final) {
+ // re-index the row_pos in VSkipList
+ it.key()->_row_pos = idx;
+ idx++;
+ }
+ }
+ if constexpr (!is_final) {
+ // if not final, swap the aggregated results back into the input block and then continue to insert
+ size_t shrunked_after_agg = _output_mutable_block.allocated_bytes();
+ _mem_tracker->consume(shrunked_after_agg - _mem_usage);
+ _mem_usage = shrunked_after_agg;
+ _input_mutable_block.swap(_output_mutable_block);
+ // TODO(weixiang): optimize here.
+ std::unique_ptr<vectorized::Block> empty_input_block =
+ std::move(in_block.create_same_struct_block(0));
+ _output_mutable_block =
+ vectorized::MutableBlock::build_mutable_block(empty_input_block.get());
+ _output_mutable_block.clear_column_data();
}
}
- return _output_mutable_block.to_block();
+}
+
+void MemTable::shrink_memtable_by_agg() {
+ if (_keys_type == KeysType::DUP_KEYS) {
+ return;
+ }
+ _collect_vskiplist_results<false>();
+}
+
+bool MemTable::is_flush() {
+ return memory_usage() >= config::write_buffer_size;
+}
+
+bool MemTable::need_to_agg() {
+ return _keys_type == KeysType::DUP_KEYS ? is_flush()
+ : memory_usage() >= config::memtable_max_buffer_size;
}
Status MemTable::flush() {
@@ -301,10 +337,8 @@ Status MemTable::_do_flush(int64_t& duration_ns) {
RETURN_NOT_OK(st);
}
} else {
- vectorized::Block block = _collect_vskiplist_results();
- // beta rowset flush parallel, segment write add block is not
- // thread safe, so use tmp variable segment_write instead of
- // member variable
+ _collect_vskiplist_results<true>();
+ vectorized::Block block = _output_mutable_block.to_block();
RETURN_NOT_OK(_rowset_writer->flush_single_memtable(&block));
_flush_size = block.allocated_bytes();
}
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index 910cd92270..c73b39b39d 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -54,6 +54,12 @@ public:
// insert tuple from (row_pos) to (row_pos+num_rows)
void insert(const vectorized::Block* block, size_t row_pos, size_t num_rows);
+ void shrink_memtable_by_agg();
+
+ bool is_flush();
+
+ bool need_to_agg();
+
/// Flush
Status flush();
Status close();
@@ -195,7 +201,9 @@ private:
//for vectorized
vectorized::MutableBlock _input_mutable_block;
vectorized::MutableBlock _output_mutable_block;
- vectorized::Block _collect_vskiplist_results();
+
+ template <bool is_final>
+ void _collect_vskiplist_results();
bool _is_first_insertion;
void _init_agg_functions(const vectorized::Block* block);
diff --git a/be/src/vec/core/block.cpp b/be/src/vec/core/block.cpp
index 4866a1129e..821231684f 100644
--- a/be/src/vec/core/block.cpp
+++ b/be/src/vec/core/block.cpp
@@ -881,6 +881,17 @@ size_t MutableBlock::rows() const {
return 0;
}
+void MutableBlock::swap(MutableBlock& another) noexcept {
+ _columns.swap(another._columns);
+ _data_types.swap(another._data_types);
+}
+
+void MutableBlock::swap(MutableBlock&& another) noexcept {
+ clear();
+ _columns = std::move(another._columns);
+ _data_types = std::move(another._data_types);
+}
+
void MutableBlock::add_row(const Block* block, int row) {
auto& block_data = block->get_columns_with_type_and_name();
for (size_t i = 0; i < _columns.size(); ++i) {
@@ -1008,4 +1019,12 @@ size_t MutableBlock::allocated_bytes() const {
return res;
}
+void MutableBlock::clear_column_data() noexcept {
+ for (auto& col : _columns) {
+ if (col) {
+ col->clear();
+ }
+ }
+}
+
} // namespace doris::vectorized
diff --git a/be/src/vec/core/block.h b/be/src/vec/core/block.h
index 729f531291..532e9257aa 100644
--- a/be/src/vec/core/block.h
+++ b/be/src/vec/core/block.h
@@ -428,6 +428,10 @@ public:
Block to_block(int start_column, int end_column);
+ void swap(MutableBlock& other) noexcept;
+
+ void swap(MutableBlock&& other) noexcept;
+
void add_row(const Block* block, int row);
void add_rows(const Block* block, const int* row_begin, const int* row_end);
void add_rows(const Block* block, size_t row_begin, size_t length);
@@ -438,6 +442,9 @@ public:
_columns.clear();
_data_types.clear();
}
+
+ void clear_column_data() noexcept;
+
size_t allocated_bytes() const;
};
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org