You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/04/08 01:02:32 UTC

[incubator-doris] branch master updated: [feature-wip] (memory tracker) (step4) Switch TLS mem tracker to separate more detailed memory usage (#8669)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 519305cb22 [feature-wip] (memory tracker) (step4) Switch TLS mem tracker to separate more detailed memory usage (#8669)
519305cb22 is described below

commit 519305cb229c0fa2abbddd4dd814e26dffe6e029
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Apr 8 09:02:26 2022 +0800

    [feature-wip] (memory tracker) (step4) Switch TLS mem tracker to separate more detailed memory usage (#8669)
    
    Based on #8605, separate out the memory usage of each operator from the Query/Load/StorageEngine mem tracker.
---
 be/src/common/config.h                        |  4 ++
 be/src/exec/analytic_eval_node.cpp            |  3 +
 be/src/exec/assert_num_rows_node.cpp          |  2 +
 be/src/exec/base_scanner.cpp                  | 19 +++---
 be/src/exec/blocking_join_node.cpp            |  5 +-
 be/src/exec/broker_scan_node.cpp              |  4 +-
 be/src/exec/cross_join_node.cpp               |  3 +-
 be/src/exec/csv_scan_node.cpp                 |  5 +-
 be/src/exec/es_http_scan_node.cpp             |  4 +-
 be/src/exec/es_scan_node.cpp                  |  3 +
 be/src/exec/except_node.cpp                   |  5 +-
 be/src/exec/exchange_node.cpp                 |  3 +
 be/src/exec/exec_node.cpp                     |  1 -
 be/src/exec/exec_node.h                       |  2 +-
 be/src/exec/hash_join_node.cpp                |  4 +-
 be/src/exec/intersect_node.cpp                |  5 +-
 be/src/exec/merge_join_node.cpp               |  3 +
 be/src/exec/merge_node.cpp                    |  3 +
 be/src/exec/mysql_scan_node.cpp               |  5 +-
 be/src/exec/odbc_scan_node.cpp                |  5 +-
 be/src/exec/olap_scan_node.cpp                |  5 +-
 be/src/exec/olap_scanner.cpp                  |  3 +
 be/src/exec/olap_scanner.h                    |  2 +
 be/src/exec/partitioned_aggregation_node.cc   |  3 +
 be/src/exec/repeat_node.cpp                   |  3 +
 be/src/exec/schema_scan_node.cpp              |  3 +
 be/src/exec/select_node.cpp                   |  3 +
 be/src/exec/set_operation_node.cpp            |  7 +-
 be/src/exec/spill_sort_node.cc                |  3 +
 be/src/exec/table_function_node.cpp           |  3 +
 be/src/exec/tablet_sink.cpp                   | 15 +++++
 be/src/exec/topn_node.cpp                     |  3 +
 be/src/exec/union_node.cpp                    |  3 +
 be/src/exprs/expr_context.cpp                 |  6 ++
 be/src/olap/base_compaction.cpp               |  1 -
 be/src/olap/delta_writer.cpp                  |  8 +--
 be/src/olap/lru_cache.cpp                     |  9 +--
 be/src/olap/lru_cache.h                       |  2 +-
 be/src/olap/memtable.cpp                      |  5 +-
 be/src/olap/memtable.h                        |  3 +-
 be/src/olap/memtable_flush_executor.cpp       |  2 +
 be/src/olap/page_cache.cpp                    |  2 +
 be/src/olap/schema_change.cpp                 | 21 +-----
 be/src/olap/schema_change.h                   |  2 -
 be/src/olap/snapshot_manager.cpp              |  4 ++
 be/src/olap/snapshot_manager.h                |  1 -
 be/src/olap/storage_engine.cpp                |  5 ++
 be/src/olap/storage_engine.h                  |  1 +
 be/src/olap/task/engine_alter_tablet_task.cpp |  2 +
 be/src/olap/task/engine_batch_load_task.cpp   |  2 +
 be/src/olap/task/engine_checksum_task.cpp     |  2 +
 be/src/olap/task/engine_clone_task.cpp        |  2 +
 be/src/runtime/data_stream_recvr.cc           |  7 ++
 be/src/runtime/data_stream_sender.cpp         |  7 +-
 be/src/runtime/disk_io_mgr.cc                 |  9 +++
 be/src/runtime/fold_constant_executor.cpp     |  2 +
 be/src/runtime/load_channel.cpp               |  4 ++
 be/src/runtime/load_channel_mgr.cpp           |  5 ++
 be/src/runtime/mem_tracker.cpp                |  3 +-
 be/src/runtime/mem_tracker.h                  |  4 +-
 be/src/runtime/memory/chunk_allocator.cpp     |  5 +-
 be/src/runtime/runtime_filter_mgr.cpp         |  9 ++-
 be/src/runtime/tablets_channel.cpp            |  5 ++
 be/src/runtime/thread_context.h               | 78 +++++++++++++++++------
 be/src/runtime/thread_mem_tracker_mgr.cpp     | 15 +++--
 be/src/runtime/thread_mem_tracker_mgr.h       | 92 ++++++++++++++++++---------
 be/src/vec/exec/join/vhash_join_node.cpp      |  5 +-
 be/src/vec/exec/vaggregation_node.cpp         | 13 ++--
 be/src/vec/exec/vanalytic_eval_node.cpp       |  3 +
 be/src/vec/exec/vblocking_join_node.cpp       |  5 +-
 be/src/vec/exec/vcross_join_node.cpp          |  5 +-
 be/src/vec/exec/volap_scan_node.cpp           |  3 +-
 be/src/vec/exec/volap_scanner.cpp             |  1 +
 be/src/vec/exec/vset_operation_node.cpp       |  5 +-
 be/src/vec/exec/vsort_node.cpp                |  3 +
 be/src/vec/exprs/vexpr_context.cpp            |  4 ++
 be/src/vec/runtime/vdata_stream_recvr.cpp     |  6 ++
 be/src/vec/sink/vdata_stream_sender.cpp       |  4 ++
 be/src/vec/sink/vtablet_sink.cpp              |  2 +
 79 files changed, 378 insertions(+), 150 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 52f7510c4f..a93229cdf8 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -614,6 +614,10 @@ CONF_mInt32(remote_storage_read_buffer_mb, "16");
 // Whether Hook TCmalloc new/delete, currently consume/release tls mem tracker in Hook.
 CONF_Bool(track_new_delete, "true");
 
+// If true, switch TLS MemTracker to count more detailed memory,
+// including caches such as ExecNode operators and TabletManager.
+CONF_Bool(memory_verbose_track, "true");
+
 // Default level of MemTracker to show in web page
 // now MemTracker support two level:
 //      OVERVIEW: 0
diff --git a/be/src/exec/analytic_eval_node.cpp b/be/src/exec/analytic_eval_node.cpp
index 3741241bed..d7d8726267 100644
--- a/be/src/exec/analytic_eval_node.cpp
+++ b/be/src/exec/analytic_eval_node.cpp
@@ -140,6 +140,7 @@ Status AnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status AnalyticEvalNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
     _child_tuple_desc = child(0)->row_desc().tuple_descriptors()[0];
     _curr_tuple_pool.reset(new MemPool(mem_tracker().get()));
@@ -184,6 +185,7 @@ Status AnalyticEvalNode::prepare(RuntimeState* state) {
 
 Status AnalyticEvalNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_CANCELLED(state);
     //RETURN_IF_ERROR(QueryMaintenance(state));
@@ -812,6 +814,7 @@ inline int64_t AnalyticEvalNode::num_output_rows_ready() const {
 
 Status AnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     //RETURN_IF_ERROR(QueryMaintenance(state));
diff --git a/be/src/exec/assert_num_rows_node.cpp b/be/src/exec/assert_num_rows_node.cpp
index 6c84dfc1f0..e101b245ea 100644
--- a/be/src/exec/assert_num_rows_node.cpp
+++ b/be/src/exec/assert_num_rows_node.cpp
@@ -49,6 +49,7 @@ Status AssertNumRowsNode::prepare(RuntimeState* state) {
 
 Status AssertNumRowsNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     // ISSUE-3435
     RETURN_IF_ERROR(child(0)->open(state));
@@ -58,6 +59,7 @@ Status AssertNumRowsNode::open(RuntimeState* state) {
 Status AssertNumRowsNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     output_batch->reset();
     child(0)->get_next(state, output_batch, eos);
     _num_rows_returned += output_batch->num_rows();
diff --git a/be/src/exec/base_scanner.cpp b/be/src/exec/base_scanner.cpp
index 92bb10886e..06331023e8 100644
--- a/be/src/exec/base_scanner.cpp
+++ b/be/src/exec/base_scanner.cpp
@@ -39,6 +39,15 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
           _counter(counter),
           _src_tuple(nullptr),
           _src_tuple_row(nullptr),
+#if BE_TEST
+          _mem_tracker(new MemTracker()),
+#else
+          _mem_tracker(MemTracker::create_tracker(
+                  -1, state->query_type() == TQueryType::LOAD
+                              ? "BaseScanner:" + std::to_string(state->load_job_id())
+                              : "BaseScanner:Select")),
+#endif
+          _mem_pool(std::make_unique<MemPool>(_mem_tracker.get())),
           _dest_tuple_desc(nullptr),
           _pre_filter_texprs(pre_filter_texprs),
           _strict_mode(false),
@@ -48,15 +57,7 @@ BaseScanner::BaseScanner(RuntimeState* state, RuntimeProfile* profile,
           _read_timer(nullptr),
           _materialize_timer(nullptr),
           _success(false),
-          _scanner_eof(false) {
-#ifndef BE_TEST
-    _mem_pool.reset(new MemPool(state->query_type() == TQueryType::LOAD
-                                        ? "BaseScanner:" + std::to_string(state->load_job_id())
-                                        : "BaseScanner:Select"));
-#else
-    _mem_pool.reset(new MemPool());
-#endif
-}
+          _scanner_eof(false) {}
 
 Status BaseScanner::open() {
     RETURN_IF_ERROR(init_expr_ctxes());
diff --git a/be/src/exec/blocking_join_node.cpp b/be/src/exec/blocking_join_node.cpp
index 57196713a2..432606c5c6 100644
--- a/be/src/exec/blocking_join_node.cpp
+++ b/be/src/exec/blocking_join_node.cpp
@@ -23,7 +23,6 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -46,6 +45,7 @@ BlockingJoinNode::~BlockingJoinNode() {
 Status BlockingJoinNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     _build_pool.reset(new MemPool(mem_tracker().get()));
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
@@ -88,8 +88,9 @@ void BlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Statu
 }
 
 Status BlockingJoinNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(ExecNode::open(state));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    RETURN_IF_ERROR(ExecNode::open(state));
     // RETURN_IF_ERROR(Expr::open(_conjuncts, state));
 
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 6e156f8d83..e16410e0f5 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -30,7 +30,6 @@
 #include "runtime/dpp_sink_internal.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -61,6 +60,7 @@ Status BrokerScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status BrokerScanNode::prepare(RuntimeState* state) {
     VLOG_QUERY << "BrokerScanNode prepare";
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     // get tuple desc
     _runtime_state = state;
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
@@ -88,6 +88,7 @@ Status BrokerScanNode::prepare(RuntimeState* state) {
 
 Status BrokerScanNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
@@ -108,6 +109,7 @@ Status BrokerScanNode::start_scanners() {
 
 Status BrokerScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     // check if CANCELLED.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
diff --git a/be/src/exec/cross_join_node.cpp b/be/src/exec/cross_join_node.cpp
index 5def58a4cc..3ba307fd0c 100644
--- a/be/src/exec/cross_join_node.cpp
+++ b/be/src/exec/cross_join_node.cpp
@@ -23,7 +23,6 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 
@@ -35,6 +34,7 @@ CrossJoinNode::CrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const Des
 Status CrossJoinNode::prepare(RuntimeState* state) {
     DCHECK(_join_op == TJoinOp::CROSS_JOIN);
     RETURN_IF_ERROR(BlockingJoinNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _build_batch_pool.reset(new ObjectPool());
     return Status::OK();
 }
@@ -89,6 +89,7 @@ Status CrossJoinNode::get_next(RuntimeState* state, RowBatch* output_batch, bool
     // TOOD(zhaochun)
     // RETURN_IF_ERROR(state->check_query_state());
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit() || _eos) {
         *eos = true;
diff --git a/be/src/exec/csv_scan_node.cpp b/be/src/exec/csv_scan_node.cpp
index e262bda786..f8a1a80437 100644
--- a/be/src/exec/csv_scan_node.cpp
+++ b/be/src/exec/csv_scan_node.cpp
@@ -128,6 +128,7 @@ Status CsvScanNode::prepare(RuntimeState* state) {
     }
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     // add timer
     _split_check_timer = ADD_TIMER(_runtime_profile, "split check timer");
@@ -210,6 +211,8 @@ Status CsvScanNode::prepare(RuntimeState* state) {
 }
 
 Status CsvScanNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     VLOG_CRITICAL << "CsvScanNode::Open";
 
@@ -225,7 +228,6 @@ Status CsvScanNode::open(RuntimeState* state) {
 
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(_csv_scanner->open());
 
     return Status::OK();
@@ -244,6 +246,7 @@ Status CsvScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit()) {
         *eos = true;
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index ba91ee02c8..be18743a46 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -30,7 +30,6 @@
 #include "runtime/dpp_sink_internal.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/runtime_profile.h"
 
@@ -68,6 +67,7 @@ Status EsHttpScanNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status EsHttpScanNode::prepare(RuntimeState* state) {
     VLOG_QUERY << "EsHttpScanNode prepare";
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     _scanner_profile.reset(new RuntimeProfile("EsHttpScanNode"));
     runtime_profile()->add_child(_scanner_profile.get(), true, nullptr);
@@ -124,6 +124,7 @@ Status EsHttpScanNode::build_conjuncts_list() {
 
 Status EsHttpScanNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
@@ -199,6 +200,7 @@ Status EsHttpScanNode::collect_scanners_status() {
 
 Status EsHttpScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
         if (update_status(Status::Cancelled("Cancelled"))) {
diff --git a/be/src/exec/es_scan_node.cpp b/be/src/exec/es_scan_node.cpp
index 4ba08bf554..5f7ddb1b53 100644
--- a/be/src/exec/es_scan_node.cpp
+++ b/be/src/exec/es_scan_node.cpp
@@ -67,6 +67,7 @@ Status EsScanNode::prepare(RuntimeState* state) {
     VLOG_CRITICAL << "EsScanNode::Prepare";
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     if (_tuple_desc == nullptr) {
         std::stringstream ss;
@@ -85,6 +86,7 @@ Status EsScanNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
 
     // TExtOpenParams.row_schema
@@ -205,6 +207,7 @@ Status EsScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     // create tuple
     MemPool* tuple_pool = row_batch->tuple_data_pool();
diff --git a/be/src/exec/except_node.cpp b/be/src/exec/except_node.cpp
index ec3e451a50..893a8f8d71 100644
--- a/be/src/exec/except_node.cpp
+++ b/be/src/exec/except_node.cpp
@@ -21,7 +21,6 @@
 #include "exprs/expr.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 
 namespace doris {
 ExceptNode::ExceptNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -40,8 +39,9 @@ Status ExceptNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status ExceptNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(SetOperationNode::open(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Except Node, while probing the hash table.");
+    RETURN_IF_ERROR(SetOperationNode::open(state));
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
@@ -88,6 +88,7 @@ Status ExceptNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eos)
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     *eos = true;
     if (reached_limit()) {
         return Status::OK();
diff --git a/be/src/exec/exchange_node.cpp b/be/src/exec/exchange_node.cpp
index 373cff3a9f..083e40518e 100644
--- a/be/src/exec/exchange_node.cpp
+++ b/be/src/exec/exchange_node.cpp
@@ -57,6 +57,7 @@ Status ExchangeNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status ExchangeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _convert_row_batch_timer = ADD_TIMER(runtime_profile(), "ConvertRowBatchTime");
     // TODO: figure out appropriate buffer size
     DCHECK_GT(_num_senders, 0);
@@ -75,6 +76,7 @@ Status ExchangeNode::prepare(RuntimeState* state) {
 
 Status ExchangeNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     if (_is_merging) {
         RETURN_IF_ERROR(_sort_exec_exprs.open(state));
@@ -129,6 +131,7 @@ Status ExchangeNode::fill_input_row_batch(RuntimeState* state) {
 Status ExchangeNode::get_next(RuntimeState* state, RowBatch* output_batch, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit()) {
         _stream_recvr->transfer_all_resources(output_batch);
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index e3891b6139..2b7855e493 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -57,7 +57,6 @@
 #include "runtime/mem_tracker.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/debug_util.h"
 #include "util/runtime_profile.h"
 #include "vec/core/block.h"
diff --git a/be/src/exec/exec_node.h b/be/src/exec/exec_node.h
index 1644ba5165..665d1c9d6c 100644
--- a/be/src/exec/exec_node.h
+++ b/be/src/exec/exec_node.h
@@ -28,11 +28,11 @@
 #include "runtime/descriptors.h"
 #include "runtime/mem_pool.h"
 #include "runtime/query_statistics.h"
+#include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/blocking_queue.hpp"
 #include "util/runtime_profile.h"
 #include "util/uid_util.h" // for print_id
-
 #include "vec/exprs/vexpr_context.h"
 
 namespace doris {
diff --git a/be/src/exec/hash_join_node.cpp b/be/src/exec/hash_join_node.cpp
index 30cd84441c..a4964f45ec 100644
--- a/be/src/exec/hash_join_node.cpp
+++ b/be/src/exec/hash_join_node.cpp
@@ -30,7 +30,6 @@
 #include "runtime/row_batch.h"
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "util/runtime_profile.h"
 
@@ -96,6 +95,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status HashJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     _build_pool.reset(new MemPool(mem_tracker().get()));
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
@@ -220,6 +220,7 @@ Status HashJoinNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(Expr::open(_build_expr_ctxs, state));
     RETURN_IF_ERROR(Expr::open(_probe_expr_ctxs, state));
@@ -306,6 +307,7 @@ Status HashJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* eo
     // it may cause the memory to exceed the limit.
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Hash join, while execute get_next.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit()) {
         *eos = true;
diff --git a/be/src/exec/intersect_node.cpp b/be/src/exec/intersect_node.cpp
index df0e30de67..a12673a1b5 100644
--- a/be/src/exec/intersect_node.cpp
+++ b/be/src/exec/intersect_node.cpp
@@ -21,7 +21,6 @@
 #include "exprs/expr.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 
 namespace doris {
 IntersectNode::IntersectNode(ObjectPool* pool, const TPlanNode& tnode, const DescriptorTbl& descs)
@@ -44,8 +43,9 @@ Status IntersectNode::init(const TPlanNode& tnode, RuntimeState* state) {
 // 2 probe with child(1), then filter the hash table and find the matched item, use them to rebuild a hash table
 // repeat [2] this for all the rest child
 Status IntersectNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(SetOperationNode::open(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("Intersect Node, while probing the hash table.");
+    RETURN_IF_ERROR(SetOperationNode::open(state));
     // if a table is empty, the result must be empty
     if (_hash_tbl->size() == 0) {
         _hash_tbl_iterator = _hash_tbl->begin();
@@ -88,6 +88,7 @@ Status IntersectNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     *eos = true;
     if (reached_limit()) {
         return Status::OK();
diff --git a/be/src/exec/merge_join_node.cpp b/be/src/exec/merge_join_node.cpp
index d21a1e61e1..453963f601 100644
--- a/be/src/exec/merge_join_node.cpp
+++ b/be/src/exec/merge_join_node.cpp
@@ -71,6 +71,7 @@ Status MergeJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status MergeJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     // build and probe exprs are evaluated in the context of the rows produced by our
     // right and left children, respectively
@@ -148,6 +149,7 @@ Status MergeJoinNode::close(RuntimeState* state) {
 
 Status MergeJoinNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(Expr::open(_left_expr_ctxs, state));
@@ -171,6 +173,7 @@ Status MergeJoinNode::get_next(RuntimeState* state, RowBatch* out_batch, bool* e
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit() || _eos) {
         *eos = true;
diff --git a/be/src/exec/merge_node.cpp b/be/src/exec/merge_node.cpp
index d92dde402f..816993aa1d 100644
--- a/be/src/exec/merge_node.cpp
+++ b/be/src/exec/merge_node.cpp
@@ -60,6 +60,7 @@ Status MergeNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status MergeNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     DCHECK(_tuple_desc != nullptr);
 
@@ -90,6 +91,7 @@ Status MergeNode::prepare(RuntimeState* state) {
 }
 
 Status MergeNode::open(RuntimeState* state) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     // Prepare const expr lists.
     for (int i = 0; i < _const_result_expr_ctx_lists.size(); ++i) {
@@ -108,6 +110,7 @@ Status MergeNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     // Create new tuple buffer for row_batch.
     int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size();
     void* tuple_buffer = row_batch->tuple_data_pool()->allocate(tuple_buffer_size);
diff --git a/be/src/exec/mysql_scan_node.cpp b/be/src/exec/mysql_scan_node.cpp
index f83a66fdeb..d395f2bf70 100644
--- a/be/src/exec/mysql_scan_node.cpp
+++ b/be/src/exec/mysql_scan_node.cpp
@@ -53,6 +53,7 @@ Status MysqlScanNode::prepare(RuntimeState* state) {
     }
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     // get tuple desc
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
 
@@ -99,6 +100,8 @@ Status MysqlScanNode::prepare(RuntimeState* state) {
 }
 
 Status MysqlScanNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     VLOG_CRITICAL << "MysqlScanNode::Open";
 
@@ -112,7 +115,6 @@ Status MysqlScanNode::open(RuntimeState* state) {
 
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(_mysql_scanner->open());
     RETURN_IF_ERROR(_mysql_scanner->query(_table_name, _columns, _filters, _limit));
 
@@ -159,6 +161,7 @@ Status MysqlScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* e
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     // create new tuple buffer for row_batch
     int tuple_buffer_size = row_batch->capacity() * _tuple_desc->byte_size();
diff --git a/be/src/exec/odbc_scan_node.cpp b/be/src/exec/odbc_scan_node.cpp
index 17945bf0a2..c2783abeac 100644
--- a/be/src/exec/odbc_scan_node.cpp
+++ b/be/src/exec/odbc_scan_node.cpp
@@ -55,6 +55,7 @@ Status OdbcScanNode::prepare(RuntimeState* state) {
     }
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     // get tuple desc
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
 
@@ -92,6 +93,8 @@ Status OdbcScanNode::prepare(RuntimeState* state) {
 }
 
 Status OdbcScanNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     VLOG_CRITICAL << _scan_node_type << "::Open";
 
@@ -105,7 +108,6 @@ Status OdbcScanNode::open(RuntimeState* state) {
 
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(_odbc_scanner->open());
     RETURN_IF_ERROR(_odbc_scanner->query());
     // check materialize slot num
@@ -140,6 +142,7 @@ Status OdbcScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eo
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit()) {
         *eos = true;
diff --git a/be/src/exec/olap_scan_node.cpp b/be/src/exec/olap_scan_node.cpp
index 69217280a9..c2d577b948 100644
--- a/be/src/exec/olap_scan_node.cpp
+++ b/be/src/exec/olap_scan_node.cpp
@@ -34,7 +34,6 @@
 #include "runtime/runtime_filter_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/string_value.h"
-#include "runtime/thread_context.h"
 #include "runtime/tuple_row.h"
 #include "util/priority_thread_pool.hpp"
 #include "util/priority_work_stealing_thread_pool.hpp"
@@ -177,6 +176,7 @@ void OlapScanNode::_init_counter(RuntimeState* state) {
 Status OlapScanNode::prepare(RuntimeState* state) {
     init_scan_profile();
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     // create scanner profile
     // create timer
     _tablet_counter = ADD_COUNTER(runtime_profile(), "TabletCount ", TUnit::UNIT);
@@ -223,6 +223,7 @@ Status OlapScanNode::prepare(RuntimeState* state) {
 Status OlapScanNode::open(RuntimeState* state) {
     VLOG_CRITICAL << "OlapScanNode::Open";
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
 
@@ -266,6 +267,7 @@ Status OlapScanNode::open(RuntimeState* state) {
 Status OlapScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     // check if Canceled.
     if (state->is_cancelled()) {
@@ -1521,6 +1523,7 @@ void OlapScanNode::transfer_thread(RuntimeState* state) {
 
 void OlapScanNode::scanner_thread(OlapScanner* scanner) {
     SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
     if (UNLIKELY(_transfer_done)) {
         _scanner_done = true;
         std::unique_lock<std::mutex> l(_scan_batches_lock);
diff --git a/be/src/exec/olap_scanner.cpp b/be/src/exec/olap_scanner.cpp
index 37e6bafde2..6619c67aa6 100644
--- a/be/src/exec/olap_scanner.cpp
+++ b/be/src/exec/olap_scanner.cpp
@@ -60,6 +60,7 @@ Status OlapScanner::prepare(
         const std::vector<TCondition>& filters,
         const std::vector<std::pair<string, std::shared_ptr<IBloomFilterFuncBase>>>&
                 bloom_filters) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     set_tablet_reader();
     // set limit to reduce end of rowset and segment mem use
     _tablet_reader->set_batch_size(
@@ -119,6 +120,7 @@ Status OlapScanner::prepare(
 
 Status OlapScanner::open() {
     SCOPED_TIMER(_parent->_reader_init_timer);
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     if (_conjunct_ctxs.size() > _parent->_direct_conjunct_size) {
         _use_pushdown_conjuncts = true;
@@ -274,6 +276,7 @@ Status OlapScanner::_init_return_columns() {
 }
 
 Status OlapScanner::get_batch(RuntimeState* state, RowBatch* batch, bool* eof) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
     // 2. Allocate Row's Tuple buf
     uint8_t* tuple_buf =
             batch->tuple_data_pool()->allocate(state->batch_size() * _tuple_desc->byte_size());
diff --git a/be/src/exec/olap_scanner.h b/be/src/exec/olap_scanner.h
index a9b89511ea..5c336dffd6 100644
--- a/be/src/exec/olap_scanner.h
+++ b/be/src/exec/olap_scanner.h
@@ -93,6 +93,8 @@ public:
 
     const std::vector<SlotDescriptor*>& get_query_slots() const { return _query_slots; }
 
+    const std::shared_ptr<MemTracker>& mem_tracker() const {return _mem_tracker;}
+
 protected:
     Status _init_tablet_reader_params(const std::vector<OlapScanRange*>& key_ranges,
                         const std::vector<TCondition>& filters,
diff --git a/be/src/exec/partitioned_aggregation_node.cc b/be/src/exec/partitioned_aggregation_node.cc
index 59fa6c3293..3108b38d08 100644
--- a/be/src/exec/partitioned_aggregation_node.cc
+++ b/be/src/exec/partitioned_aggregation_node.cc
@@ -183,6 +183,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
 
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     state_ = state;
 
     mem_pool_.reset(new MemPool(mem_tracker().get()));
@@ -244,6 +245,7 @@ Status PartitionedAggregationNode::prepare(RuntimeState* state) {
 
 Status PartitionedAggregationNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     // Open the child before consuming resources in this node.
     RETURN_IF_ERROR(child(0)->open(state));
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -341,6 +343,7 @@ Status PartitionedAggregationNode::open(RuntimeState* state) {
 }
 
 Status PartitionedAggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     // 1. `!need_finalize` means this aggregation node not the level two aggregation node
     // 2. `grouping_exprs_.size() == 0 ` means is not group by
     // 3. `child(0)->rows_returned() == 0` mean not data from child
diff --git a/be/src/exec/repeat_node.cpp b/be/src/exec/repeat_node.cpp
index 401b40e318..086ff64a9e 100644
--- a/be/src/exec/repeat_node.cpp
+++ b/be/src/exec/repeat_node.cpp
@@ -44,6 +44,7 @@ RepeatNode::~RepeatNode() {}
 Status RepeatNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _runtime_state = state;
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     if (_tuple_desc == nullptr) {
@@ -55,6 +56,7 @@ Status RepeatNode::prepare(RuntimeState* state) {
 
 Status RepeatNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(child(0)->open(state));
@@ -163,6 +165,7 @@ Status RepeatNode::get_repeated_batch(RowBatch* child_row_batch, int repeat_id_i
 
 Status RepeatNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     DCHECK(_repeat_id_idx >= 0);
     for (const std::vector<int64_t>& v : _grouping_list) {
diff --git a/be/src/exec/schema_scan_node.cpp b/be/src/exec/schema_scan_node.cpp
index fae3a1dd9e..69feae25fd 100644
--- a/be/src/exec/schema_scan_node.cpp
+++ b/be/src/exec/schema_scan_node.cpp
@@ -99,6 +99,7 @@ Status SchemaScanNode::prepare(RuntimeState* state) {
     }
 
     RETURN_IF_ERROR(ScanNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     // new one mem pool
     _tuple_pool.reset(new (std::nothrow) MemPool());
@@ -196,6 +197,7 @@ Status SchemaScanNode::open(RuntimeState* state) {
     }
 
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
@@ -242,6 +244,7 @@ Status SchemaScanNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
 
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit()) {
         *eos = true;
diff --git a/be/src/exec/select_node.cpp b/be/src/exec/select_node.cpp
index a6db3a402d..4cef98f504 100644
--- a/be/src/exec/select_node.cpp
+++ b/be/src/exec/select_node.cpp
@@ -33,11 +33,13 @@ SelectNode::SelectNode(ObjectPool* pool, const TPlanNode& tnode, const Descripto
 
 Status SelectNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _child_row_batch.reset(new RowBatch(child(0)->row_desc(), state->batch_size()));
     return Status::OK();
 }
 
 Status SelectNode::open(RuntimeState* state) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(child(0)->open(state));
@@ -48,6 +50,7 @@ Status SelectNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     if (reached_limit() || (_child_row_idx == _child_row_batch->num_rows() && _child_eos)) {
         // we're already done or we exhausted the last child batch and there won't be any
diff --git a/be/src/exec/set_operation_node.cpp b/be/src/exec/set_operation_node.cpp
index 5574d3b90c..4b4ed9f828 100644
--- a/be/src/exec/set_operation_node.cpp
+++ b/be/src/exec/set_operation_node.cpp
@@ -23,7 +23,6 @@
 #include "runtime/raw_value.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 
 namespace doris {
 SetOperationNode::SetOperationNode(ObjectPool* pool, const TPlanNode& tnode,
@@ -41,13 +40,14 @@ Status SetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
 }
 
 Status SetOperationNode::prepare(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     DCHECK(_tuple_desc != nullptr);
     _build_pool.reset(new MemPool(mem_tracker().get()));
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     for (size_t i = 0; i < _child_expr_lists.size(); ++i) {
         RETURN_IF_ERROR(Expr::prepare(_child_expr_lists[i], state, child(i)->row_desc(),
                                       expr_mem_tracker()));
@@ -138,8 +138,9 @@ bool SetOperationNode::equals(TupleRow* row, TupleRow* other) {
 Status SetOperationNode::open(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("SetOperation, while constructing the hash table.");
     RETURN_IF_CANCELLED(state);
     // open result expr lists.
     for (const std::vector<ExprContext*>& exprs : _child_expr_lists) {
diff --git a/be/src/exec/spill_sort_node.cc b/be/src/exec/spill_sort_node.cc
index 58802741c0..c71deb5bac 100644
--- a/be/src/exec/spill_sort_node.cc
+++ b/be/src/exec/spill_sort_node.cc
@@ -44,6 +44,7 @@ Status SpillSortNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status SpillSortNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor,
                                              expr_mem_tracker()));
     // AddExprCtxsToFree(_sort_exec_exprs);
@@ -52,6 +53,7 @@ Status SpillSortNode::prepare(RuntimeState* state) {
 
 Status SpillSortNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(_sort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
@@ -82,6 +84,7 @@ Status SpillSortNode::open(RuntimeState* state) {
 
 Status SpillSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     // RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT, state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
diff --git a/be/src/exec/table_function_node.cpp b/be/src/exec/table_function_node.cpp
index 3b44b436ea..ee225a0610 100644
--- a/be/src/exec/table_function_node.cpp
+++ b/be/src/exec/table_function_node.cpp
@@ -80,6 +80,7 @@ Status TableFunctionNode::_prepare_output_slot_ids(const TPlanNode& tnode) {
 
 Status TableFunctionNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     _num_rows_filtered_counter = ADD_COUNTER(_runtime_profile, "RowsFiltered", TUnit::UNIT);
 
@@ -92,6 +93,7 @@ Status TableFunctionNode::prepare(RuntimeState* state) {
 
 Status TableFunctionNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(ExecNode::open(state));
 
@@ -186,6 +188,7 @@ bool TableFunctionNode::_roll_table_functions(int last_eos_idx) {
 Status TableFunctionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     const RowDescriptor& parent_rowdesc = row_batch->row_desc();
     const RowDescriptor& child_rowdesc = _children[0]->row_desc();
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 96ffd04bbd..27893cb2a0 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -73,6 +73,7 @@ NodeChannel::~NodeChannel() noexcept {
 // no need to set _cancel_msg because the error will be
 // returned directly via "TabletSink::prepare()" method.
 Status NodeChannel::init(RuntimeState* state) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     _tuple_desc = _parent->_output_tuple_desc;
     auto node = _parent->_nodes_info->find_node(_node_id);
     if (node == nullptr) {
@@ -115,6 +116,7 @@ Status NodeChannel::init(RuntimeState* state) {
 }
 
 void NodeChannel::open() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     PTabletWriterOpenRequest request;
     request.set_allocated_id(&_parent->_load_id);
     request.set_index_id(_index_channel->_index_id);
@@ -160,6 +162,7 @@ void NodeChannel::_cancel_with_msg(const std::string& msg) {
 
 Status NodeChannel::open_wait() {
     _open_closure->join();
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     if (_open_closure->cntl.Failed()) {
         if (!ExecEnv::GetInstance()->brpc_internal_client_cache()->available(
                     _stub, _node_info.host, _node_info.brpc_port)) {
@@ -249,6 +252,7 @@ Status NodeChannel::open_wait() {
 }
 
 Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
@@ -300,6 +304,7 @@ Status NodeChannel::add_row(Tuple* input_tuple, int64_t tablet_id) {
 // Used for vectorized engine.
 // TODO(cmy): deprecated, need refactor
 Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
@@ -350,6 +355,7 @@ Status NodeChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
 }
 
 void NodeChannel::mark_close() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
         return;
@@ -373,6 +379,7 @@ void NodeChannel::mark_close() {
 }
 
 Status NodeChannel::close_wait(RuntimeState* state) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     // set _is_closed to true finally
     Defer set_closed {[&]() {
         std::lock_guard<std::mutex> l(_closed_lock);
@@ -422,6 +429,7 @@ Status NodeChannel::close_wait(RuntimeState* state) {
 }
 
 void NodeChannel::cancel(const std::string& cancel_msg) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     // set _is_closed to true finally
     Defer set_closed {[&]() {
         std::lock_guard<std::mutex> l(_closed_lock);
@@ -574,6 +582,7 @@ Status NodeChannel::none_of(std::initializer_list<bool> vars) {
 }
 
 void NodeChannel::clear_all_batches() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_node_channel_tracker);
     std::lock_guard<std::mutex> lg(_pending_batches_lock);
     std::queue<AddBatchReq> empty;
     std::swap(_pending_batches, empty);
@@ -583,6 +592,7 @@ void NodeChannel::clear_all_batches() {
 IndexChannel::~IndexChannel() {}
 
 Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPartition>& tablets) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
     for (auto& tablet : tablets) {
         auto location = _parent->_location->find_tablet(tablet.tablet_id);
         if (location == nullptr) {
@@ -615,6 +625,7 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
 }
 
 void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
     auto it = _channels_by_tablet.find(tablet_id);
     DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
     for (auto channel : it->second) {
@@ -630,6 +641,7 @@ void IndexChannel::add_row(Tuple* tuple, int64_t tablet_id) {
 // Used for vectorized engine.
 // TODO(cmy): deprecated, need refactor
 void IndexChannel::add_row(BlockRow& block_row, int64_t tablet_id) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_index_channel_tracker);
     auto it = _channels_by_tablet.find(tablet_id);
     DCHECK(it != _channels_by_tablet.end()) << "unknown tablet, tablet_id=" << tablet_id;
     for (auto channel : it->second) {
@@ -760,6 +772,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
             MemTracker::create_tracker(-1, "OlapTableSink:" + std::to_string(state->load_job_id()),
                                        state->instance_mem_tracker());
     SCOPED_TIMER(_profile->total_time_counter());
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     // Prepare the exprs to run.
     RETURN_IF_ERROR(Expr::prepare(_output_expr_ctxs, state, _input_row_desc, _expr_mem_tracker));
@@ -863,6 +876,7 @@ Status OlapTableSink::prepare(RuntimeState* state) {
 Status OlapTableSink::open(RuntimeState* state) {
     SCOPED_TIMER(_profile->total_time_counter());
     SCOPED_TIMER(_open_timer);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     // Prepare the exprs to run.
     RETURN_IF_ERROR(Expr::open(_output_expr_ctxs, state));
 
@@ -901,6 +915,7 @@ Status OlapTableSink::open(RuntimeState* state) {
 
 Status OlapTableSink::send(RuntimeState* state, RowBatch* input_batch) {
     SCOPED_TIMER(_profile->total_time_counter());
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     // update incrementally so that FE can get the progress.
     // the real 'num_rows_load_total' will be set when sink being closed.
     int64_t num_rows = input_batch->num_rows();
diff --git a/be/src/exec/topn_node.cpp b/be/src/exec/topn_node.cpp
index 9f7cb71fc3..2707fe4990 100644
--- a/be/src/exec/topn_node.cpp
+++ b/be/src/exec/topn_node.cpp
@@ -59,6 +59,7 @@ Status TopNNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status TopNNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _tuple_pool.reset(new MemPool(mem_tracker().get()));
     RETURN_IF_ERROR(_sort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor,
                                              expr_mem_tracker()));
@@ -75,6 +76,7 @@ Status TopNNode::prepare(RuntimeState* state) {
 
 Status TopNNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Top n, before open."));
@@ -127,6 +129,7 @@ Status TopNNode::open(RuntimeState* state) {
 
 Status TopNNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(state->check_query_state("Top n, before moving result to row_batch."));
diff --git a/be/src/exec/union_node.cpp b/be/src/exec/union_node.cpp
index d9c389f823..8049861c8c 100644
--- a/be/src/exec/union_node.cpp
+++ b/be/src/exec/union_node.cpp
@@ -67,6 +67,7 @@ Status UnionNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status UnionNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _tuple_desc = state->desc_tbl().get_tuple_descriptor(_tuple_id);
     DCHECK(_tuple_desc != nullptr);
     _materialize_exprs_evaluate_timer =
@@ -93,6 +94,7 @@ Status UnionNode::prepare(RuntimeState* state) {
 
 Status UnionNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     // open const expr lists.
     for (const std::vector<ExprContext*>& exprs : _const_expr_lists) {
@@ -231,6 +233,7 @@ Status UnionNode::get_next_const(RuntimeState* state, RowBatch* row_batch) {
 
 Status UnionNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
     // TODO(zc)
diff --git a/be/src/exprs/expr_context.cpp b/be/src/exprs/expr_context.cpp
index e3bb2a43b9..779f3dd5d7 100644
--- a/be/src/exprs/expr_context.cpp
+++ b/be/src/exprs/expr_context.cpp
@@ -28,6 +28,7 @@
 #include "runtime/mem_tracker.h"
 #include "runtime/raw_value.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "udf/udf_internal.h"
 #include "util/debug_util.h"
 #include "util/stack_util.h"
@@ -52,6 +53,7 @@ Status ExprContext::prepare(RuntimeState* state, const RowDescriptor& row_desc,
                             const std::shared_ptr<MemTracker>& tracker) {
     DCHECK(!_prepared);
     _mem_tracker = tracker;
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     DCHECK(_pool.get() == nullptr);
     _prepared = true;
     _pool.reset(new MemPool(_mem_tracker.get()));
@@ -63,6 +65,7 @@ Status ExprContext::open(RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _opened = true;
     // Fragment-local state is only initialized for original contexts. Clones inherit the
     // original's fragment state and only need to have thread-local state initialized.
@@ -108,6 +111,7 @@ Status ExprContext::clone(RuntimeState* state, ExprContext** new_ctx) {
     DCHECK(_prepared);
     DCHECK(_opened);
     DCHECK(*new_ctx == nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     *new_ctx = state->obj_pool()->add(new ExprContext(_root));
     (*new_ctx)->_pool.reset(new MemPool(_pool->mem_tracker()));
@@ -127,6 +131,7 @@ Status ExprContext::clone(RuntimeState* state, ExprContext** new_ctx, Expr* root
     DCHECK(_prepared);
     DCHECK(_opened);
     DCHECK(*new_ctx == nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     *new_ctx = state->obj_pool()->add(new ExprContext(root));
     (*new_ctx)->_pool.reset(new MemPool(_pool->mem_tracker()));
@@ -143,6 +148,7 @@ Status ExprContext::clone(RuntimeState* state, ExprContext** new_ctx, Expr* root
 }
 
 void ExprContext::free_local_allocations() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     free_local_allocations(_fn_contexts);
 }
 
diff --git a/be/src/olap/base_compaction.cpp b/be/src/olap/base_compaction.cpp
index d379957761..e0de10534f 100644
--- a/be/src/olap/base_compaction.cpp
+++ b/be/src/olap/base_compaction.cpp
@@ -19,7 +19,6 @@
 
 #include "util/doris_metrics.h"
 #include "util/trace.h"
-#include "runtime/thread_context.h"
 
 namespace doris {
 
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 74f52f5a9c..df37735ca9 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -250,7 +250,8 @@ OLAPStatus DeltaWriter::wait_flush() {
 
 void DeltaWriter::_reset_mem_table() {
     _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema, _req.slots,
-                                  _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get()));
+                                  _req.tuple_desc, _tablet->keys_type(), _rowset_writer.get(),
+                                  _mem_tracker));
 }
 
 OLAPStatus DeltaWriter::close() {
@@ -284,10 +285,6 @@ OLAPStatus DeltaWriter::close_wait(google::protobuf::RepeatedPtrField<PTabletInf
 
     // return error if previous flush failed
     RETURN_NOT_OK(_flush_token->wait());
-    // Cannot directly DCHECK_EQ(_mem_tracker->consumption(), 0);
-    // In allocate/free of mem_pool, the consume_cache of _mem_tracker will be called,
-    // and _untracked_mem must be flushed first.
-    MemTracker::memory_leak_check(_mem_tracker.get());
 
     // use rowset meta manager to save meta
     _cur_rowset = _rowset_writer->build();
@@ -330,7 +327,6 @@ OLAPStatus DeltaWriter::cancel() {
         // cancel and wait all memtables in flush queue to be finished
         _flush_token->cancel();
     }
-    MemTracker::memory_leak_check(_mem_tracker.get());
     _is_cancelled = true;
     return OLAP_SUCCESS;
 }
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index 5d5649907f..0f5ebaa395 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -344,7 +344,7 @@ Cache::Handle* LRUCache::insert(const CacheKey& key, uint32_t hash, void* value,
     return reinterpret_cast<Cache::Handle*>(e);
 }
 
-void LRUCache::erase(const CacheKey& key, uint32_t hash, MemTracker* tracker) {
+void LRUCache::erase(const CacheKey& key, uint32_t hash) {
     LRUHandle* e = nullptr;
     bool last_ref = false;
     {
@@ -443,15 +443,12 @@ ShardedLRUCache::ShardedLRUCache(const std::string& name, size_t total_capacity,
         : _name(name),
           _last_id(1),
           _mem_tracker(MemTracker::create_tracker(-1, name, nullptr, MemTrackerLevel::OVERVIEW)) {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
     const size_t per_shard = (total_capacity + (kNumShards - 1)) / kNumShards;
     for (int s = 0; s < kNumShards; s++) {
         _shards[s] = new LRUCache(type);
         _shards[s]->set_capacity(per_shard);
     }
-    // After the lru cache is created in the main thread, the main thread will not switch to the
-    // lru cache mem tracker again, so manually clear the untracked mem in tls.
-    thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
 
     _entity = DorisMetrics::instance()->metric_registry()->register_entity(
             std::string("lru_cache:") + name, {{"name", name}});
@@ -499,7 +496,7 @@ void ShardedLRUCache::release(Handle* handle) {
 void ShardedLRUCache::erase(const CacheKey& key) {
     SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const uint32_t hash = _hash_slice(key);
-    _shards[_shard(hash)]->erase(key, hash, _mem_tracker.get());
+    _shards[_shard(hash)]->erase(key, hash);
 }
 
 void* ShardedLRUCache::value(Handle* handle) {
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 9a20843bd3..21e6e9742d 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -316,7 +316,7 @@ public:
                           CachePriority priority = CachePriority::NORMAL);
     Cache::Handle* lookup(const CacheKey& key, uint32_t hash);
     void release(Cache::Handle* handle);
-    void erase(const CacheKey& key, uint32_t hash, MemTracker* tracker);
+    void erase(const CacheKey& key, uint32_t hash);
     int64_t prune();
     int64_t prune_if(CacheValuePredicate pred);
 
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index 2ab6ab1b92..7b92121752 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -31,13 +31,14 @@ namespace doris {
 
 MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-                   KeysType keys_type, RowsetWriter* rowset_writer)
+                   KeysType keys_type, RowsetWriter* rowset_writer,
+                   const std::shared_ptr<MemTracker>& parent_tracker)
         : _tablet_id(tablet_id),
           _schema(schema),
           _tablet_schema(tablet_schema),
           _slot_descs(slot_descs),
           _keys_type(keys_type),
-          _mem_tracker(MemTracker::create_tracker(-1, "MemTable")),
+          _mem_tracker(MemTracker::create_tracker(-1, "MemTable", parent_tracker)),
           _buffer_mem_pool(new MemPool(_mem_tracker.get())),
           _table_mem_pool(new MemPool(_mem_tracker.get())),
           _schema_size(_schema->schema_size()),
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index db0e6a12cf..6849bf45f4 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -40,7 +40,8 @@ class MemTable {
 public:
     MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-             KeysType keys_type, RowsetWriter* rowset_writer);
+             KeysType keys_type, RowsetWriter* rowset_writer,
+             const std::shared_ptr<MemTracker>& parent_tracker);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet_id; }
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index b63074d282..75ae89136e 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -22,6 +22,7 @@
 #include "olap/memtable.h"
 #include "util/scoped_cleanup.h"
 #include "util/time.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -56,6 +57,7 @@ OLAPStatus FlushToken::wait() {
 }
 
 void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) {
+    SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD, memtable->mem_tracker());
     _stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
     SCOPED_CLEANUP({ memtable.reset(); });
     // If previous flush has failed, return directly
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index 37ef78925b..7f78176164 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -16,6 +16,7 @@
 // under the License.
 
 #include "olap/page_cache.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -31,6 +32,7 @@ StoragePageCache::StoragePageCache(size_t capacity, int32_t index_cache_percenta
         : _index_cache_percentage(index_cache_percentage),
           _mem_tracker(MemTracker::create_tracker(capacity, "StoragePageCache", nullptr,
                                                   MemTrackerLevel::OVERVIEW)) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     if (index_cache_percentage == 0) {
         _data_page_cache =
                 std::unique_ptr<Cache>(new_lru_cache("DataPageCache", capacity));
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ee7589be3c..dfc35a0bcd 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -37,7 +37,6 @@
 #include "runtime/exec_env.h"
 #include "runtime/mem_pool.h"
 #include "runtime/mem_tracker.h"
-#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 
 using std::deque;
@@ -50,9 +49,6 @@ using std::vector;
 
 namespace doris {
 
-DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(schema_change_mem_consumption, MetricUnit::BYTES, "",
-                                   mem_consumption, Labels({{"type", "schema_change"}}));
-
 class RowBlockSorter {
 public:
     explicit RowBlockSorter(RowBlockAllocator* allocator);
@@ -1386,15 +1382,9 @@ bool SchemaChangeWithSorting::_external_sorting(vector<RowsetSharedPtr>& src_row
     return true;
 }
 
-SchemaChangeHandler::SchemaChangeHandler()
-        : _mem_tracker(MemTracker::create_tracker(-1, "SchemaChangeHandler", StorageEngine::instance()->schema_change_mem_tracker())) {
-    REGISTER_HOOK_METRIC(schema_change_mem_consumption,
-                         [this]() { return _mem_tracker->consumption(); });
-}
+SchemaChangeHandler::SchemaChangeHandler() {}
 
-SchemaChangeHandler::~SchemaChangeHandler() {
-    DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
-}
+SchemaChangeHandler::~SchemaChangeHandler() {}
 
 OLAPStatus SchemaChangeHandler::process_alter_tablet_v2(const TAlterTabletReqV2& request) {
     LOG(INFO) << "begin to do request alter tablet: base_tablet_id=" << request.base_tablet_id
@@ -1497,13 +1487,6 @@ OLAPStatus SchemaChangeHandler::_do_process_alter_tablet_v2(const TAlterTabletRe
         reader_context.seek_columns = &return_columns;
         reader_context.sequence_id_idx = reader_context.tablet_schema->sequence_col_idx();
 
-        // TODO(zxy) switch to tls mem tracker
-        auto mem_tracker = MemTracker::create_tracker(
-                -1,
-                "AlterTablet:" + std::to_string(base_tablet->tablet_id()) + "-" +
-                        std::to_string(new_tablet->tablet_id()),
-                _mem_tracker, MemTrackerLevel::TASK);
-
         do {
             // get history data to be converted and it will check if there is hold in base tablet
             res = _get_versions_to_be_changed(base_tablet, &versions_to_be_changed);
diff --git a/be/src/olap/schema_change.h b/be/src/olap/schema_change.h
index 2db9a8f189..9b699bb35e 100644
--- a/be/src/olap/schema_change.h
+++ b/be/src/olap/schema_change.h
@@ -242,8 +242,6 @@ private:
     virtual ~SchemaChangeHandler();
     SchemaChangeHandler(const SchemaChangeHandler&) = delete;
     SchemaChangeHandler& operator=(const SchemaChangeHandler&) = delete;
-
-    std::shared_ptr<MemTracker> _mem_tracker;
 };
 
 using RowBlockDeleter = std::function<void(RowBlock*)>;
diff --git a/be/src/olap/snapshot_manager.cpp b/be/src/olap/snapshot_manager.cpp
index 4d23cc27ad..797cd59564 100644
--- a/be/src/olap/snapshot_manager.cpp
+++ b/be/src/olap/snapshot_manager.cpp
@@ -36,6 +36,7 @@
 #include "olap/rowset/rowset_id_generator.h"
 #include "olap/rowset/rowset_writer.h"
 #include "olap/storage_engine.h"
+#include "runtime/thread_context.h"
 
 using std::filesystem::path;
 using std::map;
@@ -63,6 +64,7 @@ SnapshotManager* SnapshotManager::instance() {
 
 OLAPStatus SnapshotManager::make_snapshot(const TSnapshotRequest& request, string* snapshot_path,
                                           bool* allow_incremental_clone) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     OLAPStatus res = OLAP_SUCCESS;
     if (snapshot_path == nullptr) {
         LOG(WARNING) << "output parameter cannot be null";
@@ -90,6 +92,7 @@ OLAPStatus SnapshotManager::make_snapshot(const TSnapshotRequest& request, strin
 }
 
 OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     // 如果请求的snapshot_path位于root/snapshot文件夹下,则认为是合法的,可以删除
     // 否则认为是非法请求,返回错误结果
     auto stores = StorageEngine::instance()->get_stores();
@@ -120,6 +123,7 @@ OLAPStatus SnapshotManager::release_snapshot(const string& snapshot_path) {
 // AlphaRowsetMeta here.
 OLAPStatus SnapshotManager::convert_rowset_ids(const FilePathDesc& clone_dir_desc, int64_t tablet_id,
                                                const int32_t& schema_hash) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     OLAPStatus res = OLAP_SUCCESS;
     // check clone dir existed
     if (!FileUtils::check_exist(clone_dir_desc.filepath)) {
diff --git a/be/src/olap/snapshot_manager.h b/be/src/olap/snapshot_manager.h
index 4846fbba2b..c13a133ab6 100644
--- a/be/src/olap/snapshot_manager.h
+++ b/be/src/olap/snapshot_manager.h
@@ -103,7 +103,6 @@ private:
     Mutex _snapshot_mutex;
     uint64_t _snapshot_base_id;
 
-    // TODO(zxy) used after
     std::shared_ptr<MemTracker> _mem_tracker = nullptr;
 }; // SnapshotManager
 
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 830f1932a1..f56c37c8e1 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -85,6 +85,8 @@ namespace doris {
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(unused_rowsets_count, MetricUnit::ROWSETS);
 DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(compaction_mem_consumption, MetricUnit::BYTES, "",
                                    mem_consumption, Labels({{"type", "compaction"}}));
+DEFINE_GAUGE_METRIC_PROTOTYPE_5ARG(schema_change_mem_consumption, MetricUnit::BYTES, "",
+                                   mem_consumption, Labels({{"type", "schema_change"}}));
 
 StorageEngine* StorageEngine::_s_instance = nullptr;
 
@@ -142,11 +144,14 @@ StorageEngine::StorageEngine(const EngineOptions& options)
     REGISTER_HOOK_METRIC(compaction_mem_consumption, [this]() {
         return _compaction_mem_tracker->consumption();
     });
+    REGISTER_HOOK_METRIC(schema_change_mem_consumption,
+                         [this]() { return _schema_change_mem_tracker->consumption(); });
 }
 
 StorageEngine::~StorageEngine() {
     DEREGISTER_HOOK_METRIC(unused_rowsets_count);
     DEREGISTER_HOOK_METRIC(compaction_mem_consumption);
+    DEREGISTER_HOOK_METRIC(schema_change_mem_consumption);
     _clear();
 
     if (_compaction_thread_pool) {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index 7bfbb77e5c..4a73c21e79 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -333,6 +333,7 @@ private:
     // Count the memory consumption of all SchemaChange tasks.
     std::shared_ptr<MemTracker> _schema_change_mem_tracker;
     // Count the memory consumption of all EngineCloneTask.
+    // Note: This does not include the memory used by make/release snapshot.
     std::shared_ptr<MemTracker> _clone_mem_tracker;
     // Count the memory consumption of all EngineBatchLoadTask.
     std::shared_ptr<MemTracker> _batch_load_mem_tracker;
diff --git a/be/src/olap/task/engine_alter_tablet_task.cpp b/be/src/olap/task/engine_alter_tablet_task.cpp
index 4af633c0c6..228940975f 100644
--- a/be/src/olap/task/engine_alter_tablet_task.cpp
+++ b/be/src/olap/task/engine_alter_tablet_task.cpp
@@ -19,6 +19,7 @@
 
 #include "olap/schema_change.h"
 #include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -35,6 +36,7 @@ EngineAlterTabletTask::EngineAlterTabletTask(const TAlterTabletReqV2& request)
 }
 
 OLAPStatus EngineAlterTabletTask::execute() {
+    SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
     DorisMetrics::instance()->create_rollup_requests_total->increment(1);
 
     auto schema_change_handler = SchemaChangeHandler::instance();
diff --git a/be/src/olap/task/engine_batch_load_task.cpp b/be/src/olap/task/engine_batch_load_task.cpp
index ad9e264244..f0fce81bbd 100644
--- a/be/src/olap/task/engine_batch_load_task.cpp
+++ b/be/src/olap/task/engine_batch_load_task.cpp
@@ -37,6 +37,7 @@
 #include "olap/tablet.h"
 #include "util/doris_metrics.h"
 #include "util/pretty_printer.h"
+#include "runtime/thread_context.h"
 
 using apache::thrift::ThriftDebugString;
 using std::list;
@@ -60,6 +61,7 @@ EngineBatchLoadTask::EngineBatchLoadTask(TPushReq& push_req, std::vector<TTablet
 EngineBatchLoadTask::~EngineBatchLoadTask() {}
 
 OLAPStatus EngineBatchLoadTask::execute() {
+    SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
     Status status = Status::OK();
     if (_push_req.push_type == TPushType::LOAD || _push_req.push_type == TPushType::LOAD_DELETE ||
         _push_req.push_type == TPushType::LOAD_V2) {
diff --git a/be/src/olap/task/engine_checksum_task.cpp b/be/src/olap/task/engine_checksum_task.cpp
index 7b425a12d7..5068b72a29 100644
--- a/be/src/olap/task/engine_checksum_task.cpp
+++ b/be/src/olap/task/engine_checksum_task.cpp
@@ -19,6 +19,7 @@
 
 #include "olap/tuple_reader.h"
 #include "olap/row.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -34,6 +35,7 @@ EngineChecksumTask::EngineChecksumTask(TTabletId tablet_id, TSchemaHash schema_h
 }
 
 OLAPStatus EngineChecksumTask::execute() {
+    SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
     OLAPStatus res = _compute_checksum();
     return res;
 } // execute
diff --git a/be/src/olap/task/engine_clone_task.cpp b/be/src/olap/task/engine_clone_task.cpp
index b33b1c91c8..8fa5818ea8 100644
--- a/be/src/olap/task/engine_clone_task.cpp
+++ b/be/src/olap/task/engine_clone_task.cpp
@@ -30,6 +30,7 @@
 #include "olap/rowset/rowset_factory.h"
 #include "olap/snapshot_manager.h"
 #include "runtime/client_cache.h"
+#include "runtime/thread_context.h"
 #include "util/thrift_rpc_helper.h"
 
 using std::set;
@@ -63,6 +64,7 @@ EngineCloneTask::EngineCloneTask(const TCloneReq& clone_req, const TMasterInfo&
 
 OLAPStatus EngineCloneTask::execute() {
     // register the tablet to avoid it is deleted by gc thread during clone process
+    SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::STORAGE, _mem_tracker);
     StorageEngine::instance()->tablet_manager()->register_clone_tablet(_clone_req.tablet_id);
     OLAPStatus st = _do_clone();
     StorageEngine::instance()->tablet_manager()->unregister_clone_tablet(_clone_req.tablet_id);
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index 837a6eb835..1232cceeb5 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -28,6 +28,7 @@
 #include "runtime/data_stream_mgr.h"
 #include "runtime/row_batch.h"
 #include "runtime/sorted_run_merger.h"
+#include "runtime/thread_context.h"
 #include "util/debug_util.h"
 #include "util/logging.h"
 #include "util/runtime_profile.h"
@@ -362,6 +363,7 @@ void DataStreamRecvr::SenderQueue::close() {
 
 Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) {
     DCHECK(_is_merging);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
     // Create the merger that will a single stream of sorted rows.
     _merger.reset(new SortedRunMerger(less_than, &_row_desc, _profile, false));
@@ -377,6 +379,7 @@ Status DataStreamRecvr::create_merger(const TupleRowComparator& less_than) {
 Status DataStreamRecvr::create_parallel_merger(const TupleRowComparator& less_than,
                                                uint32_t batch_size) {
     DCHECK(_is_merging);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     vector<SortedRunMerger::RunBatchSupplier> child_input_batch_suppliers;
 
     // Create the merger that will a single stream of sorted rows.
@@ -422,6 +425,7 @@ void DataStreamRecvr::transfer_all_resources(RowBatch* transfer_batch) {
     // _child_mergers is not empty, means use parallel merge need transfer resource from
     // _sender queue.
     // the need transfer resources from child_merger input_row_batch
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     if (!_child_mergers.empty()) {
         _merger->transfer_all_resources(transfer_batch);
     } else {
@@ -470,17 +474,20 @@ DataStreamRecvr::DataStreamRecvr(
 
 Status DataStreamRecvr::get_next(RowBatch* output_batch, bool* eos) {
     DCHECK(_merger.get() != nullptr);
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
     return _merger->get_next(output_batch, eos);
 }
 
 void DataStreamRecvr::add_batch(const PRowBatch& batch, int sender_id, int be_number,
                                 int64_t packet_seq, ::google::protobuf::Closure** done) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int use_sender_id = _is_merging ? sender_id : 0;
     // Add all batches to the same queue if _is_merging is false.
     _sender_queues[use_sender_id]->add_batch(batch, be_number, packet_seq, done);
 }
 
 void DataStreamRecvr::add_batch(RowBatch* batch, int sender_id, bool use_move) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_batch(batch, use_move);
 }
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index cdd3742515..dae7b8a621 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -39,6 +39,7 @@
 #include "runtime/raw_value.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "runtime/tuple_row.h"
 #include "service/backend_options.h"
 #include "service/brpc.h"
@@ -388,10 +389,10 @@ Status DataStreamSender::prepare(RuntimeState* state) {
           << "])";
     _profile = _pool->add(new RuntimeProfile(title.str()));
     SCOPED_TIMER(_profile->total_time_counter());
-    // TODO(zxy) used after
     _mem_tracker = MemTracker::create_tracker(
             -1, "DataStreamSender:" + print_id(state->fragment_instance_id()),
-            state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile);
+            nullptr, MemTrackerLevel::VERBOSE, _profile);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
         std::random_device rd;
@@ -432,6 +433,7 @@ DataStreamSender::~DataStreamSender() {
 
 Status DataStreamSender::open(RuntimeState* state) {
     DCHECK(state != nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(Expr::open(_partition_expr_ctxs, state));
     for (auto iter : _partition_infos) {
         RETURN_IF_ERROR(iter->open(state));
@@ -441,6 +443,7 @@ Status DataStreamSender::open(RuntimeState* state) {
 
 Status DataStreamSender::send(RuntimeState* state, RowBatch* batch) {
     SCOPED_TIMER(_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
 
     // Unpartition or _channel size
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index 3c2315b308..f910f96668 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -283,6 +283,7 @@ DiskIoMgr::DiskIoMgr()
 {
     _mem_tracker =
             MemTracker::create_tracker(-1, "DiskIO", nullptr, MemTrackerLevel::OVERVIEW);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t max_buffer_size_scaled = bit_ceil(_max_buffer_size, _min_buffer_size);
     _free_buffers.resize(bit_log2(max_buffer_size_scaled) + 1);
     int num_local_disks = (config::num_disks == 0 ? DiskInfo::num_disks() : config::num_disks);
@@ -305,6 +306,7 @@ DiskIoMgr::DiskIoMgr(int num_local_disks, int threads_per_disk, int min_buffer_s
 {
     _mem_tracker =
             MemTracker::create_tracker(-1, "DiskIO", nullptr, MemTrackerLevel::OVERVIEW);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t max_buffer_size_scaled = bit_ceil(_max_buffer_size, _min_buffer_size);
     _free_buffers.resize(bit_log2(max_buffer_size_scaled) + 1);
     if (num_local_disks == 0) {
@@ -370,6 +372,7 @@ DiskIoMgr::~DiskIoMgr() {
 }
 
 Status DiskIoMgr::init(const int64_t mem_limit) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _mem_tracker->set_limit(mem_limit);
     // If we hit the process limit, see if we can reclaim some memory by removing
     // previously allocated (but unused) io buffers.
@@ -456,6 +459,7 @@ void DiskIoMgr::unregister_context(RequestContext* reader) {
 // is on.
 // If wait_for_disks_completion is true, wait for the number of active disks to become 0.
 void DiskIoMgr::cancel_context(RequestContext* context, bool wait_for_disks_completion) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     context->cancel(Status::Cancelled("Cancelled"));
 
     if (wait_for_disks_completion) {
@@ -536,6 +540,7 @@ Status DiskIoMgr::add_scan_ranges(RequestContext* reader, const vector<ScanRange
     if (ranges.empty()) {
         return Status::OK();
     }
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     // Validate and initialize all ranges
     for (int i = 0; i < ranges.size(); ++i) {
@@ -584,6 +589,7 @@ Status DiskIoMgr::add_scan_ranges(RequestContext* reader, const vector<ScanRange
 Status DiskIoMgr::get_next_range(RequestContext* reader, ScanRange** range) {
     DCHECK(reader != nullptr);
     DCHECK(range != nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     *range = nullptr;
     Status status = Status::OK();
 
@@ -638,6 +644,7 @@ Status DiskIoMgr::get_next_range(RequestContext* reader, ScanRange** range) {
 Status DiskIoMgr::read(RequestContext* reader, ScanRange* range, BufferDescriptor** buffer) {
     DCHECK(range != nullptr);
     DCHECK(buffer != nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     *buffer = nullptr;
 
     if (range->len() > _max_buffer_size) {
@@ -658,6 +665,7 @@ Status DiskIoMgr::read(RequestContext* reader, ScanRange* range, BufferDescripto
 
 void DiskIoMgr::return_buffer(BufferDescriptor* buffer_desc) {
     DCHECK(buffer_desc != nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     if (!buffer_desc->_status.ok()) {
         DCHECK(buffer_desc->_buffer == nullptr);
     }
@@ -1168,6 +1176,7 @@ int DiskIoMgr::free_buffers_idx(int64_t buffer_size) {
 Status DiskIoMgr::add_write_range(RequestContext* writer, WriteRange* write_range) {
     DCHECK_LE(write_range->len(), _max_buffer_size);
     unique_lock<mutex> writer_lock(writer->_lock);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     if (writer->_state == RequestContext::Cancelled) {
         DCHECK(!writer->_status.ok());
diff --git a/be/src/runtime/fold_constant_executor.cpp b/be/src/runtime/fold_constant_executor.cpp
index 6a3e69a02c..9b5300f16a 100644
--- a/be/src/runtime/fold_constant_executor.cpp
+++ b/be/src/runtime/fold_constant_executor.cpp
@@ -22,6 +22,7 @@
 #include "runtime/tuple_row.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "runtime/mem_tracker.h"
 #include "exprs/expr_context.h"
 #include "exprs/expr.h"
@@ -43,6 +44,7 @@ TUniqueId FoldConstantExecutor::_dummy_id;
 
 Status FoldConstantExecutor::fold_constant_expr(
         const TFoldConstantParams& params, PConstantExprResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     const auto& expr_map = params.expr_map;
     auto expr_result_map = response->mutable_expr_result_map();
 
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 95a50b5ded..766e00919d 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -20,6 +20,7 @@
 #include "olap/lru_cache.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/tablets_channel.h"
+#include "runtime/thread_context.h"
 
 namespace doris {
 
@@ -42,6 +43,7 @@ LoadChannel::~LoadChannel() {
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t index_id = params.index_id();
     std::shared_ptr<TabletsChannel> channel;
     {
@@ -66,6 +68,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
 
 Status LoadChannel::add_batch(const PTabletWriterAddBatchRequest& request,
                               PTabletWriterAddBatchResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
@@ -152,6 +155,7 @@ bool LoadChannel::is_finished() {
 }
 
 Status LoadChannel::cancel() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     for (auto& it : _tablets_channels) {
         it.second->cancel();
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index e5e23f5110..1596903ac3 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -21,6 +21,7 @@
 #include "olap/lru_cache.h"
 #include "runtime/load_channel.h"
 #include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/doris_metrics.h"
 #include "util/stopwatch.hpp"
@@ -87,6 +88,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     _mem_tracker = MemTracker::create_tracker(load_mem_limit, "LoadChannelMgr",
                                               MemTracker::get_process_tracker(),
                                               MemTrackerLevel::OVERVIEW);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     REGISTER_HOOK_METRIC(load_channel_mem_consumption, [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
     RETURN_IF_ERROR(_start_bg_worker());
@@ -94,6 +96,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
 }
 
 Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> channel;
     {
@@ -126,6 +129,7 @@ static void dummy_deleter(const CacheKey& key, void* value) {}
 
 Status LoadChannelMgr::add_batch(const PTabletWriterAddBatchRequest& request,
                                  PTabletWriterAddBatchResult* response) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(request.id());
     // 1. get load channel
     std::shared_ptr<LoadChannel> channel;
@@ -208,6 +212,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
 }
 
 Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> cancelled_channel;
     {
diff --git a/be/src/runtime/mem_tracker.cpp b/be/src/runtime/mem_tracker.cpp
index b4ee28d9fa..002411c737 100644
--- a/be/src/runtime/mem_tracker.cpp
+++ b/be/src/runtime/mem_tracker.cpp
@@ -25,7 +25,6 @@
 #include "gutil/once.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "service/backend_options.h"
 #include "util/pretty_printer.h"
 #include "util/string_util.h"
@@ -119,7 +118,7 @@ MemTracker::MemTracker(int64_t byte_limit, const std::string& label,
                        RuntimeProfile* profile)
         : _limit(byte_limit),
           _label(label),
-          _id(_label + std::to_string(GetCurrentTimeMicros()) + std::to_string(rand())),
+          _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
           _parent(parent),
           _level(level) {
     if (profile == nullptr) {
diff --git a/be/src/runtime/mem_tracker.h b/be/src/runtime/mem_tracker.h
index 89a1d2e0c7..9ae3ab906a 100644
--- a/be/src/runtime/mem_tracker.h
+++ b/be/src/runtime/mem_tracker.h
@@ -402,7 +402,7 @@ public:
         return tracker == nullptr ? false : true;
     }
 
-    std::string id() { return _id; }
+    int64_t id() { return _id; }
 
     std::string debug_string() {
         std::stringstream msg;
@@ -467,7 +467,7 @@ private:
 
     std::string _label;
 
-    std::string _id;
+    int64_t _id;
 
     std::shared_ptr<MemTracker> _parent; // The parent of this tracker.
 
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 9aec083c6d..5d4c7b34f9 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -119,13 +119,10 @@ ChunkAllocator::ChunkAllocator(size_t reserve_limit)
           _arenas(CpuInfo::get_max_num_cores()) {
     _mem_tracker =
             MemTracker::create_tracker(-1, "ChunkAllocator", nullptr, MemTrackerLevel::OVERVIEW);
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(_mem_tracker);
     for (int i = 0; i < _arenas.size(); ++i) {
         _arenas[i].reset(new ChunkArena());
     }
-    // After the ChunkAllocator is created in the main thread, the main thread will not switch to the
-    // chunk allocator mem tracker again, so manually clear the untracked mem in tls.
-    thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
 
     _chunk_allocator_metric_entity =
             DorisMetrics::instance()->metric_registry()->register_entity("chunk_allocator");
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index 6518391218..a5bb1df0c0 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -85,6 +85,7 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
                                        const TQueryOptions& options, int node_id) {
     DCHECK((role == RuntimeFilterRole::CONSUMER && node_id >= 0) ||
            role != RuntimeFilterRole::CONSUMER);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_tracker);
     int32_t key = desc.filter_id;
 
     std::map<int32_t, RuntimeFilterMgrVal>* filter_map = nullptr;
@@ -112,6 +113,7 @@ Status RuntimeFilterMgr::regist_filter(const RuntimeFilterRole role, const TRunt
 }
 
 Status RuntimeFilterMgr::update_filter(const PPublishFilterRequest* request, const char* data) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_tracker);
     UpdateRuntimeFilterParams params;
     params.request = request;
     params.data = data;
@@ -156,7 +158,8 @@ Status RuntimeFilterMergeControllerEntity::_init_with_desc(
     // LOG(INFO) << "entity filter id:" << filter_id;
     cntVal->filter->init_with_desc(&cntVal->runtime_filter_desc, query_options, _fragment_instance_id);
     cntVal->tracker = MemTracker::create_tracker(
-            -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id);
+            -1, thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker()->label() + ":FilterID:" + filter_id,
+            thread_local_ctx.get()->_thread_mem_tracker_mgr->mem_tracker());
     _filter_map.emplace(filter_id, cntVal);
     return Status::OK();
 }
@@ -166,8 +169,8 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
                                                 const TQueryOptions& query_options) {
     _query_id = query_id;
     _fragment_instance_id = fragment_instance_id;
-    // TODO(zxy) used after
     _mem_tracker = MemTracker::create_tracker(-1, "RuntimeFilterMergeControllerEntity", nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     for (auto& filterid_to_desc : runtime_filter_params.rid_to_runtime_filter) {
         int filter_id = filterid_to_desc.first;
         const auto& target_iter = runtime_filter_params.rid_to_target_param.find(filter_id);
@@ -186,6 +189,7 @@ Status RuntimeFilterMergeControllerEntity::init(UniqueId query_id, UniqueId frag
 // merge data
 Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* request,
                                                  const char* data) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::shared_ptr<RuntimeFilterCntlVal> cntVal;
     int merged_size = 0;
     {
@@ -197,6 +201,7 @@ Status RuntimeFilterMergeControllerEntity::merge(const PMergeFilterRequest* requ
             return Status::InvalidArgument("unknown filter id");
         }
         cntVal = iter->second;
+        SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(cntVal->tracker);
         MergeRuntimeFilterParams params;
         params.data = data;
         params.request = request;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index e4e0b78a1b..40687f07a2 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -23,6 +23,7 @@
 #include "olap/memtable.h"
 #include "runtime/row_batch.h"
 #include "runtime/tuple_row.h"
+#include "runtime/thread_context.h"
 #include "util/doris_metrics.h"
 
 namespace doris {
@@ -50,6 +51,7 @@ TabletsChannel::~TabletsChannel() {
 }
 
 Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kOpened) {
         // Normal case, already open by other sender
@@ -77,6 +79,7 @@ Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
 Status TabletsChannel::add_batch(const PTabletWriterAddBatchRequest& request,
         PTabletWriterAddBatchResult* response) {
     DCHECK(request.tablet_ids_size() == request.row_batch().num_rows());
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t cur_seq;
     {
         std::lock_guard<std::mutex> l(_lock);
@@ -202,6 +205,7 @@ Status TabletsChannel::close(int sender_id, int64_t backend_id, bool* finished,
 }
 
 Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         // TabletsChannel is closed without LoadChannel's lock,
@@ -301,6 +305,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
 }
 
 Status TabletsChannel::cancel() {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 7c3beafc61..a7c8261930 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -42,12 +42,21 @@
 // tracker again in the short term, can consider manually clear_untracked_mems.
 // The query thread will automatically clear_untracked_mems when detach_task.
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, false)
+    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, false)
 #define SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
-    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker(mem_tracker, true);
+    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<false>(mem_tracker, true);
+#define SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker) \
+    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTracker<true>(mem_tracker, true)
+// Same as SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER, but additionally calls clear_untracked_mems
+// when the scope ends. Intended for non-query threads that will not switch the mem tracker
+// again in the short term; query threads clear_untracked_mems automatically when detach_task.
+#define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_END_CLEAR(mem_tracker) \
+    auto VARNAME_LINENUM(switch_tracker) = SwitchThreadMemTrackerEndClear(mem_tracker)
 #define SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB(action_type, ...) \
     auto VARNAME_LINENUM(witch_tracker_cb) =                            \
             SwitchThreadMemTrackerErrCallBack(action_type, ##__VA_ARGS__)
+#define ADD_THREAD_LOCAL_MEM_TRACKER(mem_tracker) \
+    thread_local_ctx.get()->_thread_mem_tracker_mgr->add_tracker(mem_tracker)
 
 namespace doris {
 
@@ -67,10 +76,11 @@ public:
         UNKNOWN = 0,
         QUERY = 1,
         LOAD = 2,
-        COMPACTION = 3
+        COMPACTION = 3,
+        STORAGE = 4
         // to be added ...
     };
-    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION"};
+    inline static const std::string TaskTypeStr[] = {"UNKNOWN", "QUERY", "LOAD", "COMPACTION", "STORAGE"};
 
 public:
     ThreadContext() : _thread_id(std::this_thread::get_id()), _type(TaskType::UNKNOWN) {
@@ -83,7 +93,9 @@ public:
     void attach(const TaskType& type, const std::string& task_id,
                 const TUniqueId& fragment_instance_id,
                 const std::shared_ptr<MemTracker>& mem_tracker) {
-        DCHECK(_type == TaskType::UNKNOWN && _task_id == "");
+        DCHECK(_type == TaskType::UNKNOWN && _task_id == "")
+                << ",old tracker label: " << _thread_mem_tracker_mgr->mem_tracker()->label()
+                << ",new tracker label: " << mem_tracker->label();
         _type = type;
         _task_id = task_id;
         _fragment_instance_id = fragment_instance_id;
@@ -177,13 +189,17 @@ public:
 
     explicit AttachTaskThread(const ThreadContext::TaskType& type,
                               const std::shared_ptr<MemTracker>& mem_tracker) {
+#ifndef BE_TEST
         DCHECK(mem_tracker);
+#endif
         thread_local_ctx.get()->attach(type, "", TUniqueId(), mem_tracker);
     }
 
     explicit AttachTaskThread(const TQueryType::type& query_type,
                               const std::shared_ptr<MemTracker>& mem_tracker) {
+#ifndef BE_TEST
         DCHECK(mem_tracker);
+#endif
         thread_local_ctx.get()->attach(query_to_task_type(query_type), "", TUniqueId(),
                                        mem_tracker);
     }
@@ -191,9 +207,11 @@ public:
     explicit AttachTaskThread(const TQueryType::type& query_type, const std::string& task_id,
                               const TUniqueId& fragment_instance_id,
                               const std::shared_ptr<MemTracker>& mem_tracker) {
+#ifndef BE_TEST
         DCHECK(task_id != "");
         DCHECK(fragment_instance_id != TUniqueId());
         DCHECK(mem_tracker);
+#endif
         thread_local_ctx.get()->attach(query_to_task_type(query_type), task_id,
                                        fragment_instance_id, mem_tracker);
     }
@@ -204,10 +222,10 @@ public:
         DCHECK(print_id(runtime_state->query_id()) != "");
         DCHECK(runtime_state->fragment_instance_id() != TUniqueId());
         DCHECK(mem_tracker);
+#endif
         thread_local_ctx.get()->attach(query_to_task_type(runtime_state->query_type()),
                                        print_id(runtime_state->query_id()),
                                        runtime_state->fragment_instance_id(), mem_tracker);
-#endif
     }
 
     const ThreadContext::TaskType query_to_task_type(const TQueryType::type& query_type) {
@@ -223,10 +241,8 @@ public:
     }
 
     ~AttachTaskThread() {
-#ifndef BE_TEST
         thread_local_ctx.get()->detach();
         DorisMetrics::instance()->attach_task_thread_count->increment(1);
-#endif
     }
 };
 
@@ -241,33 +257,55 @@ public:
     }
 
 private:
-    bool _scope;
+    bool _scope = true;
 };
 
+template <bool Existed>
 class SwitchThreadMemTracker {
 public:
     explicit SwitchThreadMemTracker(const std::shared_ptr<MemTracker>& mem_tracker,
                                     bool in_task = true) {
+        if (config::memory_verbose_track) {
 #ifndef BE_TEST
-        DCHECK(mem_tracker);
-        // The thread tracker must be switched after the attach task, otherwise switching
-        // in the main thread will cause the cached tracker not be cleaned up in time.
-        DCHECK(in_task == false ||
-               thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
-        _old_tracker_id =
-                thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker(mem_tracker);
+            DCHECK(mem_tracker);
+            // The thread tracker must be switched after the attach task, otherwise switching
+            // in the main thread will cause the cached tracker not be cleaned up in time.
+            DCHECK(in_task == false ||
+                   thread_local_ctx.get()->_thread_mem_tracker_mgr->is_attach_task());
+            // Existed is a template constant, so it is forwarded directly to
+            // update_tracker<Existed>() rather than branched on at run time.
+            // With Existed == true, update_tracker DCHECKs that the tracker is
+            // already registered on this thread instead of inserting it on
+            // first use. The returned id is that of the previously active
+            // tracker, which the destructor switches back to.
+            _old_tracker_id =
+                    thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker<Existed>(
+                            mem_tracker);
 #endif
+        }
     }
 
     ~SwitchThreadMemTracker() {
+        if (config::memory_verbose_track) {
 #ifndef BE_TEST
-        thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
-        DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
+            thread_local_ctx.get()->_thread_mem_tracker_mgr->update_tracker_id(_old_tracker_id);
+            DorisMetrics::instance()->switch_thread_mem_tracker_count->increment(1);
 #endif
+        }
     }
 
-private:
-    std::string _old_tracker_id;
+protected:
+    int64_t _old_tracker_id = 0;
+};
+
+class SwitchThreadMemTrackerEndClear : public SwitchThreadMemTracker<false> {
+public:
+    explicit SwitchThreadMemTrackerEndClear(const std::shared_ptr<MemTracker>& mem_tracker)
+            : SwitchThreadMemTracker<false>(mem_tracker, false) {}
+
+    ~SwitchThreadMemTrackerEndClear() {
+        thread_local_ctx.get()->_thread_mem_tracker_mgr->clear_untracked_mems();
+    }
 };
 
 class SwitchThreadMemTrackerErrCallBack {
diff --git a/be/src/runtime/thread_mem_tracker_mgr.cpp b/be/src/runtime/thread_mem_tracker_mgr.cpp
index fd06c17427..06fd521faf 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/thread_mem_tracker_mgr.cpp
@@ -37,9 +37,9 @@ void ThreadMemTrackerMgr::attach_task(const std::string& cancel_msg, const std::
         _temp_task_mem_tracker =
                 ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(
                         task_id);
-        update_tracker(_temp_task_mem_tracker);
+        update_tracker<false>(_temp_task_mem_tracker);
     } else {
-        update_tracker(mem_tracker);
+        update_tracker<false>(mem_tracker);
     }
 }
 
@@ -48,13 +48,15 @@ void ThreadMemTrackerMgr::detach_task() {
     _fragment_instance_id = TUniqueId();
     _consume_err_cb.init();
     clear_untracked_mems();
-    _tracker_id = "process";
+    _tracker_id = 0;
     // The following memory changes for the two map operations of _untracked_mems and _mem_trackers
     // will be re-recorded in _untracked_mem.
     _untracked_mems.clear();
-    _untracked_mems["process"] = 0;
+    _untracked_mems[0] = 0;
     _mem_trackers.clear();
-    _mem_trackers["process"] = MemTracker::get_process_tracker();
+    _mem_trackers[0] = MemTracker::get_process_tracker();
+    _mem_tracker_labels.clear();
+    _mem_tracker_labels[0] = MemTracker::get_process_tracker()->label();
 }
 
 void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details) {
@@ -72,7 +74,8 @@ void ThreadMemTrackerMgr::exceeded_cancel_task(const std::string& cancel_details
 
 void ThreadMemTrackerMgr::exceeded(int64_t mem_usage, Status st) {
     auto rst = _mem_trackers[_tracker_id]->mem_limit_exceeded(
-            nullptr, "In TCMalloc Hook, " + _consume_err_cb.cancel_msg, mem_usage, st);
+            nullptr, fmt::format("In TCMalloc Hook, {}", _consume_err_cb.cancel_msg), mem_usage,
+            st);
     if (_consume_err_cb.cb_func != nullptr) {
         _consume_err_cb.cb_func();
     }
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index 2b581dec79..0066dc0519 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -18,6 +18,7 @@
 #pragma once
 
 #include <fmt/format.h>
+#include <parallel_hashmap/phmap.h>
 
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
@@ -61,9 +62,9 @@ inline thread_local bool start_thread_mem_tracker = false;
 class ThreadMemTrackerMgr {
 public:
     ThreadMemTrackerMgr() {
-        _mem_trackers["process"] = MemTracker::get_process_tracker();
-        _untracked_mems["process"] = 0;
-        _tracker_id = "process";
+        _mem_trackers[0] = MemTracker::get_process_tracker();
+        _untracked_mems[0] = 0;
+        _tracker_id = 0;
         start_thread_mem_tracker = true;
     }
     ~ThreadMemTrackerMgr() {
@@ -74,11 +75,15 @@ public:
     void clear_untracked_mems() {
         for (const auto& untracked_mem : _untracked_mems) {
             if (untracked_mem.second != 0) {
-                DCHECK(_mem_trackers[untracked_mem.first]);
-                _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
+                DCHECK(_mem_trackers[untracked_mem.first]) << ", label: " << _mem_tracker_labels[untracked_mem.first];
+                if (_mem_trackers[untracked_mem.first]) {
+                    _mem_trackers[untracked_mem.first]->consume(untracked_mem.second);
+                } else {
+                    MemTracker::get_process_tracker()->consume(untracked_mem.second);
+                }
             }
         }
-        _mem_trackers[_tracker_id]->consume(_untracked_mem);
+        mem_tracker()->consume(_untracked_mem);
         _untracked_mem = 0;
     }
 
@@ -91,14 +96,16 @@ public:
 
     // Must be fast enough!
     // Thread update_tracker may be called very frequently, adding a memory copy will be slow.
-    std::string update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
-
-    void update_tracker_id(const std::string& tracker_id) {
-        if (tracker_id != _tracker_id) {
-            _untracked_mems[_tracker_id] += _untracked_mem;
-            _untracked_mem = 0;
-            _tracker_id = tracker_id;
-        }
+    template <bool Existed>
+    int64_t update_tracker(const std::shared_ptr<MemTracker>& mem_tracker);
+    void update_tracker_id(int64_t tracker_id);
+
+    // Registers mem_tracker (must be non-null and not yet registered) with no untracked memory.
+    void add_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
+        DCHECK(mem_tracker && _mem_trackers.find(mem_tracker->id()) == _mem_trackers.end());
+        _mem_trackers[mem_tracker->id()] = mem_tracker;
+        _untracked_mems[mem_tracker->id()] = 0;
+        _mem_tracker_labels[mem_tracker->id()] = mem_tracker->label();
+    }
 
     inline ConsumeErrCallBackInfo update_consume_err_cb(const std::string& cancel_msg,
@@ -124,8 +131,12 @@ public:
     bool is_attach_task() { return _task_id != ""; }
 
     std::shared_ptr<MemTracker> mem_tracker() {
-        DCHECK(_mem_trackers[_tracker_id]);
-        return _mem_trackers[_tracker_id];
+        DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
+        if (_mem_trackers[_tracker_id]) {
+            return _mem_trackers[_tracker_id];
+        } else {
+            return MemTracker::get_process_tracker();
+        }
     }
 
 private:
@@ -146,12 +157,15 @@ private:
     //  2. The cost of calling consume for the current untracked mem is huge;
     // In order to reduce the cost, during an attach task, the untracked mem of all switched trackers is cached,
     // and the untracked mem is consumed only after the upper limit is reached or when the task is detached.
-    std::unordered_map<std::string, std::shared_ptr<MemTracker>> _mem_trackers;
-    std::string _tracker_id;
-    std::unordered_map<std::string, int64_t> _untracked_mems;
+    // NOTE: phmap::flat_hash_map with int64_t keys replaces std::unordered_map with string keys
+    //  to speed up map lookups; the expected speedup is more than 10 times.
+    phmap::flat_hash_map<int64_t, std::shared_ptr<MemTracker>> _mem_trackers;
+    int64_t _tracker_id;
+    phmap::flat_hash_map<int64_t, int64_t> _untracked_mems;
+    phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
 
     // Avoid memory allocation in functions and fall into an infinite loop
-    std::string _temp_tracker_id;
+    int64_t _temp_tracker_id;
     ConsumeErrCallBackInfo _temp_consume_err_cb;
     std::shared_ptr<MemTracker> _temp_task_mem_tracker;
 
@@ -160,23 +174,41 @@ private:
     ConsumeErrCallBackInfo _consume_err_cb;
 };
 
-inline std::string ThreadMemTrackerMgr::update_tracker(
-        const std::shared_ptr<MemTracker>& mem_tracker) {
+template <bool Existed>
+inline int64_t ThreadMemTrackerMgr::update_tracker(const std::shared_ptr<MemTracker>& mem_tracker) {
     DCHECK(mem_tracker);
     _temp_tracker_id = mem_tracker->id();
     if (_temp_tracker_id == _tracker_id) {
         return _tracker_id;
     }
-    if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) {
-        _mem_trackers[_temp_tracker_id] = mem_tracker;
-        _untracked_mems[_temp_tracker_id] = 0;
+    if (Existed) {
+        DCHECK(_mem_trackers.find(_temp_tracker_id) != _mem_trackers.end());
+    } else {
+        if (_mem_trackers.find(_temp_tracker_id) == _mem_trackers.end()) {
+            _mem_trackers[_temp_tracker_id] = mem_tracker;
+            DCHECK(_mem_trackers[_temp_tracker_id]);
+            _untracked_mems[_temp_tracker_id] = 0;
+            _mem_tracker_labels[_temp_tracker_id] = mem_tracker->label();
+        }
     }
+
     _untracked_mems[_tracker_id] += _untracked_mem;
     _untracked_mem = 0;
     std::swap(_tracker_id, _temp_tracker_id);
+    DCHECK(_mem_trackers[_tracker_id]);
     return _temp_tracker_id; // old tracker_id
 }
 
+inline void ThreadMemTrackerMgr::update_tracker_id(int64_t tracker_id) {
+    if (tracker_id != _tracker_id) {
+        _untracked_mems[_tracker_id] += _untracked_mem;
+        _untracked_mem = 0;
+        _tracker_id = tracker_id;
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+        DCHECK(_mem_trackers[_tracker_id]);
+    }
+}
+
 inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
     _untracked_mem += size;
     // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
@@ -184,7 +216,8 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
     // it will cause tracker->consumption to be temporarily less than 0.
     if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
-        DCHECK(_mem_trackers.find(_tracker_id) != _mem_trackers.end());
+        DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end());
+        start_thread_mem_tracker = false;
         // When switching to the current tracker last time, the remaining untracked memory.
         if (_untracked_mems[_tracker_id] != 0) {
             _untracked_mem += _untracked_mems[_tracker_id];
@@ -192,19 +225,18 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
         }
         // Avoid getting stuck in infinite loop if there is memory allocation in noncache_consume.
         // For example: GC function when try_consume; mem_limit_exceeded.
-        start_thread_mem_tracker = false;
         noncache_consume();
         start_thread_mem_tracker = true;
     }
 }
 
 inline void ThreadMemTrackerMgr::noncache_consume() {
-    DCHECK(_mem_trackers[_tracker_id]);
-    Status st = _mem_trackers[_tracker_id]->try_consume(_untracked_mem);
+    DCHECK(_mem_trackers[_tracker_id]) << ", label: " << _mem_tracker_labels[_tracker_id];
+    Status st = mem_tracker()->try_consume(_untracked_mem);
     if (!st) {
         // The memory has been allocated, so when TryConsume fails, need to continue to complete
         // the consume to ensure the accuracy of the statistics.
-        _mem_trackers[_tracker_id]->consume(_untracked_mem);
+        mem_tracker()->consume(_untracked_mem);
         exceeded(_untracked_mem, st);
     }
     _untracked_mem = 0;
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index 1754acdae5..c222a7a5b5 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -20,7 +20,6 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_filter_mgr.h"
-#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "vec/core/materialize_block.h"
 #include "vec/exprs/vexpr.h"
@@ -752,6 +751,7 @@ Status HashJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status HashJoinNode::prepare(RuntimeState* state) {
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _hash_table_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSetOperationNode:HashTable");
 
     // Build phase
@@ -944,9 +944,10 @@ Status HashJoinNode::get_next(RuntimeState* state, Block* output_block, bool* eo
 }
 
 Status HashJoinNode::open(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::OPEN));
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_CANCELLED(state);
 
     RETURN_IF_ERROR(VExpr::open(_build_expr_ctxs, state));
diff --git a/be/src/vec/exec/vaggregation_node.cpp b/be/src/vec/exec/vaggregation_node.cpp
index 0788303f87..0942e24758 100644
--- a/be/src/vec/exec/vaggregation_node.cpp
+++ b/be/src/vec/exec/vaggregation_node.cpp
@@ -22,7 +22,6 @@
 #include "exec/exec_node.h"
 #include "runtime/mem_pool.h"
 #include "runtime/row_batch.h"
-#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "vec/core/block.h"
 #include "vec/data_types/data_type_nullable.h"
@@ -203,15 +202,15 @@ void AggregationNode::_init_hash_method(std::vector<VExprContext*>& probe_exprs)
 }
 
 Status AggregationNode::prepare(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _exec_timer = ADD_TIMER(runtime_profile(), "ExecTime");
     _merge_timer = ADD_TIMER(runtime_profile(), "MergeTime");
     _expr_timer = ADD_TIMER(runtime_profile(), "ExprTime");
     _get_results_timer = ADD_TIMER(runtime_profile(), "GetResultsTime");
     _data_mem_tracker = MemTracker::create_virtual_tracker(-1, "AggregationNode:Data", mem_tracker());
-
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     _intermediate_tuple_desc = state->desc_tbl().get_tuple_descriptor(_intermediate_tuple_id);
     _output_tuple_desc = state->desc_tbl().get_tuple_descriptor(_output_tuple_id);
     DCHECK_EQ(_intermediate_tuple_desc->slots().size(), _output_tuple_desc->slots().size());
@@ -332,9 +331,10 @@ Status AggregationNode::prepare(RuntimeState* state) {
 }
 
 Status AggregationNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(ExecNode::open(state));
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute open.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute open.");
+    RETURN_IF_ERROR(ExecNode::open(state));
 
     RETURN_IF_ERROR(VExpr::open(_probe_expr_ctxs, state));
 
@@ -368,8 +368,9 @@ Status AggregationNode::get_next(RuntimeState* state, RowBatch* row_batch, bool*
 }
 
 Status AggregationNode::get_next(RuntimeState* state, Block* block, bool* eos) {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute get_next.");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER_ERR_CB("aggregator, while execute get_next.");
 
     if (_is_streaming_preagg) {
         bool child_eos = false;
diff --git a/be/src/vec/exec/vanalytic_eval_node.cpp b/be/src/vec/exec/vanalytic_eval_node.cpp
index 4d69716216..6861421e12 100644
--- a/be/src/vec/exec/vanalytic_eval_node.cpp
+++ b/be/src/vec/exec/vanalytic_eval_node.cpp
@@ -142,6 +142,7 @@ Status VAnalyticEvalNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status VAnalyticEvalNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     DCHECK(child(0)->row_desc().is_prefix_of(row_desc()));
     _mem_pool.reset(new MemPool(mem_tracker().get()));
     _evaluation_timer = ADD_TIMER(runtime_profile(), "EvaluationTime");
@@ -208,6 +209,7 @@ Status VAnalyticEvalNode::prepare(RuntimeState* state) {
 
 Status VAnalyticEvalNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_CANCELLED(state);
     RETURN_IF_ERROR(child(0)->open(state));
@@ -234,6 +236,7 @@ Status VAnalyticEvalNode::get_next(RuntimeState* state, RowBatch* row_batch, boo
 
 Status VAnalyticEvalNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     RETURN_IF_CANCELLED(state);
 
diff --git a/be/src/vec/exec/vblocking_join_node.cpp b/be/src/vec/exec/vblocking_join_node.cpp
index 4ab17c0f8c..acfec89b26 100644
--- a/be/src/vec/exec/vblocking_join_node.cpp
+++ b/be/src/vec/exec/vblocking_join_node.cpp
@@ -22,7 +22,6 @@
 #include "exprs/expr.h"
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 
 namespace doris::vectorized {
@@ -40,6 +39,7 @@ Status VBlockingJoinNode::init(const TPlanNode& tnode, RuntimeState* state) {
 Status VBlockingJoinNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
 
     _build_pool.reset(new MemPool(mem_tracker().get()));
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
@@ -76,8 +76,9 @@ void VBlockingJoinNode::build_side_thread(RuntimeState* state, std::promise<Stat
 }
 
 Status VBlockingJoinNode::open(RuntimeState* state) {
-    RETURN_IF_ERROR(ExecNode::open(state));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
+    RETURN_IF_ERROR(ExecNode::open(state));
 
     RETURN_IF_CANCELLED(state);
 
diff --git a/be/src/vec/exec/vcross_join_node.cpp b/be/src/vec/exec/vcross_join_node.cpp
index d2fc21d295..01d96d69de 100644
--- a/be/src/vec/exec/vcross_join_node.cpp
+++ b/be/src/vec/exec/vcross_join_node.cpp
@@ -23,7 +23,6 @@
 #include "gen_cpp/PlanNodes_types.h"
 #include "runtime/row_batch.h"
 #include "runtime/runtime_state.h"
-#include "runtime/thread_context.h"
 #include "util/runtime_profile.h"
 
 namespace doris::vectorized {
@@ -34,6 +33,7 @@ VCrossJoinNode::VCrossJoinNode(ObjectPool* pool, const TPlanNode& tnode, const D
 Status VCrossJoinNode::prepare(RuntimeState* state) {
     DCHECK(_join_op == TJoinOp::CROSS_JOIN);
     RETURN_IF_ERROR(VBlockingJoinNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(mem_tracker());
     _block_mem_tracker = MemTracker::create_virtual_tracker(-1, "VCrossJoinNode:Block", mem_tracker());
 
     _num_existing_columns = child(0)->row_desc().num_materialized_slots();
@@ -90,8 +90,9 @@ void VCrossJoinNode::init_get_next(int left_batch_row) {
 
 Status VCrossJoinNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     RETURN_IF_CANCELLED(state);
-    *eos = false;
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
+    *eos = false;
 
     if (_eos) {
         *eos = true;
diff --git a/be/src/vec/exec/volap_scan_node.cpp b/be/src/vec/exec/volap_scan_node.cpp
index d6109e0541..604dda8e72 100644
--- a/be/src/vec/exec/volap_scan_node.cpp
+++ b/be/src/vec/exec/volap_scan_node.cpp
@@ -21,7 +21,6 @@
 #include "runtime/descriptors.h"
 #include "runtime/exec_env.h"
 #include "runtime/runtime_filter_mgr.h"
-#include "runtime/thread_context.h"
 #include "util/priority_thread_pool.hpp"
 #include "vec/core/block.h"
 #include "vec/exec/volap_scanner.h"
@@ -147,6 +146,7 @@ void VOlapScanNode::transfer_thread(RuntimeState* state) {
 
 void VOlapScanNode::scanner_thread(VOlapScanner* scanner) {
     SCOPED_ATTACH_TASK_THREAD(_runtime_state, mem_tracker());
+    ADD_THREAD_LOCAL_MEM_TRACKER(scanner->mem_tracker());
     int64_t wait_time = scanner->update_wait_worker_timer();
     // Do not use ScopedTimer. There is no guarantee that, the counter
     // (_scan_cpu_timer, the class member) is not destroyed after `_running_thread==0`.
@@ -444,6 +444,7 @@ Status VOlapScanNode::close(RuntimeState* state) {
 Status VOlapScanNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     RETURN_IF_ERROR(exec_debug_action(TExecNodePhase::GETNEXT));
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(mem_tracker());
 
     // check if Canceled.
     if (state->is_cancelled()) {
diff --git a/be/src/vec/exec/volap_scanner.cpp b/be/src/vec/exec/volap_scanner.cpp
index 3850a8ab0c..c186e9a5c7 100644
--- a/be/src/vec/exec/volap_scanner.cpp
+++ b/be/src/vec/exec/volap_scanner.cpp
@@ -40,6 +40,7 @@ VOlapScanner::VOlapScanner(RuntimeState* runtime_state, VOlapScanNode* parent, b
 Status VOlapScanner::get_block(RuntimeState* state, vectorized::Block* block, bool* eof) {
     // only empty block should be here
     DCHECK(block->rows() == 0);
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
 
     int64_t raw_rows_threshold = raw_rows_read() + config::doris_scanner_row_num;
     int64_t raw_bytes_threshold = config::doris_scanner_row_bytes;
diff --git a/be/src/vec/exec/vset_operation_node.cpp b/be/src/vec/exec/vset_operation_node.cpp
index bd7856f29e..14cb8dec65 100644
--- a/be/src/vec/exec/vset_operation_node.cpp
+++ b/be/src/vec/exec/vset_operation_node.cpp
@@ -17,7 +17,6 @@
 
 #include "vec/exec/vset_operation_node.h"
 
-#include "runtime/thread_context.h"
 #include "util/defer_op.h"
 #include "vec/exprs/vexpr.h"
 namespace doris {
@@ -115,6 +114,7 @@ Status VSetOperationNode::init(const TPlanNode& tnode, RuntimeState* state) {
 
 Status VSetOperationNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(ExecNode::open(state));
     // open result expr lists.
     for (const std::vector<VExprContext*>& exprs : _child_expr_lists) {
@@ -125,9 +125,10 @@ Status VSetOperationNode::open(RuntimeState* state) {
 }
 
 Status VSetOperationNode::prepare(RuntimeState* state) {
+    SCOPED_TIMER(_runtime_profile->total_time_counter());
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _hash_table_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSetOperationNode:HashTable");
-    SCOPED_TIMER(_runtime_profile->total_time_counter());
     _build_timer = ADD_TIMER(runtime_profile(), "BuildTime");
     _probe_timer = ADD_TIMER(runtime_profile(), "ProbeTime");
 
diff --git a/be/src/vec/exec/vsort_node.cpp b/be/src/vec/exec/vsort_node.cpp
index 4b5197a497..8e963262fe 100644
--- a/be/src/vec/exec/vsort_node.cpp
+++ b/be/src/vec/exec/vsort_node.cpp
@@ -43,6 +43,7 @@ Status VSortNode::prepare(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
     _runtime_profile->add_info_string("TOP-N", _limit == -1 ? "false" : "true");
     RETURN_IF_ERROR(ExecNode::prepare(state));
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _block_mem_tracker = MemTracker::create_virtual_tracker(-1, "VSortNode:Block", mem_tracker());
     RETURN_IF_ERROR(_vsort_exec_exprs.prepare(state, child(0)->row_desc(), _row_descriptor,
                                               expr_mem_tracker()));
@@ -51,6 +52,7 @@ Status VSortNode::prepare(RuntimeState* state) {
 
 Status VSortNode::open(RuntimeState* state) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(ExecNode::open(state));
     RETURN_IF_ERROR(_vsort_exec_exprs.open(state));
     RETURN_IF_CANCELLED(state);
@@ -76,6 +78,7 @@ Status VSortNode::get_next(RuntimeState* state, RowBatch* row_batch, bool* eos)
 
 Status VSortNode::get_next(RuntimeState* state, Block* block, bool* eos) {
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
 
     auto status = Status::OK();
     if (_sorted_blocks.empty()) {
diff --git a/be/src/vec/exprs/vexpr_context.cpp b/be/src/vec/exprs/vexpr_context.cpp
index 1a8c3f15ee..06d3d2f2be 100644
--- a/be/src/vec/exprs/vexpr_context.cpp
+++ b/be/src/vec/exprs/vexpr_context.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/exprs/vexpr_context.h"
 
+#include "runtime/thread_context.h"
 #include "udf/udf_internal.h"
 #include "vec/exprs/vexpr.h"
 
@@ -40,6 +41,7 @@ doris::Status VExprContext::prepare(doris::RuntimeState* state,
                                     const std::shared_ptr<doris::MemTracker>& tracker) {
     _prepared = true;
     _mem_tracker = tracker;
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _pool.reset(new MemPool(_mem_tracker.get()));
     return _root->prepare(state, row_desc, this);
 }
@@ -49,6 +51,7 @@ doris::Status VExprContext::open(doris::RuntimeState* state) {
     if (_opened) {
         return Status::OK();
     }
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _opened = true;
     // Fragment-local state is only initialized for original contexts. Clones inherit the
     // original's fragment state and only need to have thread-local state initialized.
@@ -77,6 +80,7 @@ doris::Status VExprContext::clone(RuntimeState* state, VExprContext** new_ctx) {
     DCHECK(_prepared);
     DCHECK(_opened);
     DCHECK(*new_ctx == nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     *new_ctx = state->obj_pool()->add(new VExprContext(_root));
     (*new_ctx)->_pool.reset(new MemPool(_pool->mem_tracker()));
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 4767127174..180c31838c 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -19,6 +19,7 @@
 
 #include "gen_cpp/data.pb.h"
 #include "runtime/mem_tracker.h"
+#include "runtime/thread_context.h"
 #include "util/uid_util.h"
 
 #include "vec/core/block.h"
@@ -261,6 +262,7 @@ VDataStreamRecvr::VDataStreamRecvr(
     _mem_tracker =
             MemTracker::create_tracker(-1, "VDataStreamRecvr:" + print_id(_fragment_instance_id),
                                        nullptr, MemTrackerLevel::VERBOSE, _profile);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     _block_mem_tracker = MemTracker::create_virtual_tracker(
             -1, "VDataStreamRecvr:block:" + print_id(_fragment_instance_id), _mem_tracker);
 
@@ -293,6 +295,7 @@ Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& orderin
                                        const std::vector<bool>& nulls_first, size_t batch_size,
                                        int64_t limit, size_t offset) {
     DCHECK(_is_merging);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::vector<BlockSupplier> child_block_suppliers;
     // Create the merger that will a single stream of sorted rows.
     _merger.reset(new VSortedRunMerger(ordering_expr, is_asc_order, nulls_first, batch_size, limit,
@@ -308,16 +311,19 @@ Status VDataStreamRecvr::create_merger(const std::vector<VExprContext*>& orderin
 
 void VDataStreamRecvr::add_block(const PBlock& pblock, int sender_id, int be_number,
                                  int64_t packet_seq, ::google::protobuf::Closure** done) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(pblock, be_number, packet_seq, done);
 }
 
 void VDataStreamRecvr::add_block(Block* block, int sender_id, bool use_move) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int use_sender_id = _is_merging ? sender_id : 0;
     _sender_queues[use_sender_id]->add_block(block, use_move);
 }
 
 Status VDataStreamRecvr::get_next(Block* block, bool* eos) {
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
     if (!_is_merging) {
         Block* res = nullptr;
         RETURN_IF_ERROR(_sender_queues[0]->get_batch(&res));
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index 46df1dc47c..f5882ed38f 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -27,6 +27,7 @@
 #include "runtime/exec_env.h"
 #include "runtime/mem_tracker.h"
 #include "runtime/runtime_state.h"
+#include "runtime/thread_context.h"
 #include "util/proto_util.h"
 #include "vec/common/sip_hash.h"
 #include "vec/runtime/vdata_stream_mgr.h"
@@ -343,6 +344,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _mem_tracker = MemTracker::create_tracker(
             -1, "VDataStreamSender:" + print_id(state->fragment_instance_id()),
             state->instance_mem_tracker(), MemTrackerLevel::VERBOSE, _profile);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
         std::random_device rd;
@@ -376,6 +378,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
 
 Status VDataStreamSender::open(RuntimeState* state) {
     DCHECK(state != nullptr);
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     RETURN_IF_ERROR(VExpr::open(_partition_expr_ctxs, state));
     for (auto iter : _partition_infos) {
         RETURN_IF_ERROR(iter->open(state));
@@ -389,6 +392,7 @@ Status VDataStreamSender::send(RuntimeState* state, RowBatch* batch) {
 
 Status VDataStreamSender::send(RuntimeState* state, Block* block) {
     SCOPED_TIMER(_profile->total_time_counter());
+    SCOPED_SWITCH_TASK_THREAD_LOCAL_EXISTED_MEM_TRACKER(_mem_tracker);
     if (_part_type == TPartitionType::UNPARTITIONED || _channels.size() == 1) {
         // 1. serialize depends on it is not local exchange
         // 2. send block
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index e8d3cc0f5f..58bb03c69c 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -17,6 +17,7 @@
 
 #include "vec/sink/vtablet_sink.h"
 
+#include "runtime/thread_context.h"
 #include "util/doris_metrics.h"
 #include "vec/core/block.h"
 #include "vec/exprs/vexpr.h"
@@ -55,6 +56,7 @@ Status VOlapTableSink::open(RuntimeState* state) {
 }
 
 Status VOlapTableSink::send(RuntimeState* state, vectorized::Block* input_block) {
+    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     Status status = Status::OK();
 
     auto rows = input_block->rows();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org