You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by ga...@apache.org on 2023/01/11 02:07:43 UTC
[doris] branch master updated: [refactor] separate agg and flush in memtable (#15713)
This is an automated email from the ASF dual-hosted git repository.
gabriellee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new fe5e5d2bf4 [refactor] separate agg and flush in memtable (#15713)
fe5e5d2bf4 is described below
commit fe5e5d2bf427e7c29f47692e3f7f86212c74b375
Author: zbtzbtzbt <bi...@163.com>
AuthorDate: Wed Jan 11 10:07:34 2023 +0800
[refactor] separate agg and flush in memtable (#15713)
---
be/src/common/config.h | 9 ++++-----
be/src/olap/delta_writer.cpp | 14 +++++++-------
be/src/olap/memtable.cpp | 11 +++++++----
be/src/olap/memtable.h | 4 ++--
be/src/olap/push_handler.cpp | 2 +-
5 files changed, 21 insertions(+), 19 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 91b9eecc44..394ffd71c4 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -490,13 +490,12 @@ CONF_mInt64(load_channel_memory_refresh_sleep_time_ms, "100");
// Alignment
CONF_Int32(memory_max_alignment, "16");
-// write buffer size before flush
+// max write buffer size before flush, default 200MB
CONF_mInt64(write_buffer_size, "209715200");
-
-// max buffer size used in memtable for the aggregated table
-CONF_mInt64(memtable_max_buffer_size, "419430400");
+// max buffer size used in memtable for the aggregated table, default 400MB
+CONF_mInt64(write_buffer_size_for_agg, "419430400");
// write buffer size in push task for sparkload, default 1GB
-CONF_mInt64(flush_size_for_sparkload, "1073741824");
+CONF_mInt64(write_buffer_size_for_sparkload, "1073741824");
// following 2 configs limit the memory consumption of load process on a Backend.
// eg: memory limit to 80% of mem limit config but up to 100GB(default)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index ee17286b6b..1dd22c6e6c 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -169,14 +169,14 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
_total_received_rows += row_idxs.size();
_mem_table->insert(block, row_idxs);
- if (_mem_table->need_to_agg()) {
+ if (UNLIKELY(_mem_table->need_agg())) {
_mem_table->shrink_memtable_by_agg();
- if (_mem_table->is_flush()) {
- auto s = _flush_memtable_async();
- _reset_mem_table();
- if (UNLIKELY(!s.ok())) {
- return s;
- }
+ }
+ if (UNLIKELY(_mem_table->need_flush())) {
+ auto s = _flush_memtable_async();
+ _reset_mem_table();
+ if (UNLIKELY(!s.ok())) {
+ return s;
}
}
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 48c4e605cf..ea1188e874 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -296,13 +296,16 @@ void MemTable::shrink_memtable_by_agg() {
_collect_vskiplist_results<false>();
}
-bool MemTable::is_flush() const {
+bool MemTable::need_flush() const {
return memory_usage() >= config::write_buffer_size;
}
-bool MemTable::need_to_agg() {
- return _keys_type == KeysType::DUP_KEYS ? is_flush()
- : memory_usage() >= config::memtable_max_buffer_size;
+bool MemTable::need_agg() const {
+ if (_keys_type == KeysType::AGG_KEYS) {
+ return memory_usage() >= config::write_buffer_size_for_agg;
+ }
+
+ return false;
}
Status MemTable::_generate_delete_bitmap(int64_t atomic_num_segments_before_flush,
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index a7a8af3efa..a908fa358a 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -58,9 +58,9 @@ public:
void shrink_memtable_by_agg();
- bool is_flush() const;
+ bool need_flush() const;
- bool need_to_agg();
+ bool need_agg() const;
/// Flush
Status flush();
diff --git a/be/src/olap/push_handler.cpp b/be/src/olap/push_handler.cpp
index 55472c6d82..155b322c85 100644
--- a/be/src/olap/push_handler.cpp
+++ b/be/src/olap/push_handler.cpp
@@ -250,7 +250,7 @@ Status PushHandler::_convert_v2(TabletSharedPtr cur_tablet, RowsetSharedPtr* cur
VLOG_NOTICE << "start to convert etl file to delta.";
while (!reader->eof()) {
if (reader->mem_pool()->mem_tracker()->consumption() >
- config::flush_size_for_sparkload) {
+ config::write_buffer_size_for_sparkload) {
RETURN_NOT_OK(rowset_writer->flush());
reader->mem_pool()->free_all();
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org