You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/27 12:34:08 UTC

[incubator-doris] branch master updated: [feature-wip] (memory tracker) (step5) Fix track bthread, fix track vectorized query (#9145)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 26bc462e1c [feature-wip] (memory tracker) (step5) Fix track bthread, fix track vectorized query (#9145)
26bc462e1c is described below

commit 26bc462e1c46dd751de15d3b477d59015a948476
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Apr 27 20:34:02 2022 +0800

    [feature-wip] (memory tracker) (step5) Fix track bthread, fix track vectorized query (#9145)
    
    1. fix track bthread
    - bthread is the high-performance M:N threading library used by brpc. In Doris, each brpc server response runs on a single bthread, which may be scheduled onto different pthreads over its lifetime. Currently, MemTracker consumption relies on pthread-local variables (TLS).
    - Because a bthread can migrate between pthreads, switching the pthread-TLS MemTracker inside a brpc server response could leave the wrong MemTracker installed on a pthread. To fix this, the brpc server response now stores its MemTracker in bthread-local storage instead of pthread TLS.
    Ref: https://github.com/apache/incubator-brpc/blob/731730da85f6af5c25012b4c83ab5bb371320cf8/docs/en/server.md#bthread-local
    
    2. fix track vectorized query
    - Track memory allocated via mmap, which the vectorized execution engine currently uses in many places.
    - Refactored ThreadContext to avoid dependency conflicts and make it easier to debug.
    - Fix some bugs.
---
 be/src/common/utils.h                         |   2 +
 be/src/exec/exchange_node.cpp                 |   2 +-
 be/src/exec/olap_scanner.cpp                  |   2 +-
 be/src/exec/tablet_sink.cpp                   |   3 +-
 be/src/exec/tablet_sink.h                     |   4 +-
 be/src/olap/byte_buffer.cpp                   |  11 +-
 be/src/olap/lru_cache.cpp                     |   3 +-
 be/src/olap/out_stream.cpp                    |   2 +-
 be/src/olap/out_stream.h                      |   1 -
 be/src/olap/rowset/column_writer.cpp          |   3 +-
 be/src/olap/rowset/column_writer.h            |   1 -
 be/src/olap/rowset/segment_v2/segment.cpp     |   5 +-
 be/src/runtime/bufferpool/system_allocator.cc |   6 +
 be/src/runtime/disk_io_mgr.cc                 |   8 +-
 be/src/runtime/fold_constant_executor.cpp     |   2 +-
 be/src/runtime/fragment_mgr.cpp               |   3 +-
 be/src/runtime/mem_pool.cpp                   |   2 +-
 be/src/runtime/mem_tracker.cpp                |  20 ++-
 be/src/runtime/mem_tracker.h                  |   4 +
 be/src/runtime/memory/chunk_allocator.cpp     |  16 +--
 be/src/runtime/memory/system_allocator.cpp    |   5 +
 be/src/runtime/plan_fragment_executor.cpp     |   8 +-
 be/src/runtime/row_batch.cpp                  |   4 +-
 be/src/runtime/runtime_filter_mgr.cpp         |   4 +-
 be/src/runtime/sorted_run_merger.cc           |   2 +-
 be/src/runtime/tcmalloc_hook.h                |   4 +-
 be/src/runtime/thread_context.cpp             | 127 ++++++++++++++++++
 be/src/runtime/thread_context.h               | 181 +++++++++++---------------
 be/src/runtime/thread_mem_tracker_mgr.cpp     |  18 +--
 be/src/runtime/thread_mem_tracker_mgr.h       | 158 ++++++++++++++--------
 be/src/service/brpc.h                         |  29 +----
 be/src/service/{brpc.h => brpc_conflict.h}    |  18 +--
 be/src/service/internal_service.cpp           |  30 +++++
 be/src/util/bit_util.h                        |   1 -
 be/src/util/doris_metrics.cpp                 |   2 +
 be/src/util/doris_metrics.h                   |   2 +
 be/src/util/file_utils.cpp                    |   3 +
 be/src/vec/common/allocator.h                 |  16 ++-
 be/src/vec/exec/vexchange_node.cpp            |   2 +-
 39 files changed, 452 insertions(+), 262 deletions(-)

diff --git a/be/src/common/utils.h b/be/src/common/utils.h
index c0ca459423..91eb4427c9 100644
--- a/be/src/common/utils.h
+++ b/be/src/common/utils.h
@@ -21,7 +21,9 @@
 
 namespace doris {
 
+#ifndef ARRAY_SIZE
 #define ARRAY_SIZE(a) (sizeof(a)/sizeof((a)[0]))
+#endif
 
 struct AuthInfo {
     std::string user;
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 53473f90a2..f1c221b65c 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -80,6 +80,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {
 Status ExchangeNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     if (_is_merging) {
         RETURN_IF_ERROR(_sort_exec_exprs.open(state));
@@ -215,7 +216,6 @@ Status ExchangeNode::get_next_merging(RuntimeState* state, RowBatch* output_batc
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Exchange, while merging next."));
 
-    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(_stream_recvr->get_next(output_batch, eos));
     while ((_num_rows_skipped < _offset)) {
         _num_rows_skipped += output_batch->num_rows();
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 53f9af57a8..68a522236c 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -52,7 +52,7 @@ OlapScanner::OlapScanner(RuntimeState* runtime_state, OlapScanNode* parent, bool
           _version(-1),
           _mem_tracker(MemTracker::create_tracker(
                   tracker->limit(),
-                  tracker->label() + ":OlapScanner:" + thread_local_ctx.get()->thread_id_str(),
+                  tracker->label() + ":OlapScanner:" + tls_ctx()->thread_id_str(),
                   tracker)) {}
 
 Status OlapScanner::prepare(
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index e8b35c16d1..9928b7e26f 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -50,7 +50,7 @@ NodeChannel::NodeChannel(OlapTableSink* parent, IndexChannel* index_channel, int
         _tuple_data_buffer_ptr = &_tuple_data_buffer;
     }
     _node_channel_tracker =
-            MemTracker::create_tracker(-1, "NodeChannel" + thread_local_ctx.get()->thread_id_str());
+            MemTracker::create_tracker(-1, "NodeChannel" + tls_ctx()->thread_id_str());
 }
 
 NodeChannel::~NodeChannel() noexcept {
@@ -654,6 +654,7 @@ void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
 
 void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
                                   int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
     const auto& it = _tablets_by_channel.find(node_id);
     if (it == _tablets_by_channel.end()) {
         return;
diff --git a/be/src/exec/tablet_sink.h b/be/src/exec/tablet_sink.h
index 2fa75587e7..1a902e834b 100644
--- a/be/src/exec/tablet_sink.h
+++ b/be/src/exec/tablet_sink.h
@@ -33,6 +33,7 @@
 #include "exec/tablet_info.h"
 #include "gen_cpp/Types_types.h"
 #include "gen_cpp/internal_service.pb.h"
+#include "runtime/thread_context.h"
 #include "util/bitmap.h"
 #include "util/countdown_latch.h"
 #include "util/ref_count_closure.h"
@@ -325,6 +326,7 @@ public:
 
     void for_each_node_channel(
             const std::function<void(const std::shared_ptr<NodeChannel>&)>& func) {
+        SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
         for (auto& it : _node_channels) {
             func(it.second);
         }
@@ -365,7 +367,7 @@ private:
     std::unordered_map<int64_t, std::string> _failed_channels_msgs;
     Status _intolerable_failure_status = Status::OK();
 
-    std::shared_ptr<MemTracker> _index_channel_tracker; // TODO(zxy) use after
+    std::shared_ptr<MemTracker> _index_channel_tracker;
 };
 
 // Write data to Olap Table.
diff --git a/be/src/olap/byte_buffer.cpp b/be/src/olap/byte_buffer.cpp
index a3099e4e25..822b18cc50 100644
--- a/be/src/olap/byte_buffer.cpp
+++ b/be/src/olap/byte_buffer.cpp
@@ -20,6 +20,7 @@
 #include <sys/mman.h>
 
 #include "olap/utils.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -42,6 +43,8 @@ void StorageByteBuffer::BufDeleter::operator()(char* p) {
         if (0 != munmap(p, _mmap_length)) {
             LOG(FATAL) << "fail to munmap: mem=" << p << ", len=" << _mmap_length
                        << ", errno=" << Errno::no() << ", errno_str=" << Errno::str();
+        } else {
+            RELEASE_THREAD_LOCAL_MEM_TRACKER(_mmap_length);
         }
     } else {
         delete[] p;
@@ -93,10 +96,12 @@ StorageByteBuffer* StorageByteBuffer::reference_buffer(StorageByteBuffer* refere
 
 StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int prot, int flags,
                                            int fd, uint64_t offset) {
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
     char* memory = (char*)::mmap(start, length, prot, flags, fd, offset);
 
     if (MAP_FAILED == memory) {
         OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str());
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -108,6 +113,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(void* start, uint64_t length, int pro
     if (nullptr == buf) {
         deleter(memory);
         OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -128,10 +134,12 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset
 
     size_t length = handler->length();
     int fd = handler->fd();
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
     char* memory = (char*)::mmap(nullptr, length, prot, flags, fd, offset);
 
     if (MAP_FAILED == memory) {
         OLAP_LOG_WARNING("fail to mmap. [errno='%d' errno_str='%s']", Errno::no(), Errno::str());
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -143,6 +151,7 @@ StorageByteBuffer* StorageByteBuffer::mmap(FileHandler* handler, uint64_t offset
     if (nullptr == buf) {
         deleter(memory);
         OLAP_LOG_WARNING("fail to allocate StorageByteBuffer.");
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
 
@@ -173,7 +182,7 @@ Status StorageByteBuffer::put(uint64_t index, char src) {
 }
 
 Status StorageByteBuffer::put(const char* src, uint64_t src_size, uint64_t offset,
-                                  uint64_t length) {
+                              uint64_t length) {
     //没有足够的空间可以写
     if (length > remaining()) {
         return Status::OLAPInternalError(OLAP_ERR_BUFFER_OVERFLOW);
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 0f5ebaa395..1a045b24a3 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -475,8 +475,7 @@ Cache::Handle* ShardedLRUCache::insert(const CacheKey& key, void* value, size_t
                                        CachePriority priority) {
     // The memory of the parameter value should be recorded in the tls mem tracker,
     // transfer the memory ownership of the value to ShardedLRUCache::_mem_tracker.
-    thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
-                                                                                charge);
+    tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), charge);
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const uint32_t hash = _hash_slice(key);
     return _shards[_shard(hash)]->insert(key, hash, value, charge, deleter, priority);
diff --git a/be/src/olap/out_stream.cpp b/be/src/olap/out_stream.cpp
index 475d0ca98f..b5d19b841a 100644
--- a/be/src/olap/out_stream.cpp
+++ b/be/src/olap/out_stream.cpp
@@ -25,7 +25,7 @@
 namespace doris {
 
 OutStreamFactory::OutStreamFactory(CompressKind compress_kind, uint32_t stream_buffer_size)
-        : _compress_kind(compress_kind), _stream_buffer_size(stream_buffer_size) {
+        : _stream_buffer_size(stream_buffer_size) {
     switch (compress_kind) {
     case COMPRESS_NONE:
         _compressor = nullptr;
diff --git a/be/src/olap/out_stream.h b/be/src/olap/out_stream.h
index a67a6c5204..b1954ce6cf 100644
--- a/be/src/olap/out_stream.h
+++ b/be/src/olap/out_stream.h
@@ -141,7 +141,6 @@ public:
 
 private:
     std::map<StreamName, OutStream*> _streams; // All created streams
-    CompressKind _compress_kind;
     Compressor _compressor;
     uint32_t _stream_buffer_size;
 
diff --git a/be/src/olap/rowset/column_writer.cpp b/be/src/olap/rowset/column_writer.cpp
index d750db1ec1..6128ab4f6f 100644
--- a/be/src/olap/rowset/column_writer.cpp
+++ b/be/src/olap/rowset/column_writer.cpp
@@ -488,8 +488,7 @@ void ByteColumnWriter::record_position() {
 
 IntegerColumnWriter::IntegerColumnWriter(uint32_t column_id, uint32_t unique_column_id,
                                          OutStreamFactory* stream_factory, bool is_singed)
-        : _column_id(column_id),
-          _unique_column_id(unique_column_id),
+        : _unique_column_id(unique_column_id),
           _stream_factory(stream_factory),
           _writer(nullptr),
           _is_signed(is_singed) {}
diff --git a/be/src/olap/rowset/column_writer.h b/be/src/olap/rowset/column_writer.h
index 9fe2d60f2a..a40a3780ba 100644
--- a/be/src/olap/rowset/column_writer.h
+++ b/be/src/olap/rowset/column_writer.h
@@ -179,7 +179,6 @@ public:
     Status flush() { return _writer->flush(); }
 
 private:
-    uint32_t _column_id;
     uint32_t _unique_column_id;
     OutStreamFactory* _stream_factory;
     RunLengthIntegerWriter* _writer;
diff --git a/be/src/olap/rowset/segment_v2/segment.cpp b/be/src/olap/rowset/segment_v2/segment.cpp
index b06083e996..7970d0da5b 100644
--- a/be/src/olap/rowset/segment_v2/segment.cpp
+++ b/be/src/olap/rowset/segment_v2/segment.cpp
@@ -51,10 +51,9 @@ Segment::Segment(const FilePathDesc& path_desc, uint32_t segment_id,
                  const TabletSchema* tablet_schema)
         : _path_desc(path_desc), _segment_id(segment_id), _tablet_schema(tablet_schema) {
 #ifndef BE_TEST
-    _mem_tracker = MemTracker::create_virtual_tracker(
-            -1, "Segment", StorageEngine::instance()->tablet_mem_tracker());
+    _mem_tracker = StorageEngine::instance()->tablet_mem_tracker();
 #else
-    _mem_tracker = MemTracker::create_virtual_tracker(-1, "Segment");
+    _mem_tracker = MemTracker::get_process_tracker();
 #endif
 }
 
diff --git a/be/src/runtime/bufferpool/system_allocator.cc b/be/src/runtime/bufferpool/system_allocator.cc
index a2dfc394b1..3fa69e981e 100644
--- a/be/src/runtime/bufferpool/system_allocator.cc
+++ b/be/src/runtime/bufferpool/system_allocator.cc
@@ -22,6 +22,7 @@
 
 #include "common/config.h"
 #include "gutil/strings/substitute.h"
+#include "runtime/thread_context.h"
 #include "util/bit_util.h"
 #include "util/error_util.h"
 
@@ -75,9 +76,11 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) {
         // Map an extra huge page so we can fix up the alignment if needed.
         map_len += HUGE_PAGE_SIZE;
     }
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(map_len);
     uint8_t* mem = reinterpret_cast<uint8_t*>(
             mmap(nullptr, map_len, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE, -1, 0));
     if (mem == MAP_FAILED) {
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len);
         return Status::BufferAllocFailed("mmap failed");
     }
 
@@ -89,10 +92,12 @@ Status SystemAllocator::AllocateViaMMap(int64_t len, uint8_t** buffer_mem) {
         if (misalignment != 0) {
             uintptr_t fixup = HUGE_PAGE_SIZE - misalignment;
             munmap(mem, fixup);
+            RELEASE_THREAD_LOCAL_MEM_TRACKER(fixup);
             mem += fixup;
             map_len -= fixup;
         }
         munmap(mem + len, map_len - len);
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(map_len - len);
         DCHECK_EQ(reinterpret_cast<uintptr_t>(mem) % HUGE_PAGE_SIZE, 0) << mem;
         // Mark the buffer as a candidate for promotion to huge pages. The Linux Transparent
         // Huge Pages implementation will try to back the memory with a huge page if it is
@@ -142,6 +147,7 @@ Status SystemAllocator::AllocateViaMalloc(int64_t len, uint8_t** buffer_mem) {
 void SystemAllocator::Free(BufferPool::BufferHandle&& buffer) {
     if (config::mmap_buffers) {
         int rc = munmap(buffer.data(), buffer.len());
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(buffer.len());
         DCHECK_EQ(rc, 0) << "Unexpected munmap() error: " << errno;
     } else {
         bool use_huge_pages = buffer.len() % HUGE_PAGE_SIZE == 0 && config::madvise_huge_pages;
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index ebedb58057..1f65851809 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -220,7 +220,7 @@ void DiskIoMgr::BufferDescriptor::reset(RequestContext* reader, ScanRange* range
     _eosr = false;
     _status = Status::OK();
     // Consume in the tls mem tracker when the buffer is allocated.
-    _buffer_mem_tracker = thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get();
+    _buffer_mem_tracker = tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get();
 }
 
 void DiskIoMgr::BufferDescriptor::return_buffer() {
@@ -739,7 +739,7 @@ char* DiskIoMgr::get_free_buffer(int64_t* buffer_size) {
         buffer = new char[*buffer_size];
     } else {
         // This means the buffer's memory ownership is transferred from DiskIoMgr to tls tracker.
-        _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size);
+        _mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), *buffer_size);
         buffer = _free_buffers[idx].front();
         _free_buffers[idx].pop_front();
     }
@@ -767,7 +767,7 @@ void DiskIoMgr::gc_io_buffers(int64_t bytes_to_free) {
     // The deleted buffer is released in the tls mem tracker, the deleted buffer belongs to DiskIoMgr,
     // so the freed memory should be recorded in the DiskIoMgr mem tracker. So if the tls mem tracker
     // and the DiskIoMgr tracker are different, transfer memory ownership.
-    _mem_tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed);
+    _mem_tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), bytes_freed);
 }
 
 void DiskIoMgr::return_free_buffer(BufferDescriptor* desc) {
@@ -793,7 +793,7 @@ void DiskIoMgr::return_free_buffer(char* buffer, int64_t buffer_size, MemTracker
         // The deleted buffer is released in the tls mem tracker. When the buffer was allocated,
         // it was consumed in BufferDescriptor->buffer_mem_tracker, so if the tls mem tracker and
         // the tracker in the parameters are different, transfer memory ownership.
-        tracker->transfer_to(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size);
+        tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(), buffer_size);
     }
 }
 
diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp
index 4a51b91932..5cdfcb084e 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -44,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id;
 
 Status FoldConstantExecutor::fold_constant_expr(
         const TFoldConstantParams& params, PConstantExprResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const auto& expr_map = params.expr_map;
     auto expr_result_map = response->mutable_expr_result_map();
 
@@ -53,7 +54,6 @@ Status FoldConstantExecutor::fold_constant_expr(
     if (UNLIKELY(!status.ok())) {
         return status;
     }
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     for (const auto& m : expr_map) {
         PExprResultMap pexpr_result_map;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 34ac356909..d12bc85f26 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -467,8 +467,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
             .instance_id(exec_state->fragment_instance_id())
             .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
 #ifndef BE_TEST
-    SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state()->query_type(),
-                              print_id(exec_state->query_id()), exec_state->fragment_instance_id(),
+    SCOPED_ATTACH_TASK_THREAD(exec_state->executor()->runtime_state(),
                               exec_state->executor()->runtime_state()->instance_mem_tracker());
 #endif
     exec_state->execute();
diff --git a/be/src/runtime/mem_pool.cpp b/be/src/runtime/mem_pool.cpp
index a2b9bf3424..adc86f0e67 100644
--- a/be/src/runtime/mem_pool.cpp
+++ b/be/src/runtime/mem_pool.cpp
@@ -65,7 +65,7 @@ MemPool::MemPool()
           total_allocated_bytes_(0),
           total_reserved_bytes_(0),
           peak_allocated_bytes_(0),
-          _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get()) {}
+          _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get()) {}
 
 MemPool::ChunkInfo::ChunkInfo(const Chunk& chunk_) : chunk(chunk_), allocated_bytes(0) {
     DorisMetrics::instance()->memory_pool_bytes_total->increment(chunk.size);
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index 86f7c0370b..49f8862b0b 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -60,6 +60,19 @@ MemTracker* MemTracker::get_raw_process_tracker() {
     return raw_process_tracker;
 }
 
+// Track memory for all brpc server responses.
+static std::shared_ptr<MemTracker> brpc_server_tracker;
+static GoogleOnceType brpc_server_tracker_once = GOOGLE_ONCE_INIT;
+
+void MemTracker::create_brpc_server_tracker() {
+    brpc_server_tracker = MemTracker::create_tracker(-1, "Brpc", get_process_tracker(), MemTrackerLevel::OVERVIEW);
+}
+
+std::shared_ptr<MemTracker> MemTracker::get_brpc_server_tracker() {
+    GoogleOnceInit(&brpc_server_tracker_once, &MemTracker::create_brpc_server_tracker);
+    return brpc_server_tracker;
+}
+
 void MemTracker::list_process_trackers(std::vector<std::shared_ptr<MemTracker>>* trackers) {
     trackers->clear();
     std::deque<std::shared_ptr<MemTracker>> to_process;
@@ -88,7 +101,8 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const
                                                        const std::shared_ptr<MemTracker>& parent,
                                                        MemTrackerLevel level,
                                                        RuntimeProfile* profile) {
-    std::shared_ptr<MemTracker> reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker();
+    std::shared_ptr<MemTracker> reset_parent =
+            parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
     DCHECK(reset_parent);
 
     std::shared_ptr<MemTracker> tracker(
@@ -102,7 +116,8 @@ std::shared_ptr<MemTracker> MemTracker::create_tracker(int64_t byte_limit, const
 std::shared_ptr<MemTracker> MemTracker::create_virtual_tracker(
         int64_t byte_limit, const std::string& label, const std::shared_ptr<MemTracker>& parent,
         MemTrackerLevel level) {
-   std::shared_ptr<MemTracker> reset_parent = parent ? parent : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker();
+    std::shared_ptr<MemTracker> reset_parent =
+            parent ? parent : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker();
     DCHECK(reset_parent);
 
     std::shared_ptr<MemTracker> tracker(
@@ -121,6 +136,7 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label,
                        RuntimeProfile* profile)
         : _limit(byte_limit),
           _label(label),
+          // Not 100% sure the id is unique. This is generated because it is faster than converting to int after hash.
           _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
           _parent(parent),
           _level(level) {
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 3d0e3f7271..74a7b4bef6 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -97,6 +97,8 @@ public:
     // Gets a shared_ptr to the "process" tracker, creating it if necessary.
     static std::shared_ptr<MemTracker> get_process_tracker();
     static MemTracker* get_raw_process_tracker();
+    // Gets a shared_ptr to the "brpc server" tracker, creating it if necessary.
+    static std::shared_ptr<MemTracker> get_brpc_server_tracker();
 
     Status check_sys_mem_info(int64_t bytes) {
         if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
@@ -464,6 +466,8 @@ private:
 
     // Creates the process tracker.
     static void create_process_tracker();
+    // Creates the brpc server tracker.
+    static void create_brpc_server_tracker();
 
     // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit.
     int64_t _limit;
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 2d2a9a5c01..7f8259c034 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -17,6 +17,8 @@
 
 #include "runtime/memory/chunk_allocator.h"
 
+#include <sanitizer/asan_interface.h>
+
 #include <list>
 #include <mutex>
 
@@ -134,8 +136,7 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
 
 Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker, bool check_limits) {
     MemTracker* reset_tracker =
-            tracker ? tracker
-                    : thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get();
+            tracker ? tracker : tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get();
     // In advance, transfer the memory ownership of allocate from ChunkAllocator::tracker to the parameter tracker.
     // Next, if the allocate is successful, it will exit normally;
     // if the allocate fails, return this part of the memory to the parameter tracker.
@@ -178,7 +179,7 @@ Status ChunkAllocator::allocate(size_t size, Chunk* chunk, MemTracker* tracker,
         chunk->data = SystemAllocator::allocate(size);
         // The allocated chunk is consumed in the tls mem tracker, we want to consume in the ChunkAllocator tracker,
         // transfer memory ownership. TODO(zxy) replace with switch tls tracker
-        thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size);
+        tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(), size);
     }
     chunk_pool_system_alloc_count->increment(1);
     chunk_pool_system_alloc_cost_ns->increment(cost_ns);
@@ -208,9 +209,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) {
                 // it was consumed in the parameter tracker, so if the tls mem tracker and the parameter
                 // tracker are different, transfer memory ownership.
                 if (tracker)
-                    tracker->transfer_to(
-                            thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker().get(),
-                            chunk.size);
+                    tracker->transfer_to(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker().get(),
+                                         chunk.size);
             }
             chunk_pool_system_free_count->increment(1);
             chunk_pool_system_free_cost_ns->increment(cost_ns);
@@ -223,8 +223,8 @@ void ChunkAllocator::free(const Chunk& chunk, MemTracker* tracker) {
     if (tracker) {
         tracker->transfer_to(_mem_tracker.get(), chunk.size);
     } else {
-        thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(
-                _mem_tracker.get(), chunk.size);
+        tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->transfer_to(_mem_tracker.get(),
+                                                                       chunk.size);
     }
     _arenas[chunk.core_id]->push_free_chunk(chunk.data, chunk.size);
 }
diff --git a/be/src/runtime/memory/system_allocator.cpp b/be/src/runtime/memory/system_allocator.cpp
index 374cec5557..6ed5906f00 100644
--- a/be/src/runtime/memory/system_allocator.cpp
+++ b/be/src/runtime/memory/system_allocator.cpp
@@ -23,6 +23,7 @@
 
 #include "common/config.h"
 #include "common/logging.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -43,6 +44,8 @@ void SystemAllocator::free(uint8_t* ptr, size_t length) {
             char buf[64];
             LOG(ERROR) << "fail to free memory via munmap, errno=" << errno
                        << ", errmsg=" << strerror_r(errno, buf, 64);
+        } else {
+            RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         }
     } else {
         ::free(ptr);
@@ -63,12 +66,14 @@ uint8_t* SystemAllocator::allocate_via_malloc(size_t length) {
 }
 
 uint8_t* SystemAllocator::allocate_via_mmap(size_t length) {
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(length);
     auto ptr = (uint8_t*)mmap(nullptr, length, PROT_READ | PROT_WRITE, MAP_ANONYMOUS | MAP_PRIVATE,
                               -1, 0);
     if (ptr == MAP_FAILED) {
         char buf[64];
         LOG(ERROR) << "fail to allocate memory via mmap, errno=" << errno
                    << ", errmsg=" << strerror_r(errno, buf, 64);
+        RELEASE_THREAD_LOCAL_MEM_TRACKER(length);
         return nullptr;
     }
     return ptr;
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 6cdbce39d2..2745215bfb 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -89,9 +89,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
     _runtime_state->set_query_fragments_ctx(fragments_ctx);
 
     RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
-    SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), print_id(_runtime_state->query_id()),
-                              _runtime_state->fragment_instance_id(),
-                              _runtime_state->instance_mem_tracker());
+    SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker());
     _runtime_state->set_be_number(request.backend_num);
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
@@ -442,9 +440,7 @@ void PlanFragmentExecutor::_collect_node_statistics() {
 }
 
 void PlanFragmentExecutor::report_profile() {
-    SCOPED_ATTACH_TASK_THREAD(_runtime_state->query_type(), print_id(_runtime_state->query_id()),
-                              _runtime_state->fragment_instance_id(),
-                              _runtime_state->instance_mem_tracker());
+    SCOPED_ATTACH_TASK_THREAD(_runtime_state.get(), _runtime_state->instance_mem_tracker());
     VLOG_FILE << "report_profile(): instance_id=" << _runtime_state->fragment_instance_id();
     DCHECK(_report_status_cb);
 
diff --git a/be/src/runtime/row_batch.cpp b/be/src/runtime/row_batch.cpp
index 614f7888f1..06306df512 100644
--- a/be/src/runtime/row_batch.cpp
+++ b/be/src/runtime/row_batch.cpp
@@ -44,7 +44,7 @@ const int RowBatch::AT_CAPACITY_MEM_USAGE = 8 * 1024 * 1024;
 const int RowBatch::FIXED_LEN_BUFFER_LIMIT = AT_CAPACITY_MEM_USAGE / 2;
 
 RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity)
-        : _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()),
+        : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()), // (b)thread ctx tracker
           _has_in_flight_row(false),
           _num_rows(0),
           _num_uncommitted_rows(0),
@@ -70,7 +70,7 @@ RowBatch::RowBatch(const RowDescriptor& row_desc, int capacity)
 // to allocated string data in special mempool
 // (change via python script that runs over Data_types.cc)
 RowBatch::RowBatch(const RowDescriptor& row_desc, const PRowBatch& input_batch)
-        : _mem_tracker(thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()),
+        : _mem_tracker(tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()), // (b)thread ctx tracker
           _has_in_flight_row(false),
           _num_rows(input_batch.num_rows()),
           _num_uncommitted_rows(0),
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 4eade51b46..008e9d2e58 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -156,8 +156,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, _fragment_instance_id);
     cntVal->tracker = MemTracker::create_tracker(
-            -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id,
-            thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker());
+            -1, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id,
+            tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()); // NOTE(review): presumably the parent tracker
     _filter_map.emplace(filter_id, cntVal);
     return Status::OK();
 }
diff --git a/be/src/runtime/sorted_run_merger.cc b/be/src/runtime/sorted_run_merger.cc
index 11042a4d9d..5bf518178d 100644
--- a/be/src/runtime/sorted_run_merger.cc
+++ b/be/src/runtime/sorted_run_merger.cc
@@ -129,7 +129,7 @@ public:
         *done = false;
         _pull_task_thread =
         std::thread(&SortedRunMerger::ParallelBatchedRowSupplier::process_sorted_run_task,
-                    this, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker());
+                    this, tls_ctx()->_thread_mem_tracker_mgr->mem_tracker()); // new thread has its own TLS
 
         RETURN_IF_ERROR(next(nullptr, done));
         return Status::OK();
diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h
index 9ba55fde8e..548b886299 100644
--- a/be/src/runtime/tcmalloc_hook.h
+++ b/be/src/runtime/tcmalloc_hook.h
@@ -36,11 +36,11 @@
 //  destructor to control the behavior of consume can lead to unexpected behavior,
 //  like this: if (LIKELY(doris::start_thread_mem_tracker)) {
 void new_hook(const void* ptr, size_t size) {
-    doris::thread_local_ctx.get()->consume_mem(tc_nallocx(size, 0));
+    doris::tls_ctx()->consume_mem(tc_nallocx(size, 0)); // size tcmalloc actually allocates
 }
 
 void delete_hook(const void* ptr) {
-    doris::thread_local_ctx.get()->release_mem(tc_malloc_size(const_cast<void*>(ptr)));
+    doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr))); // allocated size of the freed block
 }
 
 void init_hook() {
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 871cd1ebf0..0b71101d97 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/thread_context.h"
 
+#include "runtime/runtime_state.h"
+#include "util/doris_metrics.h"
+
 namespace doris {
 
 DEFINE_STATIC_THREAD_LOCAL(ThreadContext, ThreadContextPtr, thread_local_ctx);
@@ -29,4 +32,136 @@ ThreadContext* ThreadContextPtr::get() {
     return thread_local_ctx;
 }
 
+// AttachTaskThread: attaches the current thread context (the bthread-local one if installed,
+// else the pthread-local one; see tls_ctx()) to a task and its mem tracker; the dtor detaches.
+AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id,
+                                   const TUniqueId& fragment_instance_id,
+                                   const std::shared_ptr<doris::MemTracker>& mem_tracker) {
+    DCHECK(!task_id.empty());
+    tls_ctx()->attach(type, task_id, fragment_instance_id, mem_tracker);
+}
+
+AttachTaskThread::AttachTaskThread(const ThreadContext::TaskType& type,
+                                   const std::shared_ptr<doris::MemTracker>& mem_tracker) {
+#ifndef BE_TEST
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(type, "", TUniqueId(), mem_tracker);
+}
+
+AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
+                                   const std::shared_ptr<doris::MemTracker>& mem_tracker) {
+#ifndef BE_TEST
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(query_to_task_type(query_type), "", TUniqueId(), mem_tracker);
+}
+
+AttachTaskThread::AttachTaskThread(const TQueryType::type& query_type,
+                                   const std::shared_ptr<doris::MemTracker>& mem_tracker,
+                                   const std::string& task_id,
+                                   const TUniqueId& fragment_instance_id) {
+#ifndef BE_TEST
+    DCHECK(!task_id.empty());
+    DCHECK(fragment_instance_id != TUniqueId());
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker);
+}
+
+AttachTaskThread::AttachTaskThread(const RuntimeState* runtime_state,
+                                   const std::shared_ptr<doris::MemTracker>& mem_tracker) {
+#ifndef BE_TEST
+    DCHECK(!print_id(runtime_state->query_id()).empty());
+    DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
+    DCHECK(mem_tracker);
+#endif
+    tls_ctx()->attach(query_to_task_type(runtime_state->query_type()),
+                      print_id(runtime_state->query_id()), runtime_state->fragment_instance_id(),
+                      mem_tracker);
+}
+
+AttachTaskThread::~AttachTaskThread() {
+    tls_ctx()->detach();
+    DorisMetrics::instance()->attach_task_thread_count->increment(1);
+}
+
+// Switches the current mem tracker to `mem_tracker` when config::memory_verbose_track is on;
+// the destructor restores the previous tracker id. With Existed = true the tracker must already
+// be registered with the thread's ThreadMemTrackerMgr (update_tracker<true> only DCHECKs this).
+template <bool Existed>
+SwitchThreadMemTracker<Existed>::SwitchThreadMemTracker(
+        const std::shared_ptr<doris::MemTracker>& mem_tracker, bool in_task) {
+    if (config::memory_verbose_track) {
+#ifndef BE_TEST
+        DCHECK(mem_tracker);
+        // The thread tracker must be switched after the attach task, otherwise switching
+        // in the main thread will cause the cached tracker not be cleaned up in time.
+        DCHECK(in_task == false || tls_ctx()->_thread_mem_tracker_mgr->is_attach_task());
+        _old_tracker_id =
+                tls_ctx()->_thread_mem_tracker_mgr->update_tracker<Existed>(mem_tracker);
+#endif
+#ifndef NDEBUG
+        tls_ctx()->_thread_mem_tracker_mgr->switch_count += 1;
+#endif
+    }
+}
+
+template <bool Existed>
+SwitchThreadMemTracker<Existed>::~SwitchThreadMemTracker() {
+    if (config::memory_verbose_track) {
+#ifndef NDEBUG
+        tls_ctx()->_thread_mem_tracker_mgr->switch_count -= 1;
+        DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
+#endif
+#ifndef BE_TEST
+        tls_ctx()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
+#endif
+    }
+}
+
+// Temporarily replaces the thread's consume-error callback; the dtor restores the previous one.
+SwitchThreadMemTrackerErrCallBack::SwitchThreadMemTrackerErrCallBack(
+        const std::string& action_type, bool cancel_work, ERRCALLBACK err_call_back_func) {
+    DCHECK(!action_type.empty());
+    _old_tracker_cb = tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(
+            action_type, cancel_work, err_call_back_func);
+}
+
+SwitchThreadMemTrackerErrCallBack::~SwitchThreadMemTrackerErrCallBack() {
+    tls_ctx()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
+    DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
+}
+
+// Makes tls_ctx() resolve to a bthread-local ThreadContext (created on first use) whose current
+// tracker is the brpc server tracker (see ThreadMemTrackerMgr::init_bthread).
+SwitchBthread::SwitchBthread() {
+    tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+    // First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
+    if (tls == nullptr) {
+        // Create thread-local data on demand. NOTE(review): ownership passes to btls_key; it is
+        // presumably freed by the destructor registered when the key was created -- verify.
+        tls = new ThreadContext;
+        tls->_thread_mem_tracker_mgr->init_bthread();
+        // set the data so that next time bthread_getspecific in the thread returns the data.
+        CHECK_EQ(0, bthread_setspecific(btls_key, tls));
+    } else {
+        // Flush first: init_bthread() -> init() clears the cached (untracked) consumption.
+        tls->_thread_mem_tracker_mgr->clear_untracked_mems();
+        tls->_thread_mem_tracker_mgr->init_bthread();
+    }
+}
+
+// Flushes the bthread context's cached (untracked) consumption into its trackers.
+SwitchBthread::~SwitchBthread() {
+    DCHECK(tls != nullptr);
+    tls->_thread_mem_tracker_mgr->clear_untracked_mems();
+#ifndef NDEBUG
+    DorisMetrics::instance()->switch_bthread_count->increment(1);
+#endif
+}
+
+template class SwitchThreadMemTracker<true>;
+template class SwitchThreadMemTracker<false>;
+
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 4d9d60078c..8ab72be634 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -17,15 +17,17 @@
 
 #pragma once
 
+#include <service/brpc_conflict.h>
+// After brpc_conflict.h
+#include <bthread/bthread.h>
+
 #include <string>
 #include <thread>
 
 #include "common/logging.h"
-#include "gen_cpp/Types_types.h"
-#include "runtime/runtime_state.h"
+#include "gen_cpp/PaloInternalService_types.h" // for TQueryType
 #include "runtime/thread_mem_tracker_mgr.h"
 #include "runtime/threadlocal.h"
-#include "util/doris_metrics.h"
 
 // Attach to task when thread starts
 #define SCOPED_ATTACH_TASK_THREAD(type, ...) \
@@ -34,34 +36,50 @@
 // may be different from the order of execution of instructions, which will cause the position of
 // the memory track to be unexpected.
 #define SCOPED_STOP_THREAD_LOCAL_MEM_TRACKER() \
-    auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(true)
+    auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(true)
 #define GLOBAL_STOP_THREAD_LOCAL_MEM_TRACKER() \
-    auto VARNAME_LINENUM(stop_tracker) = StopThreadMemTracker(false)
+    auto VARNAME_LINENUM(stop_tracker) = doris::StopThreadMemTracker(false)
 // Switch thread mem tracker during task execution.
 // After the non-query thread switches the mem tracker, if the thread will not switch the mem
 // tracker again in the short term, can consider manually clear_untracked_mems.
 // The query thread will automatically clear_untracked_mems when detach_task.
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, false)
+    auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<false>(mem_tracker, false)
 #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, true);
+    auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<false>(mem_tracker, true);
+#define SCOPED_SWITCH_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \
+    auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<true>(mem_tracker, false)
 #define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<true>(mem_tracker, true)
+    auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTracker<true>(mem_tracker, true)
 // After the non-query thread switches the mem tracker, if the thread will not switch the mem
 // tracker again in the short term, can consider manually clear_untracked_mems.
 // The query thread will automatically clear_untracked_mems when detach_task.
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTrackerEndClear(mem_tracker)
+    auto VARNAME_LINENUM(switch_tracker) = doris::SwitchThreadMemTrackerEndClear(mem_tracker)
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \
     auto VARNAME_LINENUM(witch_tracker_cb) =                            \
-            SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
+            doris::SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
+#define SCOPED_SWITCH_BTHREAD() auto VARNAME_LINENUM(switch_bthread) = doris::SwitchBthread()
+// Before switching the same tracker multiple times, add tracker as early as possible,
+// and then call `SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER` to reduce one map find.
+// For example, in the exec_node open phase `add tracker`, it is no longer necessary to determine
+// whether the tracker exists in TLS when switching the tracker in the exec_node get_next phase.
+// TODO(zxy): Adding the same tracker twice is currently prohibited, because it
+// 1. wastes time, and 2. `_untracked_mems[mem_tracker->id()] = 0` would lose tracked memory.
+// Some call sites may still need to add a tracker repeatedly; optimize later.
 #define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    thread_local_ctx.get()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
+    doris::tls_ctx()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
+#define CONSUME_THREAD_LOCAL_MEM_TRACKER(size) \
+    doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(size)
+#define RELEASE_THREAD_LOCAL_MEM_TRACKER(size) \
+    doris::tls_ctx()->_thread_mem_tracker_mgr->noncache_consume(-static_cast<int64_t>(size))
 
 namespace doris {
 
 class TUniqueId;
 
+extern bthread_key_t btls_key;
+
 // The thread context saves some info about a working thread.
 // 2 requried info:
 //   1. thread_id:   Current thread id, Auto generated.
@@ -80,19 +98,20 @@ public:
         STORAGE = 4
         // to be added ...
     };
-    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", "STORAGE"};
+    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION",
+                                                     "STORAGE"};
 
 public:
-    ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) {
+    ThreadContext() : _type(TaskType::UNKNOWN) {
         _thread_mem_tracker_mgr.reset(new ThreadMemTrackerMgr());
-        std::stringstream ss;
-        ss << _thread_id;
-        _thread_id_str = ss.str();
+        _thread_mem_tracker_mgr->init();
+        start_thread_mem_tracker = true;
+        _thread_id = get_thread_id();
     }
 
     void attach(const TaskType& type, const std::string& task_id,
                 const TUniqueId& fragment_instance_id,
-                const std::shared_ptr<MemTracker>& mem_tracker) {
+                const std::shared_ptr<doris::MemTracker>& mem_tracker) {
         DCHECK(_type == TaskType::UNKNOWN && _task_id == "")
                 << ",old tracker label: " << mem_tracker->label()
                 << ",new tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label();
@@ -111,10 +130,15 @@ public:
     }
 
     const std::string& task_id() const { return _task_id; }
-    const std::thread::id& thread_id() const { return _thread_id; }
-    const std::string& thread_id_str() const { return _thread_id_str; }
+    const std::string& thread_id_str() const { return _thread_id; }
     const TUniqueId& fragment_instance_id() const { return _fragment_instance_id; }
 
+    static std::string get_thread_id() {
+        std::stringstream ss;
+        ss << std::this_thread::get_id();
+        return ss.str();
+    }
+
     void consume_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(size);
@@ -136,8 +160,7 @@ public:
     std::unique_ptr<ThreadMemTrackerMgr> _thread_mem_tracker_mgr;
 
 private:
-    std::thread::id _thread_id;
-    std::string _thread_id_str;
+    std::string _thread_id;
     TaskType _type;
     std::string _task_id;
     TUniqueId _fragment_instance_id;
@@ -178,55 +201,33 @@ private:
 
 inline thread_local ThreadContextPtr thread_local_ctx;
 
+// Returns the caller's ThreadContext: the bthread-local one if it was installed with
+// bthread_setspecific (e.g. by SwitchBthread), otherwise this pthread's thread_local_ctx.
+// `inline` rather than `static`, so the header does not stamp out a private copy per TU.
+inline ThreadContext* tls_ctx() {
+    auto* tls = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+    if (tls != nullptr) return tls;
+    return thread_local_ctx.get();
+}
+
 class AttachTaskThread {
 public:
     explicit AttachTaskThread(const ThreadContext::TaskType& type, const std::string& task_id,
                               const TUniqueId& fragment_instance_id = TUniqueId(),
-                              const std::shared_ptr<MemTracker>& mem_tracker = nullptr) {
-        DCHECK(task_id != "");
-        thread_local_ctx.get()->attach(type, task_id, fragment_instance_id, mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& mem_tracker = nullptr);
 
     explicit AttachTaskThread(const ThreadContext::TaskType& type,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& mem_tracker);
 
     explicit AttachTaskThread(const TQueryType::type& query_type,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(),
-                                       mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& mem_tracker);
 
-    explicit AttachTaskThread(const TQueryType::type& query_type, const std::string& task_id,
-                              const TUniqueId& fragment_instance_id,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(task_id != "");
-        DCHECK(fragment_instance_id != TUniqueId());
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id,
-                                       fragment_instance_id, mem_tracker);
-    }
+    explicit AttachTaskThread(const TQueryType::type& query_type,
+                              const std::shared_ptr<doris::MemTracker>& mem_tracker,
+                              const std::string& task_id, const TUniqueId& fragment_instance_id);
 
     explicit AttachTaskThread(const RuntimeState* runtime_state,
-                              const std::shared_ptr<MemTracker>& mem_tracker) {
-#ifndef BE_TEST
-        DCHECK(print_id(runtime_state->query_id()) != "");
-        DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
-        DCHECK(mem_tracker);
-#endif
-        thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()),
-                                       print_id(runtime_state->query_id()),
-                                       runtime_state->fragment_instance_id(), mem_tracker);
-    }
+                              const std::shared_ptr<doris::MemTracker>& mem_tracker);
 
     const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) {
         switch (query_type) {
@@ -240,10 +241,7 @@ public:
         }
     }
 
-    ~AttachTaskThread() {
-        thread_local_ctx.get()->detach();
-        DorisMetrics::instance()->attach_task_thread_count->increment(1);
-    }
+    ~AttachTaskThread();
 };
 
 class StopThreadMemTracker {
@@ -263,36 +261,10 @@ private:
 template <bool Existed>
 class SwitchThreadMemTracker {
 public:
-    explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& mem_tracker,
-                                    bool in_task = true) {
-        if (config::memory_verbose_track) {
-#ifndef BE_TEST
-            DCHECK(mem_tracker);
-            // The thread tracker must be switched after the attach task, otherwise switching
-            // in the main thread will cause the cached tracker not be cleaned up in time.
-            DCHECK(in_task == false ||
-                   thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
-            if (Existed) {
-                _old_tracker_id =
-                        thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<true>(
-                                mem_tracker);
-            } else {
-                _old_tracker_id =
-                        thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<false>(
-                                mem_tracker);
-            }
-#endif
-        }
-    }
+    explicit SwitchThreadMemTracker(const std::shared_ptr<doris::MemTracker>& mem_tracker,
+                                    bool in_task = true);
 
-    ~SwitchThreadMemTracker() {
-        if (config::memory_verbose_track) {
-#ifndef BE_TEST
-            thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
-            DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
-#endif
-        }
-    }
+    ~SwitchThreadMemTracker();
 
 protected:
     int64_t _old_tracker_id = 0;
@@ -300,11 +272,11 @@ protected:
 
 class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> {
 public:
-    explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<MemTracker>& mem_tracker)
+    explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<doris::MemTracker>& mem_tracker)
             : SwitchThreadMemTracker<false>(mem_tracker, false) {}
 
     ~SwitchThreadMemTrackerEndClear() {
-        thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
+        tls_ctx()->_thread_mem_tracker_mgr->clear_untracked_mems();
     }
 };
 
@@ -312,19 +284,24 @@ class SwitchThreadMemTrackerErrCallBack {
 public:
     explicit SwitchThreadMemTrackerErrCallBack(const std::string& action_type,
                                                bool cancel_work = true,
-                                               ERRCALLBACK err_call_back_func = nullptr) {
-        DCHECK(action_type != std::string());
-        _old_tracker_cb = thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(
-                action_type, cancel_work, err_call_back_func);
-    }
+                                               ERRCALLBACK err_call_back_func = nullptr);
 
-    ~SwitchThreadMemTrackerErrCallBack() {
-        thread_local_ctx.get()->_thread_mem_tracker_mgr->update_consume_err_cb(_old_tracker_cb);
-        DorisMetrics::instance()->switch_thread_mem_tracker_err_cb_count->increment(1);
-    }
+    ~SwitchThreadMemTrackerErrCallBack();
 
 private:
     ConsumeErrCallBackInfo _old_tracker_cb;
 };
 
+// Scoped switch to a bthread-local ThreadContext (created on first use) whose current tracker
+// is the brpc server tracker; the destructor flushes its untracked memory.
+class SwitchBthread {
+public:
+    explicit SwitchBthread();
+
+    ~SwitchBthread();
+
+private:
+    ThreadContext* tls = nullptr;
+};
+
 } // namespace doris
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp
index 06fd521faf..e55a4620f0 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -17,6 +17,8 @@
 
 #include "runtime/thread_mem_tracker_mgr.h"
 
+#include "runtime/exec_env.h"
+#include "runtime/fragment_mgr.h" // for fragment_mgr()->cancel in exceeded_cancel_task
 #include "runtime/mem_tracker_task_pool.h"
 #include "service/backend_options.h"
 
@@ -25,6 +27,7 @@ namespace doris {
 void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::string& task_id,
                                       const TUniqueId& fragment_instance_id,
                                       const std::shared_ptr<MemTracker>& mem_tracker) {
+    DCHECK(switch_count == 0) << print_debug_string(); // no SwitchThreadMemTracker scope may be live
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
     _consume_err_cb.cancel_msg = cancel_msg;
@@ -44,26 +47,15 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::
 }
 
 void ThreadMemTrackerMgr::detach_task() {
+    DCHECK(switch_count == 0) << print_debug_string(); // no SwitchThreadMemTracker scope may be live
     _task_id = "";
     _fragment_instance_id = TUniqueId();
     _consume_err_cb.init();
     clear_untracked_mems();
-    _tracker_id = 0;
-    // The following memory changes for the two map operations of _untracked_mems and _mem_trackers
-    // will be re-recorded in _untracked_mem.
-    _untracked_mems.clear();
-    _untracked_mems[0] = 0;
-    _mem_trackers.clear();
-    _mem_trackers[0] = MemTracker::get_process_tracker();
-    _mem_tracker_labels.clear();
-    _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
+    init(); // back to only the process tracker; untracked mems were flushed just above
 }
 
 void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
-    _temp_task_mem_tracker =
-            ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
-                    _task_id);
-    DCHECK(_temp_task_mem_tracker);
     if (_fragment_instance_id != TUniqueId()) {
         ExecEnv::GetInstance()->fragment_mgr()->cancel(
                 _fragment_instance_id, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index 4ca2adba3e..404837a73a 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -20,8 +20,6 @@
 #include <fmt/format.h>
 #include <parallel_hashmap/phmap.h>
 
-#include "runtime/exec_env.h"
-#include "runtime/fragment_mgr.h"
 #include "runtime/mem_tracker.h"
 
 namespace doris {
@@ -61,33 +59,20 @@ inline thread_local bool start_thread_mem_tracker = false;
 // need to manually call cosume after stop_mem_tracker, and then start_mem_tracker.
 class ThreadMemTrackerMgr {
 public:
-    ThreadMemTrackerMgr() {
-        _mem_trackers[0] = MemTracker::get_process_tracker();
-        _untracked_mems[0] = 0;
-        _tracker_id = 0;
-        _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
-        start_thread_mem_tracker = true;
-    }
+    ThreadMemTrackerMgr() = default;
+
     ~ThreadMemTrackerMgr() {
         clear_untracked_mems();
         start_thread_mem_tracker = false;
     }
 
-    void clear_untracked_mems() {
-        for (const auto& untracked_mem : _untracked_mems) {
-            if (untracked_mem.second != 0) {
-                DCHECK(_mem_trackers[untracked_mem.first])
-                        << ", label: " << _mem_tracker_labels[untracked_mem.first];
-                if (_mem_trackers[untracked_mem.first]) {
-                    _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
-                } else {
-                    MemTracker::get_process_tracker()->consume(untracked_mem.second);
-                }
-            }
-        }
-        mem_tracker()->consume(_untracked_mem);
-        _untracked_mem = 0;
-    }
+    // After thread initialization, calling `init` again must call `clear_untracked_mems` first
+    // to avoid memory tracking loss.
+    void init();
+
+    void init_bthread();
+
+    void clear_untracked_mems();
 
     // After attach, the current thread TCMalloc Hook starts to consume/release task mem_tracker
     void attach_task(const std::string& cancel_msg, const std::string& task_id,
@@ -96,21 +81,18 @@ public:
 
     void detach_task();
 
-    // Must be fast enough!
-    // Thread update_tracker may be called very frequently, adding a memory copy will be slow.
+    // Must be fast enough! Thread update_tracker may be called very frequently.
+    // So for performance, add tracker as early as possible, and then call update_tracker<Existed>.
     template <bool Existed>
     int64_t update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
     void update_tracker_id(int64_t tracker_id);
 
-    void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
-        _mem_trackers[mem_tracker->id()] = mem_tracker;
-        DCHECK(_mem_trackers[mem_tracker->id()]);
-        _untracked_mems[mem_tracker->id()] = 0;
-        _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
-    }
+    // Before switching the same tracker multiple times, add tracker as early as possible,
+    // update_tracker<true> can reduce one map find.
+    void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
 
-    ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg,
-                                                        bool cancel_task, ERRCALLBACK cb_func) {
+    ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg, bool cancel_task,
+                                                 ERRCALLBACK cb_func) {
         _temp_consume_err_cb = _consume_err_cb;
         _consume_err_cb.cancel_msg = cancel_msg;
         _consume_err_cb.cancel_task = cancel_task;
@@ -127,17 +109,34 @@ public:
     // must increase the control to avoid entering infinite recursion, otherwise it may cause crash or stuck,
     void cache_consume(int64_t size);
 
-    void noncache_consume();
+    void noncache_consume(int64_t size);
 
     bool is_attach_task() { return _task_id != ""; }
 
-    std::shared_ptr<MemTracker> mem_tracker() {
-        DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
-        if (_mem_trackers[_tracker_id]) {
-            return _mem_trackers[_tracker_id];
-        } else {
-            return MemTracker::get_process_tracker();
+    std::shared_ptr<MemTracker> mem_tracker();
+
+    int64_t switch_count = 0; // live SwitchThreadMemTracker scopes (maintained in debug builds)
+
+    std::string print_debug_string() {
+        fmt::memory_buffer mem_trackers_buf;
+        for (const auto& [key, value] : _mem_trackers) {
+            fmt::format_to(mem_trackers_buf, "{}_{},", key, value ? value->log_usage(1) : "nullptr");
         }
+        fmt::memory_buffer untracked_mems_buf;
+        for (const auto& [key, value] : _untracked_mems) {
+            fmt::format_to(untracked_mems_buf, "{}_{},", std::to_string(key),
+                           std::to_string(value));
+        }
+        fmt::memory_buffer mem_tracker_labels_buf;
+        for (const auto& [key, value] : _mem_tracker_labels) {
+            fmt::format_to(mem_tracker_labels_buf, "{}_{},", std::to_string(key), value);
+        }
+        return fmt::format(
+                "ThreadMemTrackerMgr debug string, _tracker_id:{}, _untracked_mem:{}, _task_id:{}, "
+                "_mem_trackers:<{}>, _untracked_mems:<{}>, _mem_tracker_labels:<{}>",
+                std::to_string(_tracker_id), std::to_string(_untracked_mem), _task_id,
+                fmt::to_string(mem_trackers_buf), fmt::to_string(untracked_mems_buf),
+                fmt::to_string(mem_tracker_labels_buf));
     }
 
 private:
@@ -175,39 +174,71 @@ private:
     ConsumeErrCallBackInfo _consume_err_cb;
 };
 
+inline void ThreadMemTrackerMgr::init() {
+    _tracker_id = 0;
+    _mem_trackers.clear();
+    _mem_trackers[0] = MemTracker::get_process_tracker();
+    _untracked_mems.clear();
+    _untracked_mems[0] = 0;
+    _mem_tracker_labels.clear();
+    _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
+}
+
+inline void ThreadMemTrackerMgr::init_bthread() {
+    init();
+    _tracker_id = MemTracker::get_brpc_server_tracker()->id();
+    _mem_trackers[_tracker_id] = MemTracker::get_brpc_server_tracker();
+    _untracked_mems[_tracker_id] = 0;
+    _mem_tracker_labels[_tracker_id] = MemTracker::get_brpc_server_tracker()->label();
+}
+
+inline void ThreadMemTrackerMgr::clear_untracked_mems() {
+    for (const auto& [id, untracked] : _untracked_mems) {
+        if (untracked == 0) continue;
+        auto it = _mem_trackers.find(id);
+        const bool found = it != _mem_trackers.end() && it->second;
+        DCHECK(found) << print_debug_string();
+        // Fall back to the process tracker instead of dereferencing a null tracker in release.
+        (found ? it->second : MemTracker::get_process_tracker())->consume(untracked);
+    }
+    mem_tracker()->consume(_untracked_mem);
+    _untracked_mem = 0;
+}
+
 template <bool Existed>
 inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
-    DCHECK(mem_tracker);
+    DCHECK(mem_tracker) << print_debug_string();
     _temp_tracker_id = mem_tracker->id();
     if (_temp_tracker_id == _tracker_id) {
         return _tracker_id;
     }
     if (Existed) {
-        DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end());
+        DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end()) << print_debug_string();
     } else {
+        // If the tracker has already been added, avoid `_untracked_mems[x] = 0;` again causing the memory track to be lost.
         if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) {
             _mem_trackers[_temp_tracker_id] = mem_tracker;
-            DCHECK(_mem_trackers[_temp_tracker_id]);
             _untracked_mems[_temp_tracker_id] = 0;
             _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
         }
     }
 
+    DCHECK(_mem_trackers.count(_tracker_id) && _mem_trackers[_tracker_id]) << print_debug_string();
     _untracked_mems[_tracker_id] += _untracked_mem;
     _untracked_mem = 0;
     std::swap(_tracker_id, _temp_tracker_id);
-    DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
+    DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
     return _temp_tracker_id; // old tracker_id
 }
 
 inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) {
+    DCHECK(switch_count >= 0) << print_debug_string();
     if (tracker_id != _tracker_id) {
         _untracked_mems[_tracker_id] += _untracked_mem;
         _untracked_mem = 0;
         _tracker_id = tracker_id;
-        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end())
-                << ", label: " << _mem_tracker_labels[_tracker_id];
-        DCHECK(_mem_trackers[_tracker_id]);
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
+        DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
     }
 }
 
@@ -218,7 +249,7 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
     // it will cause tracker->consumption to be temporarily less than 0.
     if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
-        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
         // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again, infinite recursion.
         // Needs to ensure that all memory allocated in mem_tracker.consume/try_consume is freed in time to avoid tracking misses.
         start_thread_mem_tracker = false;
@@ -227,21 +258,36 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
             _untracked_mem += _untracked_mems[_tracker_id];
             _untracked_mems[_tracker_id] = 0;
         }
-        noncache_consume();
+        noncache_consume(_untracked_mem);
+        _untracked_mem = 0;
         start_thread_mem_tracker = true;
     }
 }
 
-inline void ThreadMemTrackerMgr::noncache_consume() {
-    DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
-    Status st = mem_tracker()->try_consume(_untracked_mem);
+inline void ThreadMemTrackerMgr::noncache_consume(int64_t size) {
+    Status st = mem_tracker()->try_consume(size);
     if (!st) {
         // The memory has been allocated, so when TryConsume fails, need to continue to complete
         // the consume to ensure the accuracy of the statistics.
-        mem_tracker()->consume(_untracked_mem);
-        exceeded(_untracked_mem, st);
+        mem_tracker()->consume(size);
+        exceeded(size, st);
     }
-    _untracked_mem = 0;
+}
+
+inline void ThreadMemTrackerMgr::add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
+    DCHECK(_mem_trackers.find(mem_tracker->id()) == _mem_trackers.end()) << print_debug_string();
+    _mem_trackers[mem_tracker->id()] = mem_tracker;
+    DCHECK(_mem_trackers[mem_tracker->id()]) << print_debug_string();
+    _untracked_mems[mem_tracker->id()] = 0;
+    _mem_tracker_labels[mem_tracker->id()] = mem_tracker->label();
+}
+
+inline std::shared_ptr<MemTracker> ThreadMemTrackerMgr::mem_tracker() {
+    // Whether the key _tracker_id exists in _mem_trackers.
+    DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end()) << print_debug_string();
+    // If the key _tracker_id exists in _mem_trackers, check whether the value is null.
+    DCHECK(_mem_trackers[_tracker_id]) << print_debug_string();
+    return _mem_trackers[_tracker_id];
 }
 
 } // namespace doris
diff --git a/be/src/service/brpc.h b/be/src/service/brpc.h
index 031a9d6697..6e1b348ac7 100644
--- a/be/src/service/brpc.h
+++ b/be/src/service/brpc.h
@@ -17,33 +17,10 @@
 
 #pragma once
 
-// This file is used to fixed macro conflict between butil and gutil
 // all header need by brpc is contain in this file.
-// include this file instead of include <brpc/xxx.h>
-// and this file must put the first include in source file
+// include this file instead of include <brpc/xxx.h>.
 
-#include "gutil/macros.h"
-// Macros in the guti/macros.h, use butil's define
-#ifdef DISALLOW_IMPLICIT_CONSTRUCTORS
-#undef DISALLOW_IMPLICIT_CONSTRUCTORS
-#endif
-
-#ifdef arraysize
-#undef arraysize
-#endif
-
-#undef OVERRIDE
-#undef FINAL
-
-// use be/src/gutil/integral_types.h override butil/basictypes.h
-#include "gutil/integral_types.h"
-#ifdef BASE_INTEGRAL_TYPES_H_
-#define BUTIL_BASICTYPES_H_
-#endif
-
-#ifdef DEBUG_MODE
-#undef DEBUG_MODE
-#endif
+#include <service/brpc_conflict.h>
 
 #include <brpc/channel.h>
 #include <brpc/closure_guard.h>
@@ -51,6 +28,8 @@
 #include <brpc/protocol.h>
 #include <brpc/reloadable_flags.h>
 #include <brpc/server.h>
+#include <bthread/bthread.h>
+#include <bthread/types.h>
 #include <butil/containers/flat_map.h>
 #include <butil/containers/flat_map_inl.h>
 #include <butil/endpoint.h>
diff --git a/be/src/service/brpc.h b/be/src/service/brpc_conflict.h
similarity index 75%
copy from be/src/service/brpc.h
copy to be/src/service/brpc_conflict.h
index 031a9d6697..35ef1b815c 100644
--- a/be/src/service/brpc.h
+++ b/be/src/service/brpc_conflict.h
@@ -18,8 +18,6 @@
 #pragma once
 
 // This file is used to fixed macro conflict between butil and gutil
-// all header need by brpc is contain in this file.
-// include this file instead of include <brpc/xxx.h>
 // and this file must put the first include in source file
 
 #include "gutil/macros.h"
@@ -32,6 +30,10 @@
 #undef arraysize
 #endif
 
+#ifdef ARRAY_SIZE
+#undef ARRAY_SIZE
+#endif
+
 #undef OVERRIDE
 #undef FINAL
 
@@ -44,15 +46,3 @@
 #ifdef DEBUG_MODE
 #undef DEBUG_MODE
 #endif
-
-#include <brpc/channel.h>
-#include <brpc/closure_guard.h>
-#include <brpc/controller.h>
-#include <brpc/protocol.h>
-#include <brpc/reloadable_flags.h>
-#include <brpc/server.h>
-#include <butil/containers/flat_map.h>
-#include <butil/containers/flat_map_inl.h>
-#include <butil/endpoint.h>
-#include <butil/fd_utility.h>
-#include <butil/macros.h>
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 4cb6b8f7ee..5080b1c218 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -29,6 +29,7 @@
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/routine_load/routine_load_task_executor.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "service/brpc.h"
 #include "util/brpc_client_cache.h"
 #include "util/md5.h"
@@ -42,16 +43,24 @@ namespace doris {
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(add_batch_task_queue_size, MetricUnit::NOUNIT);
 
+bthread_key_t btls_key;
+
+static void thread_context_deleter(void* d) {
+    delete static_cast<ThreadContext*>(d);
+}
+
 template <typename T>
 PInternalServiceImpl<T>::PInternalServiceImpl(ExecEnv* exec_env)
         : _exec_env(exec_env), _tablet_worker_pool(config::number_tablet_writer_threads, 10240) {
     REGISTER_HOOK_METRIC(add_batch_task_queue_size,
                          [this]() { return _tablet_worker_pool.get_queue_size(); });
+    CHECK_EQ(0, bthread_key_create(&btls_key, thread_context_deleter));
 }
 
 template <typename T>
 PInternalServiceImpl<T>::~PInternalServiceImpl() {
     DEREGISTER_HOOK_METRIC(add_batch_task_queue_size);
+    CHECK_EQ(0, bthread_key_delete(btls_key));
 }
 
 template <typename T>
@@ -59,6 +68,7 @@ void PInternalServiceImpl<T>::transmit_data(google::protobuf::RpcController* cnt
                                             const PTransmitDataParams* request,
                                             PTransmitDataResult* response,
                                             google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -84,6 +94,7 @@ void PInternalServiceImpl<T>::tablet_writer_open(google::protobuf::RpcController
                                                  const PTabletWriterOpenRequest* request,
                                                  PTabletWriterOpenResult* response,
                                                  google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer open, id=" << request->id() << ", index_id=" << request->index_id()
              << ", txn_id=" << request->txn_id();
     brpc::ClosureGuard closure_guard(done);
@@ -101,6 +112,7 @@ void PInternalServiceImpl<T>::exec_plan_fragment(google::protobuf::RpcController
                                                  const PExecPlanFragmentRequest* request,
                                                  PExecPlanFragmentResult* response,
                                                  google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto st = Status::OK();
     bool compact = request->has_compact() ? request->compact() : false;
@@ -116,6 +128,7 @@ void PInternalServiceImpl<T>::tablet_writer_add_batch(google::protobuf::RpcContr
                                                       const PTabletWriterAddBatchRequest* request,
                                                       PTabletWriterAddBatchResult* response,
                                                       google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer add batch, id=" << request->id()
              << ", index_id=" << request->index_id() << ", sender_id=" << request->sender_id()
              << ", current_queued_size=" << _tablet_worker_pool.get_queue_size();
@@ -150,6 +163,7 @@ void PInternalServiceImpl<T>::tablet_writer_cancel(google::protobuf::RpcControll
                                                    const PTabletWriterCancelRequest* request,
                                                    PTabletWriterCancelResult* response,
                                                    google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_RPC << "tablet writer cancel, id=" << request->id() << ", index_id=" << request->index_id()
              << ", sender_id=" << request->sender_id();
     brpc::ClosureGuard closure_guard(done);
@@ -177,6 +191,7 @@ void PInternalServiceImpl<T>::cancel_plan_fragment(google::protobuf::RpcControll
                                                    const PCancelPlanFragmentRequest* request,
                                                    PCancelPlanFragmentResult* result,
                                                    google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId tid;
     tid.__set_hi(request->finst_id().hi());
@@ -201,6 +216,7 @@ template <typename T>
 void PInternalServiceImpl<T>::fetch_data(google::protobuf::RpcController* cntl_base,
                                          const PFetchDataRequest* request, PFetchDataResult* result,
                                          google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     GetResultBatchCtx* ctx = new GetResultBatchCtx(cntl, result, done);
     _exec_env->result_mgr()->fetch_data(request->finst_id(), ctx);
@@ -210,6 +226,7 @@ template <typename T>
 void PInternalServiceImpl<T>::get_info(google::protobuf::RpcController* controller,
                                        const PProxyRequest* request, PProxyResult* response,
                                        google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     // PProxyRequest is defined in gensrc/proto/internal_service.proto
     // Currently it supports 2 kinds of requests:
@@ -272,6 +289,7 @@ void PInternalServiceImpl<T>::update_cache(google::protobuf::RpcController* cont
                                            const PUpdateCacheRequest* request,
                                            PCacheResponse* response,
                                            google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->update(request, response);
 }
@@ -281,6 +299,7 @@ void PInternalServiceImpl<T>::fetch_cache(google::protobuf::RpcController* contr
                                           const PFetchCacheRequest* request,
                                           PFetchCacheResult* result,
                                           google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->fetch(request, result);
 }
@@ -290,6 +309,7 @@ void PInternalServiceImpl<T>::clear_cache(google::protobuf::RpcController* contr
                                           const PClearCacheRequest* request,
                                           PCacheResponse* response,
                                           google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     _exec_env->result_cache()->clear(request, response);
 }
@@ -299,6 +319,7 @@ void PInternalServiceImpl<T>::merge_filter(::google::protobuf::RpcController* co
                                            const ::doris::PMergeFilterRequest* request,
                                            ::doris::PMergeFilterResponse* response,
                                            ::google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto buf = static_cast<brpc::Controller*>(controller)->request_attachment();
     Status st = _exec_env->fragment_mgr()->merge_filter(request, buf.to_string().data());
@@ -313,6 +334,7 @@ void PInternalServiceImpl<T>::apply_filter(::google::protobuf::RpcController* co
                                            const ::doris::PPublishFilterRequest* request,
                                            ::doris::PPublishFilterResponse* response,
                                            ::google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     auto attachment = static_cast<brpc::Controller*>(controller)->request_attachment();
     UniqueId unique_id(request->query_id());
@@ -329,6 +351,7 @@ template <typename T>
 void PInternalServiceImpl<T>::send_data(google::protobuf::RpcController* controller,
                                         const PSendDataRequest* request, PSendDataResult* response,
                                         google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -352,6 +375,7 @@ template <typename T>
 void PInternalServiceImpl<T>::commit(google::protobuf::RpcController* controller,
                                      const PCommitRequest* request, PCommitResult* response,
                                      google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -370,6 +394,7 @@ template <typename T>
 void PInternalServiceImpl<T>::rollback(google::protobuf::RpcController* controller,
                                        const PRollbackRequest* request, PRollbackResult* response,
                                        google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     TUniqueId fragment_instance_id;
     fragment_instance_id.hi = request->fragment_instance_id().hi();
@@ -389,6 +414,7 @@ void PInternalServiceImpl<T>::fold_constant_expr(google::protobuf::RpcController
                                                  const PConstantExprRequest* request,
                                                  PConstantExprResult* response,
                                                  google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
 
@@ -425,6 +451,7 @@ void PInternalServiceImpl<T>::transmit_block(google::protobuf::RpcController* cn
                                              const PTransmitDataParams* request,
                                              PTransmitDataResult* response,
                                              google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     VLOG_ROW << "transmit data: fragment_instance_id=" << print_id(request->finst_id())
              << " node=" << request->node_id();
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
@@ -450,6 +477,7 @@ void PInternalServiceImpl<T>::check_rpc_channel(google::protobuf::RpcController*
                                                 const PCheckRPCChannelRequest* request,
                                                 PCheckRPCChannelResponse* response,
                                                 google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
     if (request->data().size() != request->size()) {
@@ -477,6 +505,7 @@ void PInternalServiceImpl<T>::reset_rpc_channel(google::protobuf::RpcController*
                                                 const PResetRPCChannelRequest* request,
                                                 PResetRPCChannelResponse* response,
                                                 google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     response->mutable_status()->set_status_code(0);
     if (request->all()) {
@@ -511,6 +540,7 @@ void PInternalServiceImpl<T>::hand_shake(google::protobuf::RpcController* cntl_b
                                          const PHandShakeRequest* request,
                                          PHandShakeResponse* response,
                                          google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     brpc::ClosureGuard closure_guard(done);
     if (request->has_hello()) {
         response->set_hello(request->hello());
diff --git a/be/src/util/bit_util.h b/be/src/util/bit_util.h
index 7526a7728a..dabf87ee0f 100644
--- a/be/src/util/bit_util.h
+++ b/be/src/util/bit_util.h
@@ -25,7 +25,6 @@
 
 #include "common/compiler_util.h"
 #include "gutil/bits.h"
-#include "gutil/port.h"
 #include "util/cpu_info.h"
 #ifdef __aarch64__
 #include "sse2neon.h"
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 0163815c76..8bd9b05847 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -137,6 +137,7 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MIC
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_count, MetricUnit::NOUNIT);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_thread_mem_tracker_err_cb_count, MetricUnit::NOUNIT);
+DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_bthread_count, MetricUnit::NOUNIT);
 
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT);
@@ -286,6 +287,7 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, attach_task_thread_count);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_count);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_thread_mem_tracker_err_cb_count);
+    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_bthread_count);
 
     _server_metric_entity->register_hook(_s_hook_name, std::bind(&DorisMetrics::_update, this));
 
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index aa59d1770b..602eb78a7e 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -130,6 +130,8 @@ public:
     IntCounter* attach_task_thread_count;
     IntCounter* switch_thread_mem_tracker_count;
     IntCounter* switch_thread_mem_tracker_err_cb_count;
+    // brpc server response count
+    IntCounter* switch_bthread_count;
 
     IntGauge* memory_pool_bytes_total;
     IntGauge* process_thread_num;
diff --git a/be/src/util/file_utils.cpp b/be/src/util/file_utils.cpp
index 3fd4a3bba6..a971acce86 100644
--- a/be/src/util/file_utils.cpp
+++ b/be/src/util/file_utils.cpp
@@ -34,6 +34,7 @@
 #include "gutil/strings/strip.h"
 #include "gutil/strings/substitute.h"
 #include "olap/file_helper.h"
+#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 
 namespace doris {
@@ -196,11 +197,13 @@ Status FileUtils::md5sum(const std::string& file, std::string* md5sum) {
         return Status::InternalError("failed to stat file");
     }
     size_t file_len = statbuf.st_size;
+    CONSUME_THREAD_LOCAL_MEM_TRACKER(file_len);
     void* buf = mmap(0, file_len, PROT_READ, MAP_SHARED, fd, 0);
 
     unsigned char result[MD5_DIGEST_LENGTH];
     MD5((unsigned char*)buf, file_len, result);
     munmap(buf, file_len);
+    RELEASE_THREAD_LOCAL_MEM_TRACKER(file_len);
 
     std::stringstream ss;
     for (int32_t i = 0; i < MD5_DIGEST_LENGTH; i++) {
diff --git a/be/src/vec/common/allocator.h b/be/src/vec/common/allocator.h
index 2a50dabf5c..216864dbb8 100644
--- a/be/src/vec/common/allocator.h
+++ b/be/src/vec/common/allocator.h
@@ -29,6 +29,7 @@
 #include <exception>
 
 #include "common/status.h"
+#include "runtime/thread_context.h"
 
 #ifdef NDEBUG
 #define ALLOCATOR_ASLR 0
@@ -137,15 +138,18 @@ public:
         } else if (old_size >= MMAP_THRESHOLD && new_size >= MMAP_THRESHOLD) {
             /// Resize mmap'd memory region.
             // CurrentMemoryTracker::realloc(old_size, new_size);
+            CONSUME_THREAD_LOCAL_MEM_TRACKER(new_size - old_size);
 
             // On apple and freebsd self-implemented mremap used (common/mremap.h)
             buf = clickhouse_mremap(buf, old_size, new_size, MREMAP_MAYMOVE, PROT_READ | PROT_WRITE,
                                     mmap_flags, -1, 0);
-            if (MAP_FAILED == buf)
+            if (MAP_FAILED == buf){
+                RELEASE_THREAD_LOCAL_MEM_TRACKER(new_size - old_size);
                 doris::vectorized::throwFromErrno("Allocator: Cannot mremap memory chunk from " +
                                                           std::to_string(old_size) + " to " +
                                                           std::to_string(new_size) + ".",
                                                   doris::TStatusCode::VEC_CANNOT_MREMAP);
+            }
 
             /// No need for zero-fill, because mmap guarantees it.
         } else if (new_size < MMAP_THRESHOLD) {
@@ -197,10 +201,13 @@ private:
                                 alignment, size),
                         doris::TStatusCode::VEC_BAD_ARGUMENTS);
 
+            CONSUME_THREAD_LOCAL_MEM_TRACKER(size);
             buf = mmap(get_mmap_hint(), size, PROT_READ | PROT_WRITE, mmap_flags, -1, 0);
-            if (MAP_FAILED == buf)
+            if (MAP_FAILED == buf) {
+                RELEASE_THREAD_LOCAL_MEM_TRACKER(size);
                 doris::vectorized::throwFromErrno(fmt::format("Allocator: Cannot mmap {}.", size),
                                                   doris::TStatusCode::VEC_CANNOT_ALLOCATE_MEMORY);
+            }
 
             /// No need for zero-fill, because mmap guarantees it.
         } else {
@@ -231,9 +238,12 @@ private:
 
     void free_no_track(void* buf, size_t size) {
         if (size >= MMAP_THRESHOLD) {
-            if (0 != munmap(buf, size))
+            if (0 != munmap(buf, size)) {
                 doris::vectorized::throwFromErrno(fmt::format("Allocator: Cannot munmap {}.", size),
                                                   doris::TStatusCode::VEC_CANNOT_MUNMAP);
+            } else {
+                RELEASE_THREAD_LOCAL_MEM_TRACKER(size);
+            }
         } else {
             ::free(buf);
         }
diff --git a/be/src/vec/exec/vexchange_node.cpp b/be/src/vec/exec/vexchange_node.cpp
index ea4e61e7fe..57cbcff921 100644
--- a/be/src/vec/exec/vexchange_node.cpp
+++ b/be/src/vec/exec/vexchange_node.cpp
@@ -66,6 +66,7 @@ Status VExchangeNode::prepare(RuntimeState* state) {
 Status VExchangeNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
 
     if (_is_merging) {
@@ -84,7 +85,6 @@ Status VExchangeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
 Status VExchangeNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     SCOPED_TIMER(runtime_profile()->total_time_counter());
     SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
-    ADD_THREAD_LOCAL_MEM_TRACKER(_stream_recvr->mem_tracker());
     auto status = _stream_recvr->get_next(block, eos);
     if (block != nullptr) {
         if (_num_rows_returned + block->rows() < _limit) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org