You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by zo...@apache.org on 2023/06/19 10:34:51 UTC
[doris] branch master updated: [improvement](load) avoid producing small segment (#20852)
This is an automated email from the ASF dual-hosted git repository.
zouxinyi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new a3342cb088 [improvement](load) avoid producing small segment (#20852)
a3342cb088 is described below
commit a3342cb088702e60bfd0fba83dca8d2e5cf65990
Author: Yongqiang YANG <98...@users.noreply.github.com>
AuthorDate: Mon Jun 19 18:34:44 2023 +0800
[improvement](load) avoid producing small segment (#20852)
avoid producing small segment
---
be/src/olap/delta_writer.cpp | 38 +++++++++++++++++--------------------
be/src/olap/delta_writer.h | 14 +-------------
be/src/runtime/load_channel.h | 4 ++--
be/src/runtime/load_channel_mgr.cpp | 7 ++++---
be/src/runtime/tablets_channel.cpp | 10 ++++++++++
be/src/runtime/tablets_channel.h | 7 ++-----
6 files changed, 36 insertions(+), 44 deletions(-)
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index df9167484b..5c0d7dc2d2 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -529,27 +529,6 @@ Status DeltaWriter::cancel_with_status(const Status& st) {
return Status::OK();
}
-void DeltaWriter::save_mem_consumption_snapshot() {
- std::lock_guard<std::mutex> l(_lock);
- _mem_consumption_snapshot = mem_consumption(MemType::ALL);
- if (_mem_table == nullptr) {
- _memtable_consumption_snapshot = 0;
- } else {
- _memtable_consumption_snapshot = _mem_table->memory_usage();
- }
-}
-
-int64_t DeltaWriter::get_memtable_consumption_inflush() const {
- if (!_is_init || _flush_token->get_stats().flush_running_count == 0) {
- return 0;
- }
- return _mem_consumption_snapshot - _memtable_consumption_snapshot;
-}
-
-int64_t DeltaWriter::get_memtable_consumption_snapshot() const {
- return _memtable_consumption_snapshot;
-}
-
int64_t DeltaWriter::mem_consumption(MemType mem) {
if (_flush_token == nullptr) {
// This method may be called before this writer is initialized.
@@ -573,6 +552,23 @@ int64_t DeltaWriter::mem_consumption(MemType mem) {
return mem_usage;
}
+// Sums the consumption of the last insert tracker and the last flush tracker (0 if none).
+int64_t DeltaWriter::active_memtable_mem_consumption() {
+    if (_flush_token == nullptr) { // may be called before this writer is initialized
+        return 0;
+    }
+    int64_t mem_usage = 0;
+    std::lock_guard<SpinLock> l(_mem_table_tracker_lock);
+    if (!_mem_table_insert_trackers.empty()) {
+        mem_usage += (*_mem_table_insert_trackers.rbegin())->consumption();
+        // Test the flush list itself: dereferencing rbegin() of an empty one is UB.
+        if (!_mem_table_flush_trackers.empty()) {
+            mem_usage += (*_mem_table_flush_trackers.rbegin())->consumption();
+        }
+    }
+    return mem_usage;
+}
+
int64_t DeltaWriter::partition_id() const {
return _req.partition_id;
}
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index b91144751c..52b407876f 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -116,6 +116,7 @@ public:
int64_t partition_id() const;
int64_t mem_consumption(MemType mem);
+ int64_t active_memtable_mem_consumption();
// Wait all memtable in flush queue to be flushed
Status wait_flush();
@@ -124,12 +125,6 @@ public:
int32_t schema_hash() { return _tablet->schema_hash(); }
- void save_mem_consumption_snapshot();
-
- int64_t get_memtable_consumption_inflush() const;
-
- int64_t get_memtable_consumption_snapshot() const;
-
void finish_slave_tablet_pull_rowset(int64_t node_id, bool is_succeed);
int64_t total_received_rows() const { return _total_received_rows; }
@@ -181,13 +176,6 @@ private:
std::mutex _lock;
- // memory consumption snapshot for current delta_writer, only
- // used for std::sort
- int64_t _mem_consumption_snapshot = 0;
- // memory consumption snapshot for current memtable, only
- // used for std::sort
- int64_t _memtable_consumption_snapshot = 0;
-
std::unordered_set<int64_t> _unfinished_slave_node;
PSuccessSlaveTabletNodeIds _success_slave_node_ids;
std::shared_mutex _slave_node_lock;
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index e746014fde..5648cb0ae1 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -87,13 +87,13 @@ public:
return mem_usage;
}
- void get_writers_mem_consumption_snapshot(
+ void get_active_memtable_mem_consumption(
std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>*
writers_mem_snap) {
std::lock_guard<SpinLock> l(_tablets_channels_lock);
for (auto& it : _tablets_channels) {
std::multimap<int64_t, int64_t, std::greater<int64_t>> tablets_channel_mem;
- it.second->get_writers_mem_consumption_snapshot(&tablets_channel_mem);
+ it.second->get_active_memtable_mem_consumption(&tablets_channel_mem);
writers_mem_snap->emplace_back(it.first, std::move(tablets_channel_mem));
}
}
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 1a9fd86f2d..a9d94aedb9 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -360,12 +360,12 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
for (auto& kv : _load_channels) {
if (kv.second->is_high_priority()) {
// do not select high priority channel to reduce memory
- // to avoid blocking them.
+ // to avoid blocking them.
continue;
}
std::vector<std::pair<int64_t, std::multimap<int64_t, int64_t, std::greater<int64_t>>>>
writers_mem_snap;
- kv.second->get_writers_mem_consumption_snapshot(&writers_mem_snap);
+ kv.second->get_active_memtable_mem_consumption(&writers_mem_snap);
for (auto item : writers_mem_snap) {
// multimap is empty
if (item.second.empty()) {
@@ -412,8 +412,9 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
<< " delta writers (total mem: "
<< PrettyPrinter::print_bytes(mem_consumption_in_picked_writer) << ", max mem: "
<< PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.front()))
+ << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.front())
<< ", min mem:" << PrettyPrinter::print_bytes(std::get<3>(writers_to_reduce_mem.back()))
- << "), ";
+ << ", tablet_id: " << std::get<2>(writers_to_reduce_mem.back()) << "), ";
if (proc_mem_no_allocator_cache < process_soft_mem_limit) {
oss << "because total load mem consumption "
<< PrettyPrinter::print_bytes(_mem_tracker->consumption()) << " has exceeded";
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index b7ac5dcd64..05b17c5931 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -282,6 +282,16 @@ int64_t TabletsChannel::mem_consumption() {
return write_mem_usage + flush_mem_usage;
}
+// Rebuilds *mem_consumptions with one (active memtable memory, writer key) entry per writer.
+void TabletsChannel::get_active_memtable_mem_consumption(
+        std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
+    mem_consumptions->clear();
+    std::lock_guard<SpinLock> guard(_tablet_writers_lock);
+    for (auto& [writer_key, writer] : _tablet_writers) {
+        mem_consumptions->emplace(writer->active_memtable_mem_consumption(), writer_key);
+    }
+}
+
// Old logic,used for opening all writers of all partitions.
Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
std::vector<SlotDescriptor*>* index_slots = nullptr;
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index 58cfd961ff..b9235925b0 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -115,11 +115,8 @@ public:
int64_t mem_consumption();
- void get_writers_mem_consumption_snapshot(
- std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions) {
- std::lock_guard<SpinLock> l(_tablet_writers_lock);
- *mem_consumptions = _mem_consumptions;
- }
+ void get_active_memtable_mem_consumption(
+ std::multimap<int64_t, int64_t, std::greater<int64_t>>* mem_consumptions);
void flush_memtable_async(int64_t tablet_id);
void wait_flush(int64_t tablet_id);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org