You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/10/20 07:35:34 UTC

[doris] branch branch-1.1-lts updated: [branch-1.1-lts](cherry-pick) Fix bthread local consume mem tracker (#13491)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new b3235ee49f [branch-1.1-lts](cherry-pick) Fix bthread local consume mem tracker (#13491)
b3235ee49f is described below

commit b3235ee49fb65d5488699b1262541a17e735cf20
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Oct 20 15:35:26 2022 +0800

    [branch-1.1-lts](cherry-pick) Fix bthread local consume mem tracker (#13491)
    
    * [enhancement](memtracker) Fix bthread local consume mem tracker (#13368)
    
    Previously, bthread_getspecific was called every time bthread local was used. In the test at #10823, it was found that frequent calls to bthread_getspecific had performance problems.
    
    So a cache was implemented in pthread-local storage, keyed on the btls key, but the btls key cannot reliably detect bthread switching.
    
    So the cache is now keyed on the bthread id obtained from bthread_self instead.
    
    * fix ut
    
    * fix ut
---
 be/src/runtime/exec_env.h                      |   8 +-
 be/src/runtime/exec_env_init.cpp               |   1 +
 be/src/runtime/memory/mem_tracker_limiter.cpp  |   6 +-
 be/src/runtime/memory/mem_tracker_limiter.h    |   6 ++
 be/src/runtime/memory/tcmalloc_hook.h          |  21 +----
 be/src/runtime/memory/thread_mem_tracker_mgr.h |  71 +++++++++++----
 be/src/runtime/thread_context.cpp              |  49 +++--------
 be/src/runtime/thread_context.h                | 115 +++++++++++++++++--------
 be/src/service/internal_service.cpp            |  25 ------
 be/test/testutil/run_all_tests.cpp             |   9 ++
 10 files changed, 170 insertions(+), 141 deletions(-)

diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index 4016506fe0..03603c4624 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -99,6 +99,7 @@ public:
     // declarations for classes in scoped_ptrs.
     ~ExecEnv();
 
+    const bool initialized() { return _is_init; }
     const std::string& token() const;
     ExternalScanContextMgr* external_scan_context_mgr() { return _external_scan_context_mgr; }
     DataStreamMgr* stream_mgr() { return _stream_mgr; }
@@ -123,16 +124,19 @@ public:
 
     std::shared_ptr<MemTrackerLimiter> new_process_mem_tracker() { return _process_mem_tracker; }
     void set_global_mem_tracker(const std::shared_ptr<MemTrackerLimiter>& process_tracker,
-                                const std::shared_ptr<MemTrackerLimiter>& orphan_tracker) {
+                                const std::shared_ptr<MemTrackerLimiter>& orphan_tracker,
+                                const std::shared_ptr<MemTrackerLimiter>& bthread_mem_tracker) {
         _process_mem_tracker = process_tracker;
         _orphan_mem_tracker = orphan_tracker;
         _orphan_mem_tracker_raw = orphan_tracker.get();
+        _bthread_mem_tracker = bthread_mem_tracker;
     }
     std::shared_ptr<NewMemTracker> allocator_cache_mem_tracker() {
         return _allocator_cache_mem_tracker;
     }
     std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker() { return _orphan_mem_tracker; }
     MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
+    std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker() { return _bthread_mem_tracker; }
     std::shared_ptr<MemTrackerLimiter> query_pool_mem_tracker() { return _query_pool_mem_tracker; }
     std::shared_ptr<MemTrackerLimiter> load_pool_mem_tracker() { return _load_pool_mem_tracker; }
     MemTrackerTaskPool* task_pool_mem_tracker_registry() { return _task_pool_mem_tracker_registry; }
@@ -218,6 +222,8 @@ private:
     // and the consumption of the orphan mem tracker is close to 0, but greater than 0.
     std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker;
     MemTrackerLimiter* _orphan_mem_tracker_raw;
+    // Bthread default mem tracker
+    std::shared_ptr<MemTrackerLimiter> _bthread_mem_tracker;
     // The ancestor for all querys tracker.
     std::shared_ptr<MemTrackerLimiter> _query_pool_mem_tracker;
     // The ancestor for all load tracker.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 68518e905b..8cb431ed51 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -207,6 +207,7 @@ Status ExecEnv::_init_mem_tracker() {
             std::make_shared<MemTrackerLimiter>(global_memory_limit_bytes, "Process");
     _orphan_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Orphan", _process_mem_tracker);
     _orphan_mem_tracker_raw = _orphan_mem_tracker.get();
+    _bthread_mem_tracker = std::make_shared<MemTrackerLimiter>(-1, "Bthread", _orphan_mem_tracker);
     thread_context()->_thread_mem_tracker_mgr->init();
     thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 9b4eedea65..48a2102e22 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -74,7 +74,7 @@ MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& labe
 MemTrackerLimiter::~MemTrackerLimiter() {
     // TCMalloc hook will be triggered during destructor memtracker, may cause crash.
 #ifndef BE_TEST
-    if (_label == "Process") doris::thread_context_ptr._init = false;
+    if (_label == "Process") doris::thread_context_ptr.init = false;
 #endif
     DCHECK(remain_child_count() == 0 || _label == "Process");
     // In order to ensure `consumption of all limiter trackers` + `orphan tracker consumption` = `process tracker consumption`
@@ -84,9 +84,7 @@ MemTrackerLimiter::~MemTrackerLimiter() {
                 _consumption->current_value());
     }
     if (_reset_zero) {
-        ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
-                _consumption->current_value());
-        cache_consume_local(-_consumption->current_value());
+        reset_zero();
         _all_ancestors.clear();
         _all_ancestors.push_back(ExecEnv::GetInstance()->orphan_mem_tracker_raw());
     }
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 001493bc80..e6126c15a6 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -136,6 +136,12 @@ public:
     void enable_print_log_usage() { _print_log_usage = true; }
     void enable_reset_zero() { _reset_zero = true; }
 
+    void reset_zero() {
+        ExecEnv::GetInstance()->orphan_mem_tracker_raw()->cache_consume_local(
+                _consumption->current_value());
+        cache_consume_local(-_consumption->current_value());
+    }
+
     // Logs the usage of this tracker limiter and optionally its children (recursively).
     // If 'logged_consumption' is non-nullptr, sets the consumption value logged.
     // 'max_recursive_depth' specifies the maximum number of levels of children
diff --git a/be/src/runtime/memory/tcmalloc_hook.h b/be/src/runtime/memory/tcmalloc_hook.h
index 627f42795d..6ec9352ad3 100644
--- a/be/src/runtime/memory/tcmalloc_hook.h
+++ b/be/src/runtime/memory/tcmalloc_hook.h
@@ -36,28 +36,11 @@
 //  destructor to control the behavior of consume can lead to unexpected behavior,
 //  like this: if (LIKELY(doris::start_thread_mem_tracker)) {
 void new_hook(const void* ptr, size_t size) {
-    if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
-        // Currently in bthread, consume thread context mem tracker in bthread tls.
-        doris::update_bthread_context();
-        doris::bthread_context->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
-    } else if (doris::thread_context_ptr._init) {
-        doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(tc_nallocx(size, 0));
-    } else {
-        doris::ThreadMemTrackerMgr::consume_no_attach(tc_nallocx(size, 0));
-    }
+    MEM_MALLOC_HOOK(tc_nallocx(size, 0));
 }
 
 void delete_hook(const void* ptr) {
-    if (doris::btls_key != doris::EMPTY_BTLS_KEY && doris::bthread_context != nullptr) {
-        doris::update_bthread_context();
-        doris::bthread_context->_thread_mem_tracker_mgr->consume(
-                -tc_malloc_size(const_cast<void*>(ptr)));
-    } else if (doris::thread_context_ptr._init) {
-        doris::thread_context_ptr._ptr->_thread_mem_tracker_mgr->consume(
-                -tc_malloc_size(const_cast<void*>(ptr)));
-    } else {
-        doris::ThreadMemTrackerMgr::consume_no_attach(-tc_malloc_size(const_cast<void*>(ptr)));
-    }
+    MEM_FREE_HOOK(tc_malloc_size(const_cast<void*>(ptr)));
 }
 
 void init_hook() {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 5b2c4a5e0c..d0e4e6534a 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -44,25 +44,37 @@ public:
     ThreadMemTrackerMgr() {}
 
     ~ThreadMemTrackerMgr() {
-        flush_untracked_mem<false>();
-        DCHECK(_consumer_tracker_stack.empty());
-        DCHECK(_limiter_tracker_stack.size() == 1);
+        // if _init == false, exec env is not initialized when init(). and never consumed mem tracker once.
+        if (_init) {
+            flush_untracked_mem<false>();
+            if (bthread_self() == 0) {
+                DCHECK(_consumer_tracker_stack.empty());
+                DCHECK(_limiter_tracker_stack.size() == 1)
+                        << ", limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
+            }
+        }
     }
 
     // only for tcmalloc hook
     static void consume_no_attach(int64_t size) {
-        ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+        if (ExecEnv::GetInstance()->initialized()) {
+            ExecEnv::GetInstance()->orphan_mem_tracker_raw()->consume(size);
+        }
     }
 
     // After thread initialization, calling `init` again must call `clear_untracked_mems` first
     // to avoid memory tracking loss.
     void init();
+    void init_impl();
+    void clear();
 
     // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
     void attach_limiter_tracker(const std::string& task_id, const TUniqueId& fragment_instance_id,
                                 const std::shared_ptr<MemTrackerLimiter>& mem_tracker);
-
     void detach_limiter_tracker();
+    // Usually there are only two layers, the first is the default trackerOrphan;
+    // the second is the query tracker or bthread tracker.
+    int64_t get_attach_layers() { return _limiter_tracker_stack.size(); }
 
     // Must be fast enough! Thread update_tracker may be called very frequently.
     // So for performance, add tracker as early as possible, and then call update_tracker<Existed>.
@@ -82,9 +94,13 @@ public:
     bool is_attach_query() { return _fragment_instance_id_stack.back() != TUniqueId(); }
 
     std::shared_ptr<MemTrackerLimiter> limiter_mem_tracker() {
+        if (!_init) init();
         return _limiter_tracker_stack.back();
     }
-    MemTrackerLimiter* limiter_mem_tracker_raw() { return _limiter_tracker_raw; }
+    MemTrackerLimiter* limiter_mem_tracker_raw() {
+        if (!_init) init();
+        return _limiter_tracker_raw;
+    }
 
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     void set_check_attach(bool check_attach) { _check_attach = check_attach; }
@@ -109,6 +125,8 @@ private:
     void exceeded(const std::string& failed_msg);
 
 private:
+    // is false: ExecEnv::GetInstance()->initialized() = false when thread local is initialized
+    bool _init = false;
     // Cache untracked mem, only update to _untracked_mems when switching mem tracker.
     // Frequent calls to unordered_map _untracked_mems[] in consume will degrade performance.
     int64_t _untracked_mem = 0;
@@ -117,7 +135,7 @@ private:
 
     // _limiter_tracker_stack[0] = orphan_mem_tracker
     std::vector<std::shared_ptr<MemTrackerLimiter>> _limiter_tracker_stack;
-    MemTrackerLimiter* _limiter_tracker_raw;
+    MemTrackerLimiter* _limiter_tracker_raw = nullptr;
     std::vector<NewMemTracker*> _consumer_tracker_stack;
 
     // If true, call memtracker try_consume, otherwise call consume.
@@ -131,26 +149,39 @@ private:
 };
 
 inline void ThreadMemTrackerMgr::init() {
+    DCHECK(_limiter_tracker_stack.size() == 0);
+    DCHECK(_limiter_tracker_raw == nullptr);
     DCHECK(_consumer_tracker_stack.empty());
-    // _limiter_tracker_stack[0] = orphan_mem_tracker
-    DCHECK(_limiter_tracker_stack.size() <= 1)
-            << "limiter_tracker_stack.size(): " << _limiter_tracker_stack.size();
+    init_impl();
+}
+
+inline void ThreadMemTrackerMgr::init_impl() {
 #ifdef BE_TEST
     if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
         std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
                 std::make_shared<MemTrackerLimiter>(-1, "Process");
-        std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
+        std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
                 std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
-        ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
+        std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
+                std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+        ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
     }
 #endif // BE_TEST
-    if (_limiter_tracker_stack.size() == 0) {
-        _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
-        _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
-        _task_id_stack.push_back("");
-        _fragment_instance_id_stack.push_back(TUniqueId());
-    }
+    _limiter_tracker_stack.push_back(ExecEnv::GetInstance()->orphan_mem_tracker());
+    _limiter_tracker_raw = ExecEnv::GetInstance()->orphan_mem_tracker_raw();
+    _task_id_stack.push_back("");
+    _fragment_instance_id_stack.push_back(TUniqueId());
     _check_limit = true;
+    _init = true;
+}
+
+inline void ThreadMemTrackerMgr::clear() {
+    flush_untracked_mem<false>();
+    std::vector<std::shared_ptr<MemTrackerLimiter>>().swap(_limiter_tracker_stack);
+    std::vector<NewMemTracker*>().swap(_consumer_tracker_stack);
+    std::vector<std::string>().swap(_task_id_stack);
+    std::vector<TUniqueId>().swap(_fragment_instance_id_stack);
+    init_impl();
 }
 
 inline void ThreadMemTrackerMgr::push_consumer_tracker(NewMemTracker* tracker) {
@@ -174,7 +205,7 @@ inline void ThreadMemTrackerMgr::consume(int64_t size) {
     // it will cause tracker->consumption to be temporarily less than 0.
     if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
          _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
-        !_stop_consume) {
+        !_stop_consume && ExecEnv::GetInstance()->initialized()) {
         if (_check_limit) {
             flush_untracked_mem<true>();
         } else {
@@ -188,6 +219,8 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     // Temporary memory may be allocated during the consumption of the mem tracker, which will lead to entering
     // the TCMalloc Hook again, so suspend consumption to avoid falling into an infinite loop.
     _stop_consume = true;
+    if (!_init) init();
+    DCHECK(_limiter_tracker_raw);
     old_untracked_mem = _untracked_mem;
     DCHECK(_limiter_tracker_raw);
     if (CheckLimit) {
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index b993308db8..2b4f9f19e6 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -25,7 +25,7 @@ DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, _ptr);
 
 ThreadContextPtr::ThreadContextPtr() {
     INIT_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
-    _init = true;
+    init = true;
 }
 
 AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
@@ -39,10 +39,12 @@ AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
 #else
     if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
         std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
-            std::make_shared<MemTrackerLimiter>(-1, "Process");
-        std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
+                std::make_shared<MemTrackerLimiter>(-1, "Process");
+        std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
                 std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
-        ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
+        std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
+                std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+        ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
     }
     thread_context()->attach_task(type, task_id, fragment_instance_id, ExecEnv::GetInstance()->orphan_mem_tracker());
 #endif // BE_TEST
@@ -61,10 +63,12 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
 #else
     if (ExecEnv::GetInstance()->new_process_mem_tracker() == nullptr) {
         std::shared_ptr<MemTrackerLimiter> process_mem_tracker =
-            std::make_shared<MemTrackerLimiter>(-1, "Process");
-        std::shared_ptr<MemTrackerLimiter> _orphan_mem_tracker =
+                std::make_shared<MemTrackerLimiter>(-1, "Process");
+        std::shared_ptr<MemTrackerLimiter> orphan_mem_tracker =
                 std::make_shared<MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
-        ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, _orphan_mem_tracker);
+        std::shared_ptr<MemTrackerLimiter> bthread_mem_tracker =
+                std::make_shared<MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+        ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker, bthread_mem_tracker);
     }
     thread_context()->attach_task(ThreadContext::TaskType::QUERY, "", TUniqueId(), ExecEnv::GetInstance()->orphan_mem_tracker());
 #endif // BE_TEST
@@ -92,35 +96,4 @@ AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
 #endif // USE_MEM_TRACKER
 }
 
-SwitchBthread::SwitchBthread() {
-#ifdef USE_MEM_TRACKER
-    _bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-    // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
-    if (_bthread_context == nullptr) {
-        // Create thread-local data on demand.
-        _bthread_context = new ThreadContext;
-        // set the data so that next time bthread_getspecific in the thread returns the data.
-        CHECK_EQ(0, bthread_setspecific(btls_key, _bthread_context));
-    } else {
-        DCHECK(_bthread_context->type() == ThreadContext::TaskType::UNKNOWN);
-        _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
-    }
-    _bthread_context->_thread_mem_tracker_mgr->init();
-    _bthread_context->set_type(ThreadContext::TaskType::BRPC);
-    bthread_context_key = btls_key;
-    bthread_context = _bthread_context;
-#endif
-}
-
-SwitchBthread::~SwitchBthread() {
-#ifdef USE_MEM_TRACKER
-    DCHECK(_bthread_context != nullptr);
-    _bthread_context->_thread_mem_tracker_mgr->flush_untracked_mem<false>();
-    _bthread_context->_thread_mem_tracker_mgr->init();
-    _bthread_context->set_type(ThreadContext::TaskType::UNKNOWN);
-    bthread_context = nullptr;
-    bthread_context_key = EMPTY_BTLS_KEY;
-#endif // USE_MEM_TRACKER
-}
-
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 421c59811b..b6143264a8 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -29,15 +29,17 @@
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "runtime/threadlocal.h"
 
+#ifdef USE_MEM_TRACKER
 // Add thread mem tracker consumer during query execution.
 #define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) \
     auto VARNAME_LINENUM(add_mem_consumer) = doris::AddThreadMemTrackerConsumer(mem_tracker)
-
 // Attach to task when thread starts
 #define SCOPED_ATTACH_TASK(arg1, ...) \
     auto VARNAME_LINENUM(attach_task) = AttachTask(arg1, ##__VA_ARGS__)
-
-#define SCOPED_SWITCH_BTHREAD_TLS() auto VARNAME_LINENUM(switch_bthread) = SwitchBthread()
+#else
+#define SCOPED_CONSUME_MEM_TRACKER(mem_tracker) (void)0
+#define SCOPED_ATTACH_TASK(arg1, ...) (void)0
+#endif
 
 namespace doris {
 
@@ -75,7 +77,7 @@ public:
     // Cannot add destructor `~ThreadContextPtr`, otherwise it will no longer be of type POD, the reason is as above.
 
     // TCMalloc hook is triggered during ThreadContext construction, which may lead to deadlock.
-    bool _init = false;
+    bool init = false;
 
     DECLARE_STATIC_THREAD_LOCAL(ThreadContext, _ptr);
 };
@@ -85,7 +87,7 @@ inline thread_local ThreadContextPtr thread_context_ptr;
 // To avoid performance problems caused by frequently calling `bthread_getspecific` to obtain bthread TLS
 // in tcmalloc hook, cache the key and value of bthread TLS in pthread TLS.
 inline thread_local ThreadContext* bthread_context;
-inline thread_local bthread_key_t bthread_context_key;
+inline thread_local bthread_t bthread_id;
 
 // The thread context saves some info about a working thread.
 // 2 required info:
@@ -116,18 +118,18 @@ public:
     }
 
     ~ThreadContext() {
-        // Restore to the memory state before _init=true to ensure accurate overall memory statistics.
+        // Restore to the memory state before init=true to ensure accurate overall memory statistics.
         // Thereby ensuring that the memory alloc size is not tracked during the initialization of the
-        // ThreadContext before `_init = true in ThreadContextPtr()`,
+        // ThreadContext before `init = true in ThreadContextPtr()`,
         // Equal to the size of the memory release that is not tracked during the destruction of the
-        // ThreadContext after `_init = false in ~ThreadContextPtr()`,
-        init();
-        thread_context_ptr._init = false;
+        // ThreadContext after `init = false in ~ThreadContextPtr()`,
+        if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->clear();
+        thread_context_ptr.init = false;
     }
 
     void init() {
         _type = TaskType::UNKNOWN;
-        _thread_mem_tracker_mgr->init();
+        if (ExecEnv::GetInstance()->initialized()) _thread_mem_tracker_mgr->init();
         _thread_id = get_thread_id();
     }
 
@@ -174,17 +176,42 @@ private:
     TUniqueId _fragment_instance_id;
 };
 
-static void update_bthread_context() {
-    if (btls_key != bthread_context_key) {
-        // pthread switch occurs, updating bthread_context and bthread_context_key cached in pthread tls.
-        bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-        bthread_context_key = btls_key;
+static void attach_bthread() {
+    bthread_id = bthread_self();
+    bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+    if (bthread_context == nullptr) {
+        // A new bthread starts, two scenarios:
+        // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
+        // 2. There are not enough reusable btls in btls pool.
+#ifndef BE_TEST
+        DCHECK(ExecEnv::GetInstance()->initialized());
+#endif
+        // Create thread-local data on demand.
+        bthread_context = new ThreadContext;
+        std::shared_ptr<MemTrackerLimiter> btls_tracker =
+                std::make_shared<MemTrackerLimiter>(-1, "Bthread:id=" + std::to_string(bthread_id),
+                                                    ExecEnv::GetInstance()->bthread_mem_tracker());
+        bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
+        // set the data so that next time bthread_getspecific in the thread returns the data.
+        CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+    } else {
+        // two scenarios:
+        // 1. A new bthread starts, but get a reuses btls.
+        // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
+        // So tracker call reset 0 like reuses btls.
+        DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
+        bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
     }
 }
 
 static ThreadContext* thread_context() {
-    if (btls_key != EMPTY_BTLS_KEY && bthread_context != nullptr) {
-        update_bthread_context();
+    if (bthread_self() != 0) {
+        if (bthread_self() != bthread_id) {
+            // A new bthread starts or pthread switch occurs.
+            thread_context_ptr.init = false;
+            attach_bthread();
+            thread_context_ptr.init = true;
+        }
         return bthread_context;
     } else {
         return thread_context_ptr._ptr;
@@ -222,18 +249,6 @@ public:
     ~AddThreadMemTrackerConsumer();
 };
 
-class SwitchBthread {
-public:
-    explicit SwitchBthread();
-
-    ~SwitchBthread();
-
-private:
-#ifdef USE_MEM_TRACKER
-    ThreadContext* _bthread_context;
-#endif
-};
-
 class StopCheckThreadMemTrackerLimit {
 public:
     explicit StopCheckThreadMemTrackerLimit() {
@@ -246,6 +261,7 @@ public:
 };
 
 // The following macros are used to fix the tracking accuracy of caches etc.
+#ifdef USE_MEM_TRACKER
 #define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() \
     auto VARNAME_LINENUM(stop_check_limit) = StopCheckThreadMemTrackerLimit()
 #define CONSUME_THREAD_MEM_TRACKER(size) \
@@ -258,9 +274,38 @@ public:
 #define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) \
     tracker->transfer_to(                               \
             size, doris::thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker_raw())
-#define RETURN_LIMIT_EXCEEDED(state, msg, ...)                                      \
-    return doris::thread_context()                                                  \
-            ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()                    \
-            ->mem_limit_exceeded(state, fmt::format("exec node:<{}>, {}", "", msg), \
-                                 ##__VA_ARGS__);
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...)                                              \
+    return doris::thread_context()                                                          \
+            ->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()                            \
+            ->mem_limit_exceeded(                                                           \
+                    state,                                                                  \
+                    fmt::format("exec node:<{}>, {}", "", msg),                             \
+                    ##__VA_ARGS__);
+// Mem Hook to consume thread mem tracker
+#define MEM_MALLOC_HOOK(size)                                                \
+    do {                                                                     \
+        if (doris::thread_context_ptr.init) {                                \
+            doris::thread_context()->_thread_mem_tracker_mgr->consume(size); \
+        } else {                                                             \
+            doris::ThreadMemTrackerMgr::consume_no_attach(size);             \
+        }                                                                    \
+    } while (0)
+#define MEM_FREE_HOOK(size)                                                   \
+    do {                                                                      \
+        if (doris::thread_context_ptr.init) {                                 \
+            doris::thread_context()->_thread_mem_tracker_mgr->consume(-size); \
+        } else {                                                              \
+            doris::ThreadMemTrackerMgr::consume_no_attach(-size);             \
+        }                                                                     \
+    } while (0)
+#else
+#define STOP_CHECK_THREAD_MEM_TRACKER_LIMIT() (void)0
+#define CONSUME_THREAD_MEM_TRACKER(size) (void)0
+#define RELEASE_THREAD_MEM_TRACKER(size) (void)0
+#define THREAD_MEM_TRACKER_TRANSFER_TO(size, tracker) (void)0
+#define THREAD_MEM_TRACKER_TRANSFER_FROM(size, tracker) (void)0
+#define RETURN_LIMIT_EXCEEDED(state, msg, ...) (void)0
+#define MEM_MALLOC_HOOK(size) (void)0
+#define MEM_FREE_HOOK(size) (void)0
+#endif
 } // namespace doris
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 2b9ee2fb3f..b7bc9a179b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -91,7 +91,6 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
                                             const PTransmitDataParams* request,
                                             PTransmitDataResult* response,
                                             google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -105,7 +104,6 @@ void PInternalServiceImpl<T>::transmit_data_by_http(google::protobuf::RpcControl
                                                  const PEmptyRequest* request,
                                                  PTransmitDataResult* response,
                                                  google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     PTransmitDataParams* request_raw = new PTransmitDataParams();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTransmitDataParams>(request_raw, done);
@@ -163,7 +161,6 @@ void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
                                                  const PTabletWriterOpenRequest* request,
                                                  PTabletWriterOpenResult* response,
                                                  google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
              << ", txn_id=" << request->txn_id();
     brpc::ClosureGuard closure_guard(done);
@@ -181,7 +178,6 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
                                                  const PExecPlanFragmentRequest* request,
                                                  PExecPlanFragmentResult* response,
                                                  google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
@@ -207,7 +203,6 @@ void PInternalServiceImpl<T>::exec_plan_fragment_start(google::protobuf::RpcCont
                                                     const PExecPlanFragmentStartRequest* request,
                                                     PExecPlanFragmentResult* result,
                                                     google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     auto st = _exec_env->fragment_mgr()->start_query_execution(request);
     st.to_protobuf(result->mutable_status());
@@ -218,7 +213,6 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
                                                    const PTabletWriterAddBatchRequest* request,
                                                    PTabletWriterAddBatchResult* response,
                                                    google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     _tablet_writer_add_batch(cntl_base, request, response, done);
 }
 
@@ -226,7 +220,6 @@ template <typename T>
 void PInternalServiceImpl<T>::tablet_writer_add_batch_by_http(
         google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
@@ -281,7 +274,6 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
                                                    const PTabletWriterCancelRequest* request,
                                                    PTabletWriterCancelResult* response,
                                                    google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id()
              << ", sender_id=" << request->sender_id();
     brpc::ClosureGuard closure_guard(done);
@@ -327,7 +319,6 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
                                                    const PCancelPlanFragmentRequest* request,
                                                    PCancelPlanFragmentResult* result,
                                                    google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId tid;
     tid.__set_hi(request->finst_id().hi());
@@ -352,7 +343,6 @@ template <typename T>
 void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_base,
                                          const PFetchDataRequest* request, PFetchDataResult* result,
                                          google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
@@ -362,7 +352,6 @@ template <typename T>
 void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller,
                                        const PProxyRequest* request, PProxyResult* response,
                                        google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     // PProxyRequest is defined in gensrc/proto/internal_service.proto
     // Currently it supports 2 kinds of requests:
@@ -425,7 +414,6 @@ void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* cont
                                            const PUpdateCacheRequest* request,
                                            PCacheResponse* response,
                                            google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->update(request, response);
 }
@@ -435,7 +423,6 @@ void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* contr
                                           const PFetchCacheRequest* request,
                                           PFetchCacheResult* result,
                                           google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->fetch(request, result);
 }
@@ -445,7 +432,6 @@ void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr
                                           const PClearCacheRequest* request,
                                           PCacheResponse* response,
                                           google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->clear(request, response);
 }
@@ -455,7 +441,6 @@ void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
                                            const ::doris::PMergeFilterRequest* request,
                                            ::doris::PMergeFilterResponse* response,
                                            ::google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
     Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
@@ -470,7 +455,6 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
                                            const ::doris::PPublishFilterRequest* request,
                                            ::doris::PPublishFilterResponse* response,
                                            ::google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
     UniqueId unique_id(request->query_id());
@@ -487,7 +471,6 @@ template <typename T>
 void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller,
                                         const PSendDataRequest* request, PSendDataResult* response,
                                         google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -511,7 +494,6 @@ template <typename T>
 void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller,
                                      const PCommitRequest* request, PCommitResult* response,
                                      google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -530,7 +512,6 @@ template <typename T>
 void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller,
                                        const PRollbackRequest* request, PRollbackResult* response,
                                        google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -550,7 +531,6 @@ void PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
                                                  const PConstantExprRequest* request,
                                                  PConstantExprResult* response,
                                                  google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
 
@@ -587,7 +567,6 @@ void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
                                           const PTransmitDataParams* request,
                                           PTransmitDataResult* response,
                                           google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_block<PTransmitDataParams>(request, cntl);
@@ -600,7 +579,6 @@ void PInternalServiceImpl<T>::transmit_block_by_http(google::protobuf::RpcContro
                                                   const PEmptyRequest* request,
                                                   PTransmitDataResult* response,
                                                   google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     PTransmitDataParams* request_raw = new PTransmitDataParams();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTransmitDataParams>(request_raw, done);
@@ -658,7 +636,6 @@ void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
                                                 const PCheckRPCChannelRequest* request,
                                                 PCheckRPCChannelResponse* response,
                                                 google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
     if (request->data().size() != request->size()) {
@@ -686,7 +663,6 @@ void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
                                                 const PResetRPCChannelRequest* request,
                                                 PResetRPCChannelResponse* response,
                                                 google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
     if (request->all()) {
@@ -721,7 +697,6 @@ void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
                                          const PHandShakeRequest* request,
                                          PHandShakeResponse* response,
                                          google::protobuf::Closure* done) {
-    SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     if (request->has_hello()) {
         response->set_hello(request->hello());
diff --git a/be/test/testutil/run_all_tests.cpp b/be/test/testutil/run_all_tests.cpp
index 842282e8c6..aa2d0a74cf 100644
--- a/be/test/testutil/run_all_tests.cpp
+++ b/be/test/testutil/run_all_tests.cpp
@@ -27,6 +27,15 @@
 #include "util/mem_info.h"
 
 int main(int argc, char** argv) {
+    std::shared_ptr<doris::MemTrackerLimiter> process_mem_tracker =
+            std::make_shared<doris::MemTrackerLimiter>(-1, "Process");
+    std::shared_ptr<doris::MemTrackerLimiter> orphan_mem_tracker =
+            std::make_shared<doris::MemTrackerLimiter>(-1, "Orphan", process_mem_tracker);
+    std::shared_ptr<doris::MemTrackerLimiter> bthread_mem_tracker =
+            std::make_shared<doris::MemTrackerLimiter>(-1, "Bthread", orphan_mem_tracker);
+    doris::ExecEnv::GetInstance()->set_global_mem_tracker(process_mem_tracker, orphan_mem_tracker,
+                                                          bthread_mem_tracker);
+    doris::thread_context()->_thread_mem_tracker_mgr->init();
     doris::StoragePageCache::create_global_cache(1 << 30, 10);
     doris::SegmentLoader::create_global_instance(1000);
     std::string conf = std::string(getenv("DORIS_HOME")) + "/conf/be.conf";


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org