You are viewing a plain-text version of this content. The canonical link to the original page is not preserved in this plain-text copy.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/27 10:59:30 UTC

[doris] branch master updated: [fix] (mem tracker) Fix MemTracker accuracy (#11190)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new b6bdb3bdbc [fix] (mem tracker) Fix MemTracker accuracy (#11190)
b6bdb3bdbc is described below

commit b6bdb3bdbcc81d6751b21044a09485a3be7bf07e
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Jul 27 18:59:24 2022 +0800

    [fix] (mem tracker) Fix MemTracker accuracy (#11190)
---
 be/src/common/config.h                           |  2 +-
 be/src/exec/broker_scan_node.cpp                 |  2 +
 be/src/exec/es_http_scan_node.cpp                |  1 +
 be/src/exec/tablet_sink.cpp                      |  1 -
 be/src/http/ev_http_server.cpp                   |  2 +
 be/src/olap/compaction.cpp                       |  8 +++-
 be/src/olap/delta_writer.cpp                     | 39 ++++++++++++-----
 be/src/olap/delta_writer.h                       | 11 +++--
 be/src/olap/memtable.cpp                         |  5 +--
 be/src/olap/memtable.h                           |  4 +-
 be/src/olap/memtable_flush_executor.cpp          | 52 ++++++++++++----------
 be/src/olap/memtable_flush_executor.h            |  7 ++-
 be/src/olap/olap_server.cpp                      | 10 ++++-
 be/src/olap/storage_engine.cpp                   |  7 ++-
 be/src/olap/storage_engine.h                     |  2 +
 be/src/runtime/disk_io_mgr.cc                    |  9 ++--
 be/src/runtime/disk_io_mgr.h                     |  2 +-
 be/src/runtime/exec_env_init.cpp                 |  1 +
 be/src/runtime/fragment_mgr.cpp                  |  5 +--
 be/src/runtime/load_channel.cpp                  |  4 +-
 be/src/runtime/load_channel.h                    |  1 -
 be/src/runtime/memory/mem_tracker.cpp            |  8 ++--
 be/src/runtime/memory/mem_tracker.h              |  6 +--
 be/src/runtime/memory/mem_tracker_limiter.h      |  8 +++-
 be/src/runtime/memory/mem_tracker_task_pool.cpp  | 56 +++++++++---------------
 be/src/runtime/memory/mem_tracker_task_pool.h    |  7 +--
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp |  8 ++++
 be/src/runtime/memory/thread_mem_tracker_mgr.h   | 18 ++++++++
 be/src/runtime/tablets_channel.cpp               | 15 +++----
 be/src/runtime/tablets_channel.h                 |  7 ++-
 be/src/runtime/thread_context.cpp                | 19 --------
 be/src/runtime/thread_context.h                  |  1 -
 be/src/service/doris_main.cpp                    |  4 +-
 be/src/vec/exec/vbroker_scan_node.cpp            |  3 ++
 be/src/vec/exec/ves_http_scan_node.cpp           |  1 +
 be/src/vec/sink/vtablet_sink.cpp                 |  1 +
 be/test/olap/delta_writer_test.cpp               |  6 +--
 37 files changed, 192 insertions(+), 151 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 9891df2f18..d1f43eaeba 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -653,7 +653,7 @@ CONF_Bool(memory_verbose_track, "false");
 // smaller than this value will continue to accumulate. specified as number of bytes.
 // Decreasing this value will increase the frequency of consume/release.
 // Increasing this value will cause MemTracker statistics to be inaccurate.
-CONF_mInt32(mem_tracker_consume_min_size_bytes, "4194304");
+CONF_mInt32(mem_tracker_consume_min_size_bytes, "1048576");
 
 // The version information of the tablet will be stored in the memory
 // in an adjacency graph data structure.
diff --git a/be/src/exec/broker_scan_node.cpp b/be/src/exec/broker_scan_node.cpp
index 9bc920a9b6..0b5fe1d940 100644
--- a/be/src/exec/broker_scan_node.cpp
+++ b/be/src/exec/broker_scan_node.cpp
@@ -373,6 +373,8 @@ Status BrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range,
 }
 
 void BrokerScanNode::scanner_worker(int start_idx, int length) {
+    SCOPED_ATTACH_TASK(_runtime_state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     auto status = Expr::clone_if_not_exists(_conjunct_ctxs, _runtime_state, &scanner_expr_ctxs);
diff --git a/be/src/exec/es_http_scan_node.cpp b/be/src/exec/es_http_scan_node.cpp
index acfc14bb7d..ec23253a5b 100644
--- a/be/src/exec/es_http_scan_node.cpp
+++ b/be/src/exec/es_http_scan_node.cpp
@@ -419,6 +419,7 @@ static std::string get_host_port(const std::vector<TNetworkAddress>& es_hosts) {
 
 void EsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
     SCOPED_ATTACH_TASK(_runtime_state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     DCHECK(start_idx < length);
diff --git a/be/src/exec/tablet_sink.cpp b/be/src/exec/tablet_sink.cpp
index 6254d906bd..ef3d97c2bf 100644
--- a/be/src/exec/tablet_sink.cpp
+++ b/be/src/exec/tablet_sink.cpp
@@ -600,7 +600,6 @@ Status IndexChannel::init(RuntimeState* state, const std::vector<TTabletWithPart
 
 void IndexChannel::mark_as_failed(int64_t node_id, const std::string& host, const std::string& err,
                                   int64_t tablet_id) {
-    SCOPED_CONSUME_MEM_TRACKER(_index_channel_tracker.get());
     const auto& it = _tablets_by_channel.find(node_id);
     if (it == _tablets_by_channel.end()) {
         return;
diff --git a/be/src/http/ev_http_server.cpp b/be/src/http/ev_http_server.cpp
index 2d9031383a..bf612ca830 100644
--- a/be/src/http/ev_http_server.cpp
+++ b/be/src/http/ev_http_server.cpp
@@ -33,6 +33,7 @@
 #include "http/http_handler.h"
 #include "http/http_headers.h"
 #include "http/http_request.h"
+#include "runtime/thread_context.h"
 #include "service/brpc.h"
 #include "util/debug_util.h"
 #include "util/threadpool.h"
@@ -98,6 +99,7 @@ void EvHttpServer::start() {
     _event_bases.resize(_num_workers);
     for (int i = 0; i < _num_workers; ++i) {
         CHECK(_workers->submit_func([this, i]() {
+                          thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
                           std::shared_ptr<event_base> base(event_base_new(), [](event_base* base) {
                               event_base_free(base);
                           });
diff --git a/be/src/olap/compaction.cpp b/be/src/olap/compaction.cpp
index aa6ff0f6d3..a456370f85 100644
--- a/be/src/olap/compaction.cpp
+++ b/be/src/olap/compaction.cpp
@@ -41,7 +41,13 @@ Compaction::Compaction(TabletSharedPtr tablet, const std::string& label)
 #endif
 }
 
-Compaction::~Compaction() {}
+Compaction::~Compaction() {
+#ifndef BE_TEST
+    // The compaction tracker cannot be completely accurate, so subtract its residual consumption from the global compaction tracker.
+    StorageEngine::instance()->compaction_mem_tracker()->consumption_revise(
+            -_mem_tracker->consumption());
+#endif
+}
 
 Status Compaction::compact() {
     RETURN_NOT_OK(prepare_compact());
diff --git a/be/src/olap/delta_writer.cpp b/be/src/olap/delta_writer.cpp
index 73e9d7b992..04b7327f25 100644
--- a/be/src/olap/delta_writer.cpp
+++ b/be/src/olap/delta_writer.cpp
@@ -30,12 +30,14 @@
 
 namespace doris {
 
-Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, bool is_vec) {
-    *writer = new DeltaWriter(req, StorageEngine::instance(), is_vec);
+Status DeltaWriter::open(WriteRequest* req, DeltaWriter** writer, MemTrackerLimiter* parent_tracker,
+                         bool is_vec) {
+    *writer = new DeltaWriter(req, StorageEngine::instance(), parent_tracker, is_vec);
     return Status::OK();
 }
 
-DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec)
+DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine,
+                         MemTrackerLimiter* parent_tracker, bool is_vec)
         : _req(*req),
           _tablet(nullptr),
           _cur_rowset(nullptr),
@@ -43,6 +45,7 @@ DeltaWriter::DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool
           _tablet_schema(new TabletSchema),
           _delta_written_success(false),
           _storage_engine(storage_engine),
+          _parent_tracker(parent_tracker),
           _is_vec(is_vec) {}
 
 DeltaWriter::~DeltaWriter() {
@@ -95,9 +98,9 @@ Status DeltaWriter::init() {
                      << ", schema_hash=" << _req.schema_hash;
         return Status::OLAPInternalError(OLAP_ERR_TABLE_NOT_FOUND);
     }
-
-    _flushed_mem_tracker = std::make_unique<MemTracker>(
-            fmt::format("DeltaWriter:tabletId={}", std::to_string(_tablet->tablet_id())));
+    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+            -1, fmt::format("DeltaWriter:tabletId={}", _tablet->tablet_id()), _parent_tracker);
+    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     // check tablet version number
     if (_tablet->version_count() > config::max_tablet_version_num) {
         //trigger quick compaction
@@ -147,7 +150,10 @@ Status DeltaWriter::write(Tuple* tuple) {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
+    int64_t prev_memtable_usage = _mem_table->memory_usage();
     _mem_table->insert(tuple);
+    THREAD_MEM_TRACKER_TRANSFER_TO(_mem_table->memory_usage() - prev_memtable_usage,
+                                   _mem_tracker.get());
 
     // if memtable is full, push it to the flush executor,
     // and create a new memtable for incoming data
@@ -171,9 +177,15 @@ Status DeltaWriter::write(const RowBatch* row_batch, const std::vector<int>& row
     if (_is_cancelled) {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
+
+    // The allocation hook records less memory than is really used, because several places free memory
+    // that does not belong to DeltaWriter; so the memtable growth is tracked manually here instead.
+    int64_t prev_memtable_usage = _mem_table->memory_usage();
     for (const auto& row_idx : row_idxs) {
         _mem_table->insert(row_batch->get_row(row_idx)->get_tuple(0));
     }
+    THREAD_MEM_TRACKER_TRANSFER_TO(_mem_table->memory_usage() - prev_memtable_usage,
+                                   _mem_tracker.get());
 
     if (_mem_table->memory_usage() >= config::write_buffer_size) {
         RETURN_NOT_OK(_flush_memtable_async());
@@ -196,6 +208,7 @@ Status DeltaWriter::write(const vectorized::Block* block, const std::vector<int>
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
+    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     _mem_table->insert(block, row_idxs);
 
     if (_mem_table->need_to_agg()) {
@@ -213,8 +226,7 @@ Status DeltaWriter::_flush_memtable_async() {
     if (++_segment_counter > config::max_segment_num_per_rowset) {
         return Status::OLAPInternalError(OLAP_ERR_TOO_MANY_SEGMENTS);
     }
-    _flushed_mem_tracker->consume(_mem_table->memory_usage());
-    return _flush_token->submit(_mem_table);
+    return _flush_token->submit(std::move(_mem_table), _mem_tracker.get());
 }
 
 Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
@@ -231,6 +243,7 @@ Status DeltaWriter::flush_memtable_and_wait(bool need_wait) {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
+    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     if (mem_consumption() == _mem_table->memory_usage()) {
         // equal means there is no memtable in flush queue, just flush this memtable
         VLOG_NOTICE << "flush memtable to reduce mem consumption. memtable size: "
@@ -267,7 +280,7 @@ Status DeltaWriter::wait_flush() {
 void DeltaWriter::_reset_mem_table() {
     _mem_table.reset(new MemTable(_tablet->tablet_id(), _schema.get(), _tablet_schema.get(),
                                   _req.slots, _req.tuple_desc, _tablet->keys_type(),
-                                  _rowset_writer.get(), _flushed_mem_tracker.get(), _is_vec));
+                                  _rowset_writer.get(), _is_vec));
 }
 
 Status DeltaWriter::close() {
@@ -285,6 +298,7 @@ Status DeltaWriter::close() {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
+    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     RETURN_NOT_OK(_flush_memtable_async());
     _mem_table.reset();
     return Status::OK();
@@ -299,6 +313,7 @@ Status DeltaWriter::close_wait() {
         return Status::OLAPInternalError(OLAP_ERR_ALREADY_CANCELLED);
     }
 
+    SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     // return error if previous flush failed
     RETURN_NOT_OK(_flush_token->wait());
 
@@ -350,12 +365,12 @@ int64_t DeltaWriter::get_mem_consumption_snapshot() const {
 }
 
 int64_t DeltaWriter::mem_consumption() const {
-    if (_flushed_mem_tracker == nullptr) {
+    if (_mem_tracker == nullptr) {
         // This method may be called before this writer is initialized.
-        // So _flushed_mem_tracker may be null.
+        // So _mem_tracker may be null.
         return 0;
     }
-    return _flushed_mem_tracker->consumption() + _mem_table->memory_usage();
+    return _mem_tracker->consumption();
 }
 
 int64_t DeltaWriter::partition_id() const {
diff --git a/be/src/olap/delta_writer.h b/be/src/olap/delta_writer.h
index 0c11b8fcbe..1ce62de338 100644
--- a/be/src/olap/delta_writer.h
+++ b/be/src/olap/delta_writer.h
@@ -55,7 +55,8 @@ struct WriteRequest {
 // This class is NOT thread-safe, external synchronization is required.
 class DeltaWriter {
 public:
-    static Status open(WriteRequest* req, DeltaWriter** writer, bool is_vec = false);
+    static Status open(WriteRequest* req, DeltaWriter** writer,
+                       MemTrackerLimiter* parent_tracker = nullptr, bool is_vec = false);
 
     ~DeltaWriter();
 
@@ -100,7 +101,8 @@ public:
     int64_t get_mem_consumption_snapshot() const;
 
 private:
-    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, bool is_vec);
+    DeltaWriter(WriteRequest* req, StorageEngine* storage_engine, MemTrackerLimiter* parent_tracker,
+                bool is_vec);
 
     // push a full memtable to flush executor
     Status _flush_memtable_async();
@@ -120,7 +122,7 @@ private:
     RowsetSharedPtr _cur_rowset;
     std::unique_ptr<RowsetWriter> _rowset_writer;
     // TODO: Recheck the lifetime of _mem_table, Look should use unique_ptr
-    std::shared_ptr<MemTable> _mem_table;
+    std::unique_ptr<MemTable> _mem_table;
     std::unique_ptr<Schema> _schema;
     //const TabletSchema* _tablet_schema;
     // tablet schema owned by delta writer, all write will use this tablet schema
@@ -131,7 +133,8 @@ private:
 
     StorageEngine* _storage_engine;
     std::unique_ptr<FlushToken> _flush_token;
-    std::unique_ptr<MemTracker> _flushed_mem_tracker;
+    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+    MemTrackerLimiter* _parent_tracker;
 
     // The counter of number of segment flushed already.
     int64_t _segment_counter = 0;
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index a44c54c552..46607f6a6c 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -31,8 +31,7 @@ namespace doris {
 
 MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
                    const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-                   KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* writer_mem_tracker,
-                   bool support_vec)
+                   KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec)
         : _tablet_id(tablet_id),
           _schema(schema),
           _tablet_schema(tablet_schema),
@@ -40,7 +39,6 @@ MemTable::MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet
           _keys_type(keys_type),
           _mem_tracker(std::make_unique<MemTracker>(
                   fmt::format("MemTable:tabletId={}", std::to_string(tablet_id)))),
-          _writer_mem_tracker(writer_mem_tracker),
           _buffer_mem_pool(new MemPool(_mem_tracker.get())),
           _table_mem_pool(new MemPool(_mem_tracker.get())),
           _schema_size(_schema->schema_size()),
@@ -134,7 +132,6 @@ MemTable::~MemTable() {
         }
     }
     std::for_each(_row_in_blocks.begin(), _row_in_blocks.end(), std::default_delete<RowInBlock>());
-    _writer_mem_tracker->release(_mem_tracker->consumption());
     _mem_tracker->release(_mem_usage);
     _buffer_mem_pool->free_all();
     _table_mem_pool->free_all();
diff --git a/be/src/olap/memtable.h b/be/src/olap/memtable.h
index d92e7773a9..9be1f7d33d 100644
--- a/be/src/olap/memtable.h
+++ b/be/src/olap/memtable.h
@@ -42,8 +42,7 @@ class MemTable {
 public:
     MemTable(int64_t tablet_id, Schema* schema, const TabletSchema* tablet_schema,
              const std::vector<SlotDescriptor*>* slot_descs, TupleDescriptor* tuple_desc,
-             KeysType keys_type, RowsetWriter* rowset_writer, MemTracker* writer_mem_tracker,
-             bool support_vec = false);
+             KeysType keys_type, RowsetWriter* rowset_writer, bool support_vec = false);
     ~MemTable();
 
     int64_t tablet_id() const { return _tablet_id; }
@@ -153,7 +152,6 @@ private:
     std::shared_ptr<RowInBlockComparator> _vec_row_comparator;
 
     std::unique_ptr<MemTracker> _mem_tracker;
-    MemTracker* _writer_mem_tracker;
     // This is a buffer, to hold the memory referenced by the rows that have not
     // been inserted into the SkipList
     std::unique_ptr<MemPool> _buffer_mem_pool;
diff --git a/be/src/olap/memtable_flush_executor.cpp b/be/src/olap/memtable_flush_executor.cpp
index 3e722019e4..b9e622a9a7 100644
--- a/be/src/olap/memtable_flush_executor.cpp
+++ b/be/src/olap/memtable_flush_executor.cpp
@@ -26,6 +26,30 @@
 
 namespace doris {
 
+class MemtableFlushTask final : public Runnable {
+public:
+    MemtableFlushTask(FlushToken* flush_token, std::unique_ptr<MemTable> memtable,
+                      int64_t submit_task_time, MemTrackerLimiter* tracker)
+            : _flush_token(flush_token),
+              _memtable(std::move(memtable)),
+              _submit_task_time(submit_task_time),
+              _tracker(tracker) {}
+
+    ~MemtableFlushTask() override = default;
+
+    void run() override {
+        SCOPED_ATTACH_TASK(_tracker, ThreadContext::TaskType::LOAD);
+        _flush_token->_flush_memtable(_memtable.get(), _submit_task_time);
+        _memtable.reset();
+    }
+
+private:
+    FlushToken* _flush_token;
+    std::unique_ptr<MemTable> _memtable;
+    int64_t _submit_task_time;
+    MemTrackerLimiter* _tracker;
+};
+
 std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
     os << "(flush time(ms)=" << stat.flush_time_ns / NANOS_PER_MILLIS
        << ", flush wait time(ms)=" << stat.flush_wait_time_ns / NANOS_PER_MILLIS
@@ -34,20 +58,15 @@ std::ostream& operator<<(std::ostream& os, const FlushStatistic& stat) {
     return os;
 }
 
-// The type of parameter is safe to be a reference. Because the function object
-// returned by std::bind() will increase the reference count of Memtable. i.e.,
-// after the submit() method returns, even if the caller immediately releases the
-// passed shared_ptr object, the Memtable object will not be destructed because
-// its reference count is not 0.
-Status FlushToken::submit(const std::shared_ptr<MemTable>& memtable) {
+Status FlushToken::submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker) {
     ErrorCode s = _flush_status.load();
     if (s != OLAP_SUCCESS) {
         return Status::OLAPInternalError(s);
     }
     int64_t submit_task_time = MonotonicNanos();
-    _flush_token->submit_func(
-            std::bind(&FlushToken::_flush_memtable, this, memtable, submit_task_time));
-    return Status::OK();
+    auto task = std::make_shared<MemtableFlushTask>(this, std::move(mem_table), submit_task_time,
+                                                    tracker);
+    return _flush_token->submit(std::move(task));
 }
 
 void FlushToken::cancel() {
@@ -60,21 +79,8 @@ Status FlushToken::wait() {
     return s == OLAP_SUCCESS ? Status::OK() : Status::OLAPInternalError(s);
 }
 
-void FlushToken::_flush_memtable(std::shared_ptr<MemTable> memtable, int64_t submit_task_time) {
-#ifndef BE_TEST
-    // The memtable mem tracker needs to be completely accurate,
-    // because DeltaWriter judges whether to flush memtable according to the memtable memory usage.
-    // The macro of attach thread mem tracker is affected by the destructuring order of local variables,
-    // so it cannot completely correspond to the number of new/delete bytes recorded in scoped,
-    // and there is a small range of errors. So direct track load mem tracker.
-    // TODO(zxy) After rethinking the use of switch thread mem tracker, choose the appropriate way to get
-    // load mem tracke here.
-    // DCHECK(memtable->mem_tracker()->parent_task_mem_tracker_no_own());
-    // SCOPED_ATTACH_TASK(ThreadContext::TaskType::LOAD,
-    //                           memtable->mem_tracker()->parent_task_mem_tracker_no_own());
-#endif
+void FlushToken::_flush_memtable(MemTable* memtable, int64_t submit_task_time) {
     _stats.flush_wait_time_ns += (MonotonicNanos() - submit_task_time);
-    SCOPED_CLEANUP({ memtable.reset(); });
     // If previous flush has failed, return directly
     if (_flush_status.load() != OLAP_SUCCESS) {
         return;
diff --git a/be/src/olap/memtable_flush_executor.h b/be/src/olap/memtable_flush_executor.h
index 36d5c21be8..6f986af3f5 100644
--- a/be/src/olap/memtable_flush_executor.h
+++ b/be/src/olap/memtable_flush_executor.h
@@ -31,6 +31,7 @@ class DataDir;
 class DeltaWriter;
 class ExecEnv;
 class MemTable;
+class MemTrackerLimiter;
 
 // the statistic of a certain flush handler.
 // use atomic because it may be updated by multi threads
@@ -56,7 +57,7 @@ public:
     explicit FlushToken(std::unique_ptr<ThreadPoolToken> flush_pool_token)
             : _flush_token(std::move(flush_pool_token)), _flush_status(OLAP_SUCCESS) {}
 
-    Status submit(const std::shared_ptr<MemTable>& mem_table);
+    Status submit(std::unique_ptr<MemTable> mem_table, MemTrackerLimiter* tracker);
 
     // error has happpens, so we cancel this token
     // And remove all tasks in the queue.
@@ -69,7 +70,9 @@ public:
     const FlushStatistic& get_stats() const { return _stats; }
 
 private:
-    void _flush_memtable(std::shared_ptr<MemTable> mem_table, int64_t submit_task_time);
+    friend class MemtableFlushTask;
+
+    void _flush_memtable(MemTable* mem_table, int64_t submit_task_time);
 
     std::unique_ptr<ThreadPoolToken> _flush_token;
 
diff --git a/be/src/olap/olap_server.cpp b/be/src/olap/olap_server.cpp
index 28e9241985..ef53eeaee4 100644
--- a/be/src/olap/olap_server.cpp
+++ b/be/src/olap/olap_server.cpp
@@ -111,14 +111,20 @@ Status StorageEngine::start_bg_threads() {
             scoped_refptr<Thread> path_scan_thread;
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_scan_thread",
-                    [this, data_dir]() { this->_path_scan_thread_callback(data_dir); },
+                    [this, data_dir]() {
+                        SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+                        this->_path_scan_thread_callback(data_dir);
+                    },
                     &path_scan_thread));
             _path_scan_threads.emplace_back(path_scan_thread);
 
             scoped_refptr<Thread> path_gc_thread;
             RETURN_IF_ERROR(Thread::create(
                     "StorageEngine", "path_gc_thread",
-                    [this, data_dir]() { this->_path_gc_thread_callback(data_dir); },
+                    [this, data_dir]() {
+                        SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
+                        this->_path_gc_thread_callback(data_dir);
+                    },
                     &path_gc_thread));
             _path_gc_threads.emplace_back(path_gc_thread);
         }
diff --git a/be/src/olap/storage_engine.cpp b/be/src/olap/storage_engine.cpp
index 0179f3f064..9a7c414971 100644
--- a/be/src/olap/storage_engine.cpp
+++ b/be/src/olap/storage_engine.cpp
@@ -122,6 +122,7 @@ StorageEngine::StorageEngine(const EngineOptions& options)
                   std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::BatchLoad")),
           _consistency_mem_tracker(
                   std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Consistency")),
+          _mem_tracker(std::make_unique<MemTrackerLimiter>(-1, "StorageEngine::Self")),
           _stop_background_threads_latch(1),
           _tablet_manager(new TabletManager(config::tablet_map_shard_size)),
           _txn_manager(new TxnManager(config::txn_map_shard_size, config::txn_shard_size)),
@@ -166,7 +167,8 @@ StorageEngine::~StorageEngine() {
 void StorageEngine::load_data_dirs(const std::vector<DataDir*>& data_dirs) {
     std::vector<std::thread> threads;
     for (auto data_dir : data_dirs) {
-        threads.emplace_back([data_dir] {
+        threads.emplace_back([this, data_dir] {
+            SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
             auto res = data_dir->load();
             if (!res.ok()) {
                 LOG(WARNING) << "io error when init load tables. res=" << res
@@ -217,7 +219,8 @@ Status StorageEngine::_init_store_map() {
         DataDir* store = new DataDir(path.path, path.capacity_bytes, path.storage_medium,
                                      _tablet_manager.get(), _txn_manager.get());
         tmp_stores.emplace_back(store);
-        threads.emplace_back([store, &error_msg_lock, &error_msg]() {
+        threads.emplace_back([this, store, &error_msg_lock, &error_msg]() {
+            SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::STORAGE);
             auto st = store->init();
             if (!st.ok()) {
                 {
diff --git a/be/src/olap/storage_engine.h b/be/src/olap/storage_engine.h
index df018e27d3..d508a617c5 100644
--- a/be/src/olap/storage_engine.h
+++ b/be/src/olap/storage_engine.h
@@ -346,6 +346,8 @@ private:
     std::unique_ptr<MemTrackerLimiter> _batch_load_mem_tracker;
     // Count the memory consumption of all EngineChecksumTask.
     std::unique_ptr<MemTrackerLimiter> _consistency_mem_tracker;
+    // Tracks memory consumed by StorageEngine itself.
+    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
 
     CountDownLatch _stop_background_threads_latch;
     scoped_refptr<Thread> _unused_rowset_monitor_thread;
diff --git a/be/src/runtime/disk_io_mgr.cc b/be/src/runtime/disk_io_mgr.cc
index f2c68b1e99..e56c4bff69 100644
--- a/be/src/runtime/disk_io_mgr.cc
+++ b/be/src/runtime/disk_io_mgr.cc
@@ -373,8 +373,8 @@ Status DiskIoMgr::init(const int64_t mem_limit) {
             ss << "work-loop(Disk: " << i << ", Thread: " << j << ")";
             // _disk_thread_group.AddThread(new Thread("disk-io-mgr", ss.str(),
             //             &DiskIoMgr::work_loop, this, _disk_queues[i]));
-            _disk_thread_group.add_thread(new std::thread(
-                    std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i], _mem_tracker.get())));
+            _disk_thread_group.add_thread(
+                    new std::thread(std::bind(&DiskIoMgr::work_loop, this, _disk_queues[i])));
         }
     }
     _request_context_cache.reset(new RequestContextCache(this));
@@ -947,7 +947,7 @@ void DiskIoMgr::handle_read_finished(DiskQueue* disk_queue, RequestContext* read
     state.decrement_request_thread();
 }
 
-void DiskIoMgr::work_loop(DiskQueue* disk_queue, MemTrackerLimiter* mem_tracker) {
+void DiskIoMgr::work_loop(DiskQueue* disk_queue) {
     // The thread waits until there is work or the entire system is being shut down.
     // If there is work, performs the read or write requested and re-enqueues the
     // requesting context.
@@ -960,8 +960,7 @@ void DiskIoMgr::work_loop(DiskQueue* disk_queue, MemTrackerLimiter* mem_tracker)
     //   3. Perform the read or write as specified.
     // Cancellation checking needs to happen in both steps 1 and 3.
 
-    // tracked in the process tracker
-    // SCOPED_ATTACH_TASK(ThreadContext::TaskType::STORAGE, mem_tracker);
+    thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
     while (!_shut_down) {
         RequestContext* worker_context = nullptr;
         ;
diff --git a/be/src/runtime/disk_io_mgr.h b/be/src/runtime/disk_io_mgr.h
index 3279ac2559..9d0aa2f5ae 100644
--- a/be/src/runtime/disk_io_mgr.h
+++ b/be/src/runtime/disk_io_mgr.h
@@ -795,7 +795,7 @@ private:
     // Disk worker thread loop. This function retrieves the next range to process on
     // the disk queue and invokes read_range() or Write() depending on the type of Range().
     // There can be multiple threads per disk running this loop.
-    void work_loop(DiskQueue* queue, MemTrackerLimiter* mem_tracker);
+    void work_loop(DiskQueue* queue);
 
     // This is called from the disk thread to get the next range to process. It will
     // wait until a scan range and buffer are available, or a write range is available.
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index a35ae80ee3..8281375331 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -191,6 +191,7 @@ Status ExecEnv::_init_mem_tracker() {
     }
     _process_mem_tracker = new MemTrackerLimiter(global_memory_limit_bytes, "Process");
     thread_context()->_thread_mem_tracker_mgr->init();
+    thread_context()->_thread_mem_tracker_mgr->set_check_attach(false);
 #if defined(USE_MEM_TRACKER) && !defined(__SANITIZE_ADDRESS__) && !defined(ADDRESS_SANITIZER) && \
         !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER) && !defined(USE_JEMALLOC)
     if (doris::config::enable_tcmalloc_hook) {
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index c5fcfa97c4..7b16f853ce 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -477,6 +477,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
     std::string func_name {"PlanFragmentExecutor::_exec_actual"};
 #ifndef BE_TEST
     auto span = exec_state->executor()->runtime_state()->get_tracer()->StartSpan(func_name);
+    SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
 #else
     auto span = telemetry::get_noop_tracer()->StartSpan(func_name);
 #endif
@@ -493,9 +494,7 @@ void FragmentMgr::_exec_actual(std::shared_ptr<FragmentExecState> exec_state, Fi
             .query_id(exec_state->query_id())
             .instance_id(exec_state->fragment_instance_id())
             .tag("pthread_id", std::to_string((uintptr_t)pthread_self()));
-#ifndef BE_TEST
-    SCOPED_ATTACH_TASK(exec_state->executor()->runtime_state());
-#endif
+
     exec_state->execute();
 
     std::shared_ptr<QueryFragmentsCtx> fragments_ctx = exec_state->get_fragments_ctx();
diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 9a9d1c808f..e1fc0a3463 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -50,7 +50,6 @@ LoadChannel::~LoadChannel() {
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
-    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     int64_t index_id = params.index_id();
     std::shared_ptr<TabletsChannel> channel;
     {
@@ -61,7 +60,7 @@ Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
         } else {
             // create a new tablets channel
             TabletsChannelKey key(params.id(), index_id);
-            channel.reset(new TabletsChannel(key, _is_high_priority, _is_vec));
+            channel.reset(new TabletsChannel(key, _mem_tracker.get(), _is_high_priority, _is_vec));
             _tablets_channels.insert({index_id, channel});
         }
     }
@@ -137,7 +136,6 @@ bool LoadChannel::is_finished() {
 
 Status LoadChannel::cancel() {
     std::lock_guard<std::mutex> l(_lock);
-    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     for (auto& it : _tablets_channels) {
         it.second->cancel();
     }
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 4137c7fafc..ad8a476fcf 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -129,7 +129,6 @@ private:
 template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
 Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
                               TabletWriterAddResult* response) {
-    // SCOPED_ATTACH_TASK(_mem_tracker.get(), ThreadContext::TaskType::LOAD);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index dec04a20f5..0f4e185e46 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -80,12 +80,14 @@ MemTracker::~MemTracker() {
 MemTracker* MemTracker::get_static_mem_tracker(const std::string& label) {
     // First time this label registered, make a new object, otherwise do nothing.
     // Avoid using locks to resolve erase conflicts.
+    MemTracker* tracker;
     _static_mem_trackers.lazy_emplace_l(
-            label, [&](MemTracker*) {},
+            label, [&](MemTracker* v) { tracker = v; },
             [&](const auto& ctor) {
-                ctor(label, new MemTracker(fmt::format("[Static]-{}", label)));
+                tracker = new MemTracker(fmt::format("[Static]-{}", label));
+                ctor(label, tracker);
             });
-    return _static_mem_trackers[label];
+    return tracker;
 }
 
 MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const {
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 4e4af1723d..9f6e021a3c 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -30,9 +30,9 @@ class MemTrackerLimiter;
 // MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER,
 // which will automatically track all memory usage of the code segment where it is located.
 //
-// MemTracker's father can only be MemTrackerLimiter, which is only used to print tree-like statistics.
-// Consuming MemTracker will not consume its father synchronously.
-// Usually, it is not necessary to specify the father. by default, the MemTrackerLimiter in the thread context
+// MemTracker's parent can only be MemTrackerLimiter, which is only used to print tree-like statistics.
+// Consuming MemTracker will not consume its parent synchronously.
+// Usually, it is not necessary to specify the parent. By default, the MemTrackerLimiter in the thread context
 // is used as the parent, which is specified when the thread starts.
 //
 // This class is thread-safe.
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 1f3fb89800..786ad945bf 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -119,7 +119,13 @@ public:
     Status try_gc_memory(int64_t bytes);
 
 public:
-    void consumption_revise(int64_t bytes) { _consumption->add(bytes); }
+    // Revises (corrects) this tracker's consumption by `bytes`.
+    // If memory is allocated and freed in different places, the mem tracker's consumption value becomes inaccurate.
+    // The process mem tracker's consumption is not affected by this, so it must never be revised (see the DCHECK).
+    void consumption_revise(int64_t bytes) {
+        DCHECK(_label != "Process");
+        _consumption->add(bytes);
+    }
 
     // Logs the usage of this tracker limiter and optionally its children (recursively).
     // If 'logged_consumption' is non-nullptr, sets the consumption value logged.
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 86f2976f18..a4831114cf 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -23,11 +23,6 @@
 
 namespace doris {
 
-// When MemTracker is a negative value, it is considered that a memory leak has occurred,
-// but the actual MemTracker records inaccurately will also cause a negative value,
-// so this feature is in the experimental stage.
-const bool QUERY_MEMORY_LEAK_DETECTION = false;
-
 MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id,
                                                                       int64_t mem_limit,
                                                                       const std::string& label,
@@ -37,15 +32,15 @@ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std:
     // Combine new tracker and emplace into one operation to avoid the use of locks
     // Name for task MemTrackers. '$0' is replaced with the task id.
     bool new_emplace = _task_mem_trackers.lazy_emplace_l(
-            task_id, [&](MemTrackerLimiter*) {},
+            task_id, [&](std::shared_ptr<MemTrackerLimiter>) {},
             [&](const auto& ctor) {
-                ctor(task_id, new MemTrackerLimiter(mem_limit, label, parent));
+                ctor(task_id, std::make_unique<MemTrackerLimiter>(mem_limit, label, parent));
             });
     if (new_emplace) {
         LOG(INFO) << "Register query/load memory tracker, query/load id: " << task_id
                   << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
     }
-    return _task_mem_trackers[task_id];
+    return _task_mem_trackers[task_id].get();
 }
 
 MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id,
@@ -67,35 +62,35 @@ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& t
     DCHECK(!task_id.empty());
     MemTrackerLimiter* tracker = nullptr;
     // Avoid using locks to resolve erase conflicts
-    _task_mem_trackers.if_contains(task_id, [&tracker](MemTrackerLimiter* v) { tracker = v; });
+    _task_mem_trackers.if_contains(
+            task_id, [&tracker](std::shared_ptr<MemTrackerLimiter> v) { tracker = v.get(); });
     return tracker;
 }
 
 void MemTrackerTaskPool::logout_task_mem_tracker() {
-    std::vector<std::string> expired_tasks;
-    for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) {
+    for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();) {
         if (!it->second) {
-            // https://github.com/apache/incubator-doris/issues/10006
-            expired_tasks.emplace_back(it->first);
+            // Under high concurrency, a key has been observed to remain in _task_mem_trackers after
+            // _task_mem_trackers.erase (cause unknown). https://github.com/apache/incubator-doris/issues/10006
+            _task_mem_trackers._erase(it++);
         } else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) {
             // No RuntimeState uses this task MemTracker, it is only referenced by this map,
             // and tracker was not created soon, delete it.
-            if (QUERY_MEMORY_LEAK_DETECTION && it->second->consumption() != 0) {
-                // If consumption is not equal to 0 before query mem tracker is destructed,
-                // there are two possibilities in theory.
-                // 1. A memory leak occurs.
-                // 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on
-                // other trackers such as the process mem tracker, and there is no manual transfer between the two trackers.
-                //
-                // The second case should be eliminated in theory, but it has not been done so far, so the query memory leak
-                // cannot be located, and the value of the query pool mem tracker statistics will be inaccurate.
-                LOG(WARNING) << "Task memory tracker memory leak:" << it->second->debug_string();
-            }
+            //
+            // If the consumption is non-zero when the query mem tracker is destructed,
+            // there are in theory two possible causes:
+            // 1. A memory leak occurred.
+            // 2. Memory was consumed on the query mem tracker but released on another tracker,
+            //    with no manual transfer between the two trackers.
+            // Currently there is no effective way to locate memory consumed and released on
+            // different trackers, so query memory leaks cannot be detected.
+            //
             // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers,
             // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
             // the negative number of the current value of consume.
             it->second->parent()->consumption_revise(-it->second->consumption());
-            expired_tasks.emplace_back(it->first);
+            LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first;
+            _task_mem_trackers._erase(it++);
         } else {
             // Log limit exceeded query tracker.
             if (it->second->limit_exceeded()) {
@@ -104,16 +99,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
                         fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
                         0, Status::OK());
             }
-        }
-    }
-    for (auto tid : expired_tasks) {
-        if (!_task_mem_trackers[tid]) {
-            _task_mem_trackers.erase(tid);
-            LOG(INFO) << "Deregister null query/load memory tracker, query/load id: " << tid;
-        } else {
-            delete _task_mem_trackers[tid];
-            _task_mem_trackers.erase(tid);
-            LOG(INFO) << "Deregister not used query/load memory tracker, query/load id: " << tid;
+            ++it;
         }
     }
 }
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
index ae7d82caf4..4890d72713 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -24,9 +24,10 @@
 namespace doris {
 
 using TaskTrackersMap = phmap::parallel_flat_hash_map<
-        std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>,
-        phmap::priv::hash_default_eq<std::string>,
-        std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>;
+        std::string, std::shared_ptr<MemTrackerLimiter>,
+        phmap::priv::hash_default_hash<std::string>, phmap::priv::hash_default_eq<std::string>,
+        std::allocator<std::pair<const std::string, std::shared_ptr<MemTrackerLimiter>>>, 12,
+        std::mutex>;
 
 // Global task pool for query MemTrackers. Owned by ExecEnv.
 class MemTrackerTaskPool {
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index c129deb8ee..53a47202b1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -29,6 +29,7 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
                                                  const TUniqueId& fragment_instance_id,
                                                  MemTrackerLimiter* mem_tracker) {
     DCHECK(mem_tracker);
+    flush_untracked_mem<false>();
     _task_id = task_id;
     _fragment_instance_id = fragment_instance_id;
     _exceed_cb.cancel_msg = cancel_msg;
@@ -36,6 +37,13 @@ void ThreadMemTrackerMgr::attach_limiter_tracker(const std::string& cancel_msg,
 }
 
 void ThreadMemTrackerMgr::detach_limiter_tracker() {
+#ifndef BE_TEST
+    // Unexpectedly, the runtime state may be destructed before a query sub-thread ends
+    // (this has happened with _hash_table_build_thread), which is not a graceful exit.
+    // Consider replacing the CHECK with a conditional that checks the runtime state is still alive.
+    CHECK(_task_id == "" ||
+          ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->get_task_mem_tracker(_task_id));
+#endif
     flush_untracked_mem<false>();
     _task_id = "";
     _fragment_instance_id = TUniqueId();
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index d32eaaf082..8ccb6f70b1 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -19,12 +19,18 @@
 
 #include <fmt/format.h>
 #include <parallel_hashmap/phmap.h>
+#include <service/brpc_conflict.h>
 
 #include "runtime/memory/mem_tracker.h"
 #include "runtime/memory/mem_tracker_limiter.h"
+// Must be included after brpc_conflict.h
+#include <bthread/bthread.h>
 
 namespace doris {
 
+extern bthread_key_t btls_key;
+static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
+
 using ExceedCallBack = void (*)();
 struct MemExceedCallBackInfo {
     std::string cancel_msg;
@@ -113,6 +119,7 @@ public:
     MemTrackerLimiter* limiter_mem_tracker() { return _limiter_tracker; }
 
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
+    void set_check_attach(bool check_attach) { _check_attach = check_attach; }
 
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -144,6 +151,7 @@ private:
     bool _check_limit = false;
     // If there is a memory new/delete operation in the consume method, it may enter infinite recursion.
     bool _stop_consume = false;
+    bool _check_attach = true;
     std::string _task_id;
     TUniqueId _fragment_instance_id;
     MemExceedCallBackInfo _exceed_cb;
@@ -195,6 +203,16 @@ inline void ThreadMemTrackerMgr::flush_untracked_mem() {
     _stop_consume = true;
     DCHECK(_limiter_tracker);
     if (CheckLimit) {
+#ifndef BE_TEST
+        // Every thread is expected to call `attach_limiter_tracker` at startup to bind its limiter tracker.
+        // If _check_attach is true and we are not in the brpc server (btls_key == EMPTY_BTLS_KEY; protobuf is
+        // operated when a bthread starts), flushing checks that the tracker label is NOT the default "Process".
+        // To skip this check, call set_check_attach(false).
+        // TODO(zxy) The current p0 tests cannot guarantee that all threads are checked, so this check
+        // may be disabled; try enabling it when investigating memory that is not tracked in time.
+        DCHECK(!_check_attach || btls_key != EMPTY_BTLS_KEY ||
+               _limiter_tracker->label() != "Process");
+#endif
         Status st = _limiter_tracker->try_consume(_untracked_mem);
         if (!st) {
             // The memory has been allocated, so when TryConsume fails, need to continue to complete
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index f35ffec2b7..043f4c7eab 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -30,12 +30,15 @@ DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(tablet_writer_count, MetricUnit::NOUNIT);
 
 std::atomic<uint64_t> TabletsChannel::_s_tablet_writer_count;
 
-TabletsChannel::TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec)
+TabletsChannel::TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker,
+                               bool is_high_priority, bool is_vec)
         : _key(key),
           _state(kInitialized),
           _closed_senders(64),
           _is_high_priority(is_high_priority),
           _is_vec(is_vec) {
+    _mem_tracker = std::make_unique<MemTrackerLimiter>(
+            -1, fmt::format("TabletsChannel#indexID={}", key.index_id), parent_tracker);
     static std::once_flag once_flag;
     std::call_once(once_flag, [] {
         REGISTER_HOOK_METRIC(tablet_writer_count, [&]() { return _s_tablet_writer_count.load(); });
@@ -207,14 +210,6 @@ Status TabletsChannel::reduce_mem_usage(int64_t mem_limit) {
     return Status::OK();
 }
 
-int64_t TabletsChannel::mem_consumption() {
-    int64_t mem_usage = 0;
-    for (auto& it : _tablet_writers) {
-        mem_usage += it.second->mem_consumption();
-    }
-    return mem_usage;
-}
-
 Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request) {
     std::vector<SlotDescriptor*>* index_slots = nullptr;
     int32_t schema_hash = 0;
@@ -245,7 +240,7 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
         wrequest.ptable_schema_param = request.schema();
 
         DeltaWriter* writer = nullptr;
-        auto st = DeltaWriter::open(&wrequest, &writer, _is_vec);
+        auto st = DeltaWriter::open(&wrequest, &writer, _mem_tracker.get(), _is_vec);
         if (!st.ok()) {
             std::stringstream ss;
             ss << "open delta writer failed, tablet_id=" << tablet.tablet_id()
diff --git a/be/src/runtime/tablets_channel.h b/be/src/runtime/tablets_channel.h
index bd7504cd2f..318dd879a4 100644
--- a/be/src/runtime/tablets_channel.h
+++ b/be/src/runtime/tablets_channel.h
@@ -60,7 +60,8 @@ class OlapTableSchemaParam;
 // Write channel for a particular (load, index).
 class TabletsChannel {
 public:
-    TabletsChannel(const TabletsChannelKey& key, bool is_high_priority, bool is_vec);
+    TabletsChannel(const TabletsChannelKey& key, MemTrackerLimiter* parent_tracker,
+                   bool is_high_priority, bool is_vec);
 
     ~TabletsChannel();
 
@@ -88,7 +89,7 @@ public:
     // no-op when this channel has been closed or cancelled
     Status reduce_mem_usage(int64_t mem_limit);
 
-    int64_t mem_consumption();
+    int64_t mem_consumption() const { return _mem_tracker->consumption(); }
 
 private:
     template <typename Request>
@@ -143,6 +144,8 @@ private:
 
     static std::atomic<uint64_t> _s_tablet_writer_count;
 
+    std::unique_ptr<MemTrackerLimiter> _mem_tracker;
+
     bool _is_high_priority = false;
 
     bool _is_vec = false;
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 5bfa58fa39..6071defe64 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -37,25 +37,6 @@ AttachTask::AttachTask(MemTrackerLimiter* mem_tracker, const ThreadContext::Task
 #endif
 }
 
-// AttachTask::AttachTask(const TQueryType::type& query_type,
-//                                    MemTrackerLimiter* mem_tracker) {
-//     DCHECK(mem_tracker);
-// #ifdef USE_MEM_TRACKER
-//     thread_context()->attach_task(query_to_task_type(query_type), "", TUniqueId(), mem_tracker);
-// #endif
-// }
-
-// AttachTask::AttachTask(const TQueryType::type& query_type,
-//                                    MemTrackerLimiter* mem_tracker, const std::string& task_id,
-//                                    const TUniqueId& fragment_instance_id) {
-//     DCHECK(task_id != "");
-//     DCHECK(fragment_instance_id != TUniqueId());
-//     DCHECK(mem_tracker);
-// #ifdef USE_MEM_TRACKER
-//     thread_context()->attach_task(query_to_task_type(query_type), task_id, fragment_instance_id, mem_tracker);
-// #endif
-// }
-
 AttachTask::AttachTask(RuntimeState* runtime_state) {
 #ifndef BE_TEST
     DCHECK(print_id(runtime_state->query_id()) != "");
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index f1439d1eea..2c14929432 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -43,7 +43,6 @@ class TUniqueId;
 class ThreadContext;
 
 extern bthread_key_t btls_key;
-static const bthread_key_t EMPTY_BTLS_KEY = {0, 0};
 
 // Using gcc11 compiles thread_local variable on lower versions of GLIBC will report an error,
 // see https://github.com/apache/doris/pull/7911
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index c68add8a76..a78bb801b1 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -473,9 +473,7 @@ int main(int argc, char** argv) {
 #endif
         doris::PerfCounters::refresh_proc_status();
 
-        // TODO(zxy) 10s is too long to clear the expired task mem tracker.
-        // A query mem tracker is about 57 bytes, assuming 10000 qps, which wastes about 55M of memory.
-        // It should be actively triggered at the end of query/load.
+        // Clear expired task mem trackers every 1s; a query mem tracker is about 57 bytes.
         doris::ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->logout_task_mem_tracker();
         sleep(1);
     }
diff --git a/be/src/vec/exec/vbroker_scan_node.cpp b/be/src/vec/exec/vbroker_scan_node.cpp
index 0a85a60aa7..34cc7ddf26 100644
--- a/be/src/vec/exec/vbroker_scan_node.cpp
+++ b/be/src/vec/exec/vbroker_scan_node.cpp
@@ -109,6 +109,7 @@ Status VBrokerScanNode::start_scanners() {
 Status VBrokerScanNode::get_next(RuntimeState* state, vectorized::Block* block, bool* eos) {
     INIT_AND_SCOPE_GET_NEXT_SPAN(state->get_tracer(), _get_next_span, "VBrokerScanNode::get_next");
     SCOPED_TIMER(_runtime_profile->total_time_counter());
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // check if CANCELLED.
     if (state->is_cancelled()) {
         std::unique_lock<std::mutex> l(_batch_queue_lock);
@@ -275,6 +276,8 @@ Status VBrokerScanNode::scanner_scan(const TBrokerScanRange& scan_range, Scanner
 
 void VBrokerScanNode::scanner_worker(int start_idx, int length) {
     START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VBrokerScanNode::scanner_worker");
+    SCOPED_ATTACH_TASK(_runtime_state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     Thread::set_self_name("vbroker_scanner");
     Status status = Status::OK();
     ScannerCounter counter;
diff --git a/be/src/vec/exec/ves_http_scan_node.cpp b/be/src/vec/exec/ves_http_scan_node.cpp
index 3029da4f8c..3a9a1ef673 100644
--- a/be/src/vec/exec/ves_http_scan_node.cpp
+++ b/be/src/vec/exec/ves_http_scan_node.cpp
@@ -385,6 +385,7 @@ void VEsHttpScanNode::debug_string(int ident_level, std::stringstream* out) cons
 void VEsHttpScanNode::scanner_worker(int start_idx, int length, std::promise<Status>& p_status) {
     START_AND_SCOPE_SPAN(_runtime_state->get_tracer(), span, "VEsHttpScanNode::scanner_worker");
     SCOPED_ATTACH_TASK(_runtime_state);
+    SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
     // Clone expr context
     std::vector<ExprContext*> scanner_expr_ctxs;
     DCHECK(start_idx < length);
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 048395eec5..1dbcc05daa 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -141,6 +141,7 @@ Status VNodeChannel::open_wait() {
 }
 
 Status VNodeChannel::add_row(const BlockRow& block_row, int64_t tablet_id) {
+    SCOPED_CONSUME_MEM_TRACKER(_node_channel_tracker.get());
     // If add_row() when _eos_is_produced==true, there must be sth wrong, we can only mark this channel as failed.
     auto st = none_of({_cancelled, _eos_is_produced});
     if (!st.ok()) {
diff --git a/be/test/olap/delta_writer_test.cpp b/be/test/olap/delta_writer_test.cpp
index 33c4a48737..3126b30f94 100644
--- a/be/test/olap/delta_writer_test.cpp
+++ b/be/test/olap/delta_writer_test.cpp
@@ -398,7 +398,7 @@ TEST_F(TestDeltaWriter, open) {
     SAFE_DELETE(delta_writer);
 
     // test vec delta writer
-    DeltaWriter::open(&write_req, &delta_writer, true);
+    DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
     EXPECT_NE(delta_writer, nullptr);
     res = delta_writer->close();
     EXPECT_EQ(Status::OK(), res);
@@ -551,7 +551,7 @@ TEST_F(TestDeltaWriter, vec_write) {
     WriteRequest write_req = {10004, 270068376, WriteType::LOAD, 20002,
                               30002, load_id,   tuple_desc,      &(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, &delta_writer, true);
+    DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
     ASSERT_NE(delta_writer, nullptr);
 
     MemPool pool;
@@ -764,7 +764,7 @@ TEST_F(TestDeltaWriter, vec_sequence_col) {
     WriteRequest write_req = {10005, 270068377, WriteType::LOAD, 20003,
                               30003, load_id,   tuple_desc,      &(tuple_desc->slots())};
     DeltaWriter* delta_writer = nullptr;
-    DeltaWriter::open(&write_req, &delta_writer, true);
+    DeltaWriter::open(&write_req, &delta_writer, nullptr, true);
     ASSERT_NE(delta_writer, nullptr);
 
     MemPool pool;


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org