You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2022/06/25 06:13:08 UTC
[doris] branch master updated: [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409)
This is an automated email from the ASF dual-hosted git repository.
dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new eb25df5a2c [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409)
eb25df5a2c is described below
commit eb25df5a2cb7de6e541fd4b0ed8c43ca78cd97a4
Author: Kidd <10...@users.noreply.github.com>
AuthorDate: Sat Jun 25 14:13:02 2022 +0800
[fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409)
* fix load tracker
* fix comment
---
be/src/runtime/load_channel.cpp | 10 ++-----
be/src/runtime/load_channel.h | 6 ++---
be/src/runtime/load_channel_mgr.cpp | 46 +++++++++++++++++----------------
be/src/runtime/load_channel_mgr.h | 8 ++----
be/src/runtime/tablets_channel.cpp | 2 --
be/src/runtime/tcmalloc_hook.h | 5 ++++
be/src/runtime/thread_context.h | 4 +++
be/src/runtime/thread_mem_tracker_mgr.h | 17 +++++++-----
be/src/service/internal_service.cpp | 8 +++---
9 files changed, 53 insertions(+), 53 deletions(-)
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 42fbe9dac5..d2223554eb 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,19 +25,15 @@
namespace doris {
-LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit,
+LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker,
int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
bool is_vec)
: _load_id(load_id),
+ _mem_tracker(mem_tracker),
_timeout_s(timeout_s),
_is_high_priority(is_high_priority),
_sender_ip(sender_ip),
_is_vec(is_vec) {
- _mem_tracker = MemTracker::create_tracker(
- channel_mem_limit, "LoadChannel#senderIp=" + sender_ip,
- ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker(
- _load_id.to_string(), load_mem_limit),
- MemTrackerLevel::TASK);
// _last_updated_time should be set before being inserted to
// _load_channels in load_channel_mgr, or it may be erased
// immediately by gc thread.
@@ -52,7 +48,6 @@ LoadChannel::~LoadChannel() {
}
Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t index_id = params.index_id();
std::shared_ptr<TabletsChannel> channel;
{
@@ -138,7 +133,6 @@ bool LoadChannel::is_finished() {
}
Status LoadChannel::cancel() {
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
for (auto& it : _tablets_channels) {
it.second->cancel();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 38cc2ac89f..20ef476dd9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,9 +39,8 @@ class Cache;
// corresponding to a certain load job
class LoadChannel {
public:
- LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit,
- int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
- bool is_vec);
+ LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker,
+ int64_t timeout_s, bool is_high_priority, const std::string& sender_ip, bool is_ve);
~LoadChannel();
// open a new load channel if not exist
@@ -129,7 +128,6 @@ private:
template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
TabletWriterAddResult* response) {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
int64_t index_id = request.index_id();
// 1. get tablets channel
std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index aea5479aa6..c105bd96b3 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -84,10 +84,9 @@ LoadChannelMgr::~LoadChannelMgr() {
Status LoadChannelMgr::init(int64_t process_mem_limit) {
int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit);
- _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit, "LoadChannelMgr",
- MemTracker::get_process_tracker(),
- MemTrackerLevel::OVERVIEW);
- SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+ _mem_tracker = MemTracker::create_virtual_tracker(load_mgr_mem_limit, "LoadChannelMgr",
+ MemTracker::get_process_tracker(),
+ MemTrackerLevel::OVERVIEW);
REGISTER_HOOK_METRIC(load_channel_mem_consumption,
[this]() { return _mem_tracker->consumption(); });
_last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -95,16 +94,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
return Status::OK();
}
-LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t load_mem_limit,
- int64_t channel_mem_limit, int64_t timeout_s,
- bool is_high_priority,
- const std::string& sender_ip, bool is_vec) {
- return new LoadChannel(load_id, load_mem_limit, channel_mem_limit, timeout_s, is_high_priority,
- sender_ip, is_vec);
-}
-
Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> channel;
{
@@ -114,18 +104,31 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
channel = it->second;
} else {
// create a new load channel
- int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
- int64_t channel_mem_limit =
- calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
-
int64_t timeout_in_req_s =
params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1;
int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s);
-
bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority());
- channel.reset(_create_load_channel(load_id, load_mem_limit, channel_mem_limit,
- channel_timeout_s, is_high_priority,
- params.sender_ip(), params.is_vectorized()));
+
+ int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
+ int64_t channel_mem_limit =
+ calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
+ auto channel_mem_tracker =
+ MemTracker::create_tracker(channel_mem_limit,
+ fmt::format("LoadChannel#senderIp={}#loadID={}",
+ params.sender_ip(), load_id.to_string()),
+ _mem_tracker);
+ // TODO
+ // auto channel_mem_tracker_job = std::make_shared<MemTracker>(
+ // -1,
+ // fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
+ // load_id.to_string()),
+ // ExecEnv::GetInstance()
+ // ->task_pool_mem_tracker_registry()
+ // ->register_load_mem_tracker(load_id.to_string(), load_mem_limit),
+ // MemTrackerLevel::TASK);
+ channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s,
+ is_high_priority, params.sender_ip(),
+ params.is_vectorized()));
_load_channels.insert({load_id, channel});
}
}
@@ -181,7 +184,6 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
}
Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
UniqueId load_id(params.id());
std::shared_ptr<LoadChannel> cancelled_channel;
{
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 65d72534f4..39d7ed5b2b 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -61,11 +61,6 @@ public:
std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
private:
- static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t load_mem_limit,
- int64_t channel_mem_limit, int64_t timeout_s,
- bool is_high_priority, const std::string& sender_ip,
- bool is_vec);
-
template <typename Request>
Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof,
const UniqueId& load_id, const Request& request);
@@ -84,7 +79,8 @@ protected:
std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
Cache* _last_success_channel = nullptr;
- // check the total load mem consumption of this Backend
+ // check the total load channel mem consumption of this Backend
+ // TODO: currently unused; to be refactored soon
std::shared_ptr<MemTracker> _mem_tracker;
CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 8010956cc4..566d90fb7f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -53,7 +53,6 @@ TabletsChannel::~TabletsChannel() {
}
Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
if (_state == kOpened) {
// Normal case, already open by other sender
@@ -253,7 +252,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
}
Status TabletsChannel::cancel() {
- SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
std::lock_guard<std::mutex> l(_lock);
if (_state == kFinished) {
return _close_status;
diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h
index 59208b334a..af60cdc178 100644
--- a/be/src/runtime/tcmalloc_hook.h
+++ b/be/src/runtime/tcmalloc_hook.h
@@ -21,6 +21,7 @@
#include <gperftools/nallocx.h>
#include <gperftools/tcmalloc.h>
+#include "runtime/mem_tracker.h"
#include "runtime/thread_context.h"
// Notice: modify the command in New/Delete Hook should be careful enough!,
@@ -38,12 +39,16 @@
void new_hook(const void* ptr, size_t size) {
if (doris::tls_ctx()) {
doris::tls_ctx()->consume_mem(tc_nallocx(size, 0));
+ } else if (doris::ExecEnv::GetInstance()->initialized()) {
+ doris::MemTracker::get_process_tracker()->consume(tc_nallocx(size, 0));
}
}
void delete_hook(const void* ptr) {
if (doris::tls_ctx()) {
doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr)));
+ } else if (doris::ExecEnv::GetInstance()->initialized()) {
+ doris::MemTracker::get_process_tracker()->release(tc_malloc_size(const_cast<void*>(ptr)));
}
}
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 7b58315b09..33478b19c5 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -157,12 +157,16 @@ public:
void consume_mem(int64_t size) {
if (start_thread_mem_tracker) {
_thread_mem_tracker_mgr->cache_consume(size);
+ } else {
+ MemTracker::get_process_tracker()->consume(size);
}
}
void release_mem(int64_t size) {
if (start_thread_mem_tracker) {
_thread_mem_tracker_mgr->cache_consume(-size);
+ } else {
+ MemTracker::get_process_tracker()->release(size);
}
}
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index d582c3a46a..d8042ac0fa 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -177,6 +177,7 @@ private:
phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
// If true, call memtracker try_consume, otherwise call consume.
bool _check_limit;
+ bool _stop_consume = false;
int64_t _tracker_id;
// Avoid memory allocation in functions.
@@ -256,25 +257,27 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
// When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
// and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(),
// it will cause tracker->consumption to be temporarily less than 0.
- if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
- _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
+ //
+ // Temporary memory may be allocated during the consumption of the mem tracker (in the processing logic of
+ // the exceeded limit), which will lead to entering the TCMalloc Hook again, so suspend consumption to avoid
+ // falling into an infinite loop.
+ if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
+ _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
+ !_stop_consume) {
+ _stop_consume = true;
DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
// When switching to the current tracker last time, the remaining untracked memory.
if (_untracked_mems[_tracker_id] != 0) {
_untracked_mem += _untracked_mems[_tracker_id];
_untracked_mems[_tracker_id] = 0;
}
- // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again,
- // will enter infinite recursion. So the temporary memory allocated in mem_tracker.try_consume
- // and mem_limit_exceeded will directly call consume.
if (_check_limit) {
- _check_limit = false;
noncache_try_consume(_untracked_mem);
- _check_limit = true;
} else {
mem_tracker()->consume(_untracked_mem);
}
_untracked_mem = 0;
+ _stop_consume = false;
}
}
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index e85bdfebfc..47e156a0bf 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -206,6 +206,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
const PTabletWriterAddBlockRequest* request,
PTabletWriterAddBlockResult* response,
google::protobuf::Closure* done) {
+ SCOPED_SWITCH_BTHREAD();
// TODO(zxy) delete in 1.2 version
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
@@ -216,6 +217,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
void PInternalServiceImpl::tablet_writer_add_block_by_http(
google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
+ SCOPED_SWITCH_BTHREAD();
PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest();
google::protobuf::Closure* done_raw =
new NewHttpClosure<PTabletWriterAddBlockRequest>(request_raw, done);
@@ -243,8 +245,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
- SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
- _exec_env->load_channel_mgr()->mem_tracker());
auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
if (!st.ok()) {
@@ -264,12 +264,14 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
const PTabletWriterAddBatchRequest* request,
PTabletWriterAddBatchResult* response,
google::protobuf::Closure* done) {
+ SCOPED_SWITCH_BTHREAD();
_tablet_writer_add_batch(cntl_base, request, response, done);
}
void PInternalServiceImpl::tablet_writer_add_batch_by_http(
google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
+ SCOPED_SWITCH_BTHREAD();
PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
google::protobuf::Closure* done_raw =
new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
@@ -300,8 +302,6 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
int64_t execution_time_ns = 0;
{
SCOPED_RAW_TIMER(&execution_time_ns);
- SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
- _exec_env->load_channel_mgr()->mem_tracker());
// TODO(zxy) delete in 1.2 version
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org