You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/20 07:35:34 UTC
[doris] branch branch-1.1-lts updated: [branch-1.1-lts](cherry-pick) Fix bthread local consume mem tracker (#13491)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new b3235ee49f [branch-1.1-lts](cherry-pick) Fix bthread local consume mem tracker (#13491)
b3235ee49f is described below
commit b3235ee49fb65d5488699b1262541a17e735cf20
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Oct 20 15:35:26 2022 +0800
[branch-1.1-lts](cherry-pick) Fix bthread local consume mem tracker (#13491)
* [enhancement](memtracker) Fix bthread local consume mem tracker (#13368)
Previously, bthread_getspecific was called every time bthread local was used. In the test at #10823, it was found that frequent calls to bthread_getspecific had performance problems.
So a cache was implemented in pthread-local storage, keyed on the btls key, but the btls key cannot correctly detect bthread switching.
So the cache is now implemented based on the bthread id obtained from bthread_self.
* fix ut
* fix ut
---
be/src/runtime/exec_env.h | 8 +-
be/src/runtime/exec_env_init.cpp | 1 +
be/src/runtime/memory/mem_tracker_limiter.cpp | 6 +-
be/src/runtime/memory/mem_tracker_limiter.h | 6 ++
be/src/runtime/memory/tcmalloc_hook.h | 21 +----
be/src/runtime/memory/thread_mem_tracker_mgr.h | 71 +++++++++++----
be/src/runtime/thread_context.cpp | 49 +++--------
be/src/runtime/thread_context.h | 115 +++++++++++++++++--------
be/src/service/internal_service.cpp | 25 ------
be/test/testutil/run_all_tests.cpp | 9 ++
10 files changed, 170 insertions(+), 141 deletions(-)
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4016506fe0..03603c4624 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -99,6 +99,7 @@ public:
// declarations for classes in scoped_ptrs.
~ExecEnv();
+ const bool initialized() { return _is_init; }
const std::string& token() const;
ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; }
DataStreamMgr* stream_mgr() { return _stream_mgr; }
@@ -123,16 +124,19 @@ public:
std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return _process_mem_tracker; }
void set_global_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& process_tracker,
- const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) {
+ const std::shared_ptr<MemTrackerLimiter>& orphan_tracker,
+ const std::shared_ptr<MemTrackerLimiter>& bthread_mem_tracker) {
_process_mem_tracker = process_tracker;
_orphan_mem_tracker = orphan_tracker;
_orphan_mem_tracker_raw = orphan_tracker.get();
+ _bthread_mem_tracker = bthread_mem_tracker;
}
std::shared_ptr<NewMemTracker> allocator_cache_mem_tracker() {
return _allocator_cache_mem_tracker;
}
std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
+ std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker() { return _bthread_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
@@ -218,6 +222,8 @@ private:
// and the consumption of the orphan mem tracker is close to 0, but greater than 0.
std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
MemTrackerLimiter* _orphan_mem_tracker_raw;
+ // Bthread default mem tracker
+ std::shared_ptr<MemTrackerLimiter> _bthread_mem_tracker;
// The ancestor for all querys tracker.
std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
// The ancestor for all load tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 68518e905b..8cb431ed51 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -207,6 +207,7 @@ Status ExecEnv::_init_mem_tracker() {
std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
_orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
_orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+ _bthread_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Bthread", _orphan_mem_tracker);
thread_context()->_thread_mem_tracker_mgr->init();
thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
#if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 9b4eedea65..48a2102e22 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
MemTrackerLimiter::~MemTrackerLimiter() {
// TCMalloc hook will be triggered during destructor memtracker, may cause crash.
#ifndef BE_TEST
- if (_label == "Process") doris::thread_context_ptr._init = false;
+ if (_label == "Process") doris::thread_context_ptr.init = false;
#endif
DCHECK(remain_child_count() == 0 || _label == "Process");
// In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
@@ -84,9 +84,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
_consumption->current_value());
}
if (_reset_zero) {
- ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
- _consumption->current_value());
- cache_consume_local(-_consumption->current_value());
+ reset_zero();
_all_ancestors.clear();
_all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
}
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 001493bc80..e6126c15a6 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -136,6 +136,12 @@ public:
void enable_print_log_usage() { _print_log_usage = true; }
void enable_reset_zero() { _reset_zero = true; }
+ void reset_zero() {
+ ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
+ _consumption->current_value());
+ cache_consume_local(-_consumption->current_value());
+ }
+
// Logs the usage of this tracker limiter and optionally its children (recursively).
// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
// 'max_recursive_depth' specifies the maximum number of levels of children
diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h
index 627f42795d..6ec9352ad3 100644
--- a/be/src/runtime/memory/tcmalloc_hook.h
+++ b/be/src/runtime/memory/tcmalloc_hook.h
@@ -36,28 +36,11 @@
// destructor to control the behavior of consume can lead to unexpected behavior,
// like this: if (LIKELY(doris::start_thread_mem_tracker)) {
void new_hook(const void* ptr, size_t size) {
- if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
- // Currently in bthread, consume thread context mem tracker in bthread tls.
- doris::update_bthread_context();
- doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
- } else if (doris::thread_context_ptr._init) {
- doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
- } else {
- doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0));
- }
+ MEM_MALLOC_HOOK(tc_nallocx(size, 0));
}
void delete_hook(const void* ptr) {
- if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
- doris::update_bthread_context();
- doris::bthread_context->_thread_mem_tracker_mgr->consume(
- -tc_malloc_size(const_cast<void*>(ptr)));
- } else if (doris::thread_context_ptr._init) {
- doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(
- -tc_malloc_size(const_cast<void*>(ptr)));
- } else {
- doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr)));
- }
+ MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
}
void init_hook() {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 5b2c4a5e0c..d0e4e6534a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -44,25 +44,37 @@ public:
ThreadMemTrackerMgr() {}
~ThreadMemTrackerMgr() {
- flush_untracked_mem<false>();
- DCHECK(_consumer_tracker_stack.empty());
- DCHECK(_limiter_tracker_stack.size() == 1);
+ // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
+ if (_init) {
+ flush_untracked_mem<false>();
+ if (bthread_self() == 0) {
+ DCHECK(_consumer_tracker_stack.empty());
+ DCHECK(_limiter_tracker_stack.size() == 1)
+ << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
+ }
+ }
}
// only for tcmalloc hook
static void consume_no_attach(int64_t size) {
- ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+ if (ExecEnv::GetInstance()->initialized()) {
+ ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+ }
}
// After thread initialization, calling `init` again must call `clear_untracked_mems` first
// to avoid memory tracking loss.
void init();
+ void init_impl();
+ void clear();
// After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
-
void detach_limiter_tracker();
+ // Usually there are only two layers, the first is the default trackerOrphan;
+ // the second is the query tracker or bthread tracker.
+ int64_t get_attach_layers() { return _limiter_tracker_stack.size(); }
// Must be fast enough! Thread update_tracker may be called very frequently.
// So for performance, add tracker as early as possible, and then call update_tracker<Existed>.
@@ -82,9 +94,13 @@ public:
bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }
std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
+ if (!_init) init();
return _limiter_tracker_stack.back();
}
- MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; }
+ MemTrackerLimiter* limiter_mem_tracker_raw() {
+ if (!_init) init();
+ return _limiter_tracker_raw;
+ }
void set_check_limit(bool check_limit) { _check_limit = check_limit; }
void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -109,6 +125,8 @@ private:
void exceeded(const std::string& failed_msg);
private:
+ // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized
+ bool _init = false;
// Cache untracked mem, only update to _untracked_mems when switching mem tracker.
// Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
int64_t _untracked_mem = 0;
@@ -117,7 +135,7 @@ private:
// _limiter_tracker_stack[0] = orphan_mem_tracker
std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
- MemTrackerLimiter* _limiter_tracker_raw;
+ MemTrackerLimiter* _limiter_tracker_raw = nullptr;
std::vector<NewMemTracker*> _consumer_tracker_stack;
// If true, call memtracker try_consume, otherwise call consume.
@@ -131,26 +149,39 @@ private:
};
inline void ThreadMemTrackerMgr::init() {
+ DCHECK(_limiter_tracker_stack.size() == 0);
+ DCHECK(_limiter_tracker_raw == nullptr);
DCHECK(_consumer_tracker_stack.empty());
- // _limiter_tracker_stack[0] = orphan_mem_tracker
- DCHECK(_limiter_tracker_stack.size() <= 1)
- << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
+ init_impl();
+}
+
+inline void ThreadMemTrackerMgr::init_impl() {
#ifdef BE_TEST
if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Process");
- std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
+ std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
- ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
+ std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+ ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
}
#endif // BE_TEST
- if (_limiter_tracker_stack.size() == 0) {
- _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
- _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
- _task_id_stack.push_back("");
- _fragment_instance_id_stack.push_back(TUniqueId());
- }
+ _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
+ _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
+ _task_id_stack.push_back("");
+ _fragment_instance_id_stack.push_back(TUniqueId());
_check_limit = true;
+ _init = true;
+}
+
+inline void ThreadMemTrackerMgr::clear() {
+ flush_untracked_mem<false>();
+ std::vector<std::shared_ptr<MemTrackerLimiter>>().swap(_limiter_tracker_stack);
+ std::vector<NewMemTracker*>().swap(_consumer_tracker_stack);
+ std::vector<std::string>().swap(_task_id_stack);
+ std::vector<TUniqueId>().swap(_fragment_instance_id_stack);
+ init_impl();
}
inline void ThreadMemTrackerMgr::push_consumer_tracker(NewMemTracker* tracker) {
@@ -174,7 +205,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
// it will cause tracker->consumption to be temporarily less than 0.
if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
_untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
- !_stop_consume) {
+ !_stop_consume && ExecEnv::GetInstance()->initialized()) {
if (_check_limit) {
flush_untracked_mem<true>();
} else {
@@ -188,6 +219,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
// Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
// the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
_stop_consume = true;
+ if (!_init) init();
+ DCHECK(_limiter_tracker_raw);
old_untracked_mem = _untracked_mem;
DCHECK(_limiter_tracker_raw);
if (CheckLimit) {
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index b993308db8..2b4f9f19e6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -25,7 +25,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr);
ThreadContextPtr::ThreadContextPtr() {
INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
- _init = true;
+ init = true;
}
AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
@@ -39,10 +39,12 @@ AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
#else
if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
- std::make_shared<MemTrackerLimiter>(-1, "Process");
- std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "Process");
+ std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
- ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
+ std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+ ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
}
thread_context()->attach_task(type, task_id, fragment_instance_id, ExecEnv::GetInstance()->orphan_mem_tracker());
#endif // BE_TEST
@@ -61,10 +63,12 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
#else
if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
- std::make_shared<MemTrackerLimiter>(-1, "Process");
- std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "Process");
+ std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
- ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
+ std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+ ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
}
thread_context()->attach_task(ThreadContext::TaskType::QUERY, "", TUniqueId(), ExecEnv::GetInstance()->orphan_mem_tracker());
#endif // BE_TEST
@@ -92,35 +96,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
#endif // USE_MEM_TRACKER
}
-SwitchBthread::SwitchBthread() {
-#ifdef USE_MEM_TRACKER
- _bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
- // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
- if (_bthread_context == nullptr) {
- // Create thread-local data on demand.
- _bthread_context = new ThreadContext;
- // set the data so that next time bthread_getspecific in the thread returns the data.
- CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context));
- } else {
- DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN);
- _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
- }
- _bthread_context->_thread_mem_tracker_mgr->init();
- _bthread_context->set_type(ThreadContext::TaskType::BRPC);
- bthread_context_key = btls_key;
- bthread_context = _bthread_context;
-#endif
-}
-
-SwitchBthread::~SwitchBthread() {
-#ifdef USE_MEM_TRACKER
- DCHECK(_bthread_context != nullptr);
- _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
- _bthread_context->_thread_mem_tracker_mgr->init();
- _bthread_context->set_type(ThreadContext::TaskType::UNKNOWN);
- bthread_context = nullptr;
- bthread_context_key = EMPTY_BTLS_KEY;
-#endif // USE_MEM_TRACKER
-}
-
} // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 421c59811b..b6143264a8 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -29,15 +29,17 @@
#include "runtime/memory/thread_mem_tracker_mgr.h"
#include "runtime/threadlocal.h"
+#ifdef USE_MEM_TRACKER
// Add thread mem tracker consumer during query execution.
#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker)
-
// Attach to task when thread starts
#define SCOPED_ATTACH_TASK(arg1, ...) \
auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
-
-#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread()
+#else
+#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0
+#define SCOPED_ATTACH_TASK(arg1, ...) (void)0
+#endif
namespace doris {
@@ -75,7 +77,7 @@ public:
// Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above.
// TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock.
- bool _init = false;
+ bool init = false;
DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
};
@@ -85,7 +87,7 @@ inline thread_local ThreadContextPtr thread_context_ptr;
// To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS
// in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS.
inline thread_local ThreadContext* bthread_context;
-inline thread_local bthread_key_t bthread_context_key;
+inline thread_local bthread_t bthread_id;
// The thread context saves some info about a working thread.
// 2 required info:
@@ -116,18 +118,18 @@ public:
}
~ThreadContext() {
- // Restore to the memory state before _init=true to ensure accurate overall memory statistics.
+ // Restore to the memory state before init=true to ensure accurate overall memory statistics.
// Thereby ensuring that the memory alloc size is not tracked during the initialization of the
- // ThreadContext before `_init = true in ThreadContextPtr()`,
+ // ThreadContext before `init = true in ThreadContextPtr()`,
// Equal to the size of the memory release that is not tracked during the destruction of the
- // ThreadContext after `_init = false in ~ThreadContextPtr()`,
- init();
- thread_context_ptr._init = false;
+ // ThreadContext after `init = false in ~ThreadContextPtr()`,
+ if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->clear();
+ thread_context_ptr.init = false;
}
void init() {
_type = TaskType::UNKNOWN;
- _thread_mem_tracker_mgr->init();
+ if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init();
_thread_id = get_thread_id();
}
@@ -174,17 +176,42 @@ private:
TUniqueId _fragment_instance_id;
};
-static void update_bthread_context() {
- if (btls_key != bthread_context_key) {
- // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls.
- bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
- bthread_context_key = btls_key;
+static void attach_bthread() {
+ bthread_id = bthread_self();
+ bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+ if (bthread_context == nullptr) {
+ // A new bthread starts, two scenarios:
+ // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
+ // 2. There are not enough reusable btls in btls pool.
+#ifndef BE_TEST
+ DCHECK(ExecEnv::GetInstance()->initialized());
+#endif
+ // Create thread-local data on demand.
+ bthread_context = new ThreadContext;
+ std::shared_ptr<MemTrackerLimiter> btls_tracker =
+ std::make_shared<MemTrackerLimiter>(-1, "Bthread:id=" + std::to_string(bthread_id),
+ ExecEnv::GetInstance()->bthread_mem_tracker());
+ bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
+ // set the data so that next time bthread_getspecific in the thread returns the data.
+ CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+ } else {
+ // two scenarios:
+ // 1. A new bthread starts, but get a reuses btls.
+ // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
+ // So tracker call reset 0 like reuses btls.
+ DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
+ bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
}
}
static ThreadContext* thread_context() {
- if (btls_key != EMPTY_BTLS_KEY && bthread_context != nullptr) {
- update_bthread_context();
+ if (bthread_self() != 0) {
+ if (bthread_self() != bthread_id) {
+ // A new bthread starts or pthread switch occurs.
+ thread_context_ptr.init = false;
+ attach_bthread();
+ thread_context_ptr.init = true;
+ }
return bthread_context;
} else {
return thread_context_ptr._ptr;
@@ -222,18 +249,6 @@ public:
~AddThreadMemTrackerConsumer();
};
-class SwitchBthread {
-public:
- explicit SwitchBthread();
-
- ~SwitchBthread();
-
-private:
-#ifdef USE_MEM_TRACKER
- ThreadContext* _bthread_context;
-#endif
-};
-
class StopCheckThreadMemTrackerLimit {
public:
explicit StopCheckThreadMemTrackerLimit() {
@@ -246,6 +261,7 @@ public:
};
// The following macros are used to fix the tracking accuracy of caches etc.
+#ifdef USE_MEM_TRACKER
#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \
auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
#define CONSUME_THREAD_MEM_TRACKER(size) \
@@ -258,9 +274,38 @@ public:
#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
tracker->transfer_to( \
size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw())
-#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
- return doris::thread_context() \
- ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \
- ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", msg), \
- ##__VA_ARGS__);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) \
+ return doris::thread_context() \
+ ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw() \
+ ->mem_limit_exceeded( \
+ state, \
+ fmt::format("exec node:<{}>, {}", "", msg), \
+ ##__VA_ARGS__);
+// Mem Hook to consume thread mem tracker
+#define MEM_MALLOC_HOOK(size) \
+ do { \
+ if (doris::thread_context_ptr.init) { \
+ doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \
+ } else { \
+ doris::ThreadMemTrackerMgr::consume_no_attach(size); \
+ } \
+ } while (0)
+#define MEM_FREE_HOOK(size) \
+ do { \
+ if (doris::thread_context_ptr.init) { \
+ doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \
+ } else { \
+ doris::ThreadMemTrackerMgr::consume_no_attach(-size); \
+ } \
+ } while (0)
+#else
+#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0
+#define CONSUME_THREAD_MEM_TRACKER(size) (void)0
+#define RELEASE_THREAD_MEM_TRACKER(size) (void)0
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0
+#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) (void)0
+#define MEM_MALLOC_HOOK(size) (void)0
+#define MEM_FREE_HOOK(size) (void)0
+#endif
} // namespace doris
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 2b9ee2fb3f..b7bc9a179b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -91,7 +91,6 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
<< " node=" << request->node_id();
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -105,7 +104,6 @@ void PInternalServiceImpl<T>::transmit_data_by_http(google::protobuf::RpcControl
const PEmptyRequest* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
PTransmitDataParams* request_raw = new PTransmitDataParams();
google::protobuf::Closure* done_raw =
new NewHttpClosure<PTransmitDataParams>(request_raw, done);
@@ -163,7 +161,6 @@ void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
const PTabletWriterOpenRequest* request,
PTabletWriterOpenResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
<< ", txn_id=" << request->txn_id();
brpc::ClosureGuard closure_guard(done);
@@ -181,7 +178,6 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
const PExecPlanFragmentRequest* request,
PExecPlanFragmentResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
auto st = Status::OK();
bool compact = request->has_compact() ? request->compact() : false;
@@ -207,7 +203,6 @@ void PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcCont
const PExecPlanFragmentStartRequest* request,
PExecPlanFragmentResult* result,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
auto st = _exec_env->fragment_mgr()->start_query_execution(request);
st.to_protobuf(result->mutable_status());
@@ -218,7 +213,6 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
const PTabletWriterAddBatchRequest* request,
PTabletWriterAddBatchResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
_tablet_writer_add_batch(cntl_base, request, response, done);
}
@@ -226,7 +220,6 @@ template <typename T>
void PInternalServiceImpl<T>::tablet_writer_add_batch_by_http(
google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
google::protobuf::Closure* done_raw =
new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
@@ -281,7 +274,6 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
const PTabletWriterCancelRequest* request,
PTabletWriterCancelResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id()
<< ", sender_id=" << request->sender_id();
brpc::ClosureGuard closure_guard(done);
@@ -327,7 +319,6 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
const PCancelPlanFragmentRequest* request,
PCancelPlanFragmentResult* result,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
TUniqueId tid;
tid.__set_hi(request->finst_id().hi());
@@ -352,7 +343,6 @@ template <typename T>
void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_base,
const PFetchDataRequest* request, PFetchDataResult* result,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
_exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
@@ -362,7 +352,6 @@ template <typename T>
void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller,
const PProxyRequest* request, PProxyResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
// PProxyRequest is defined in gensrc/proto/internal_service.proto
// Currently it supports 2 kinds of requests:
@@ -425,7 +414,6 @@ void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* cont
const PUpdateCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->update(request, response);
}
@@ -435,7 +423,6 @@ void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* contr
const PFetchCacheRequest* request,
PFetchCacheResult* result,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->fetch(request, result);
}
@@ -445,7 +432,6 @@ void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr
const PClearCacheRequest* request,
PCacheResponse* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
_exec_env->result_cache()->clear(request, response);
}
@@ -455,7 +441,6 @@ void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
const ::doris::PMergeFilterRequest* request,
::doris::PMergeFilterResponse* response,
::google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
@@ -470,7 +455,6 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
const ::doris::PPublishFilterRequest* request,
::doris::PPublishFilterResponse* response,
::google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
UniqueId unique_id(request->query_id());
@@ -487,7 +471,6 @@ template <typename T>
void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller,
const PSendDataRequest* request, PSendDataResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -511,7 +494,6 @@ template <typename T>
void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller,
const PCommitRequest* request, PCommitResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -530,7 +512,6 @@ template <typename T>
void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller,
const PRollbackRequest* request, PRollbackResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
TUniqueId fragment_instance_id;
fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -550,7 +531,6 @@ void PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
const PConstantExprRequest* request,
PConstantExprResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -587,7 +567,6 @@ void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
const PTransmitDataParams* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
// TODO(zxy) delete in 1.2 version
brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
@@ -600,7 +579,6 @@ void PInternalServiceImpl<T>::transmit_block_by_http(google::protobuf::RpcContro
const PEmptyRequest* request,
PTransmitDataResult* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
PTransmitDataParams* request_raw = new PTransmitDataParams();
google::protobuf::Closure* done_raw =
new NewHttpClosure<PTransmitDataParams>(request_raw, done);
@@ -658,7 +636,6 @@ void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
const PCheckRPCChannelRequest* request,
PCheckRPCChannelResponse* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(0);
if (request->data().size() != request->size()) {
@@ -686,7 +663,6 @@ void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
const PResetRPCChannelRequest* request,
PResetRPCChannelResponse* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
response->mutable_status()->set_status_code(0);
if (request->all()) {
@@ -721,7 +697,6 @@ void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
const PHandShakeRequest* request,
PHandShakeResponse* response,
google::protobuf::Closure* done) {
- SCOPED_SWITCH_BTHREAD_TLS();
brpc::ClosureGuard closure_guard(done);
if (request->has_hello()) {
response->set_hello(request->hello());
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index 842282e8c6..aa2d0a74cf 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -27,6 +27,15 @@
#include "util/mem_info.h"
int main(int argc, char** argv) {
+ std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
+ std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
+ std::shared_ptr<doris::MemTrackerLimiter> orphan_mem_tracker =
+ std::make_shared<doris::MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
+ std::shared_ptr<doris::MemTrackerLimiter> bthread_mem_tracker =
+ std::make_shared<doris::MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+ doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker,
+ bthread_mem_tracker);
+ doris::thread_context()->_thread_mem_tracker_mgr->init();
doris::StoragePageCache::create_global_cache(1 << 30, 10);
doris::SegmentLoader::create_global_instance(1000);
std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org