You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/30 10:34:00 UTC

[doris] branch master updated: weak relationship between MemTracker and MemTrackerLimiter (#11347)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 18864ab7fe weak relationship between MemTracker and MemTrackerLimiter (#11347)
18864ab7fe is described below

commit 18864ab7fe1c9982f907466cc8b0567a37134074
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Sat Jul 30 18:33:54 2022 +0800

    weak relationship between MemTracker and MemTrackerLimiter (#11347)
---
 be/src/exec/exec_node.cpp                        |   2 +-
 be/src/olap/memtable.cpp                         |   4 +-
 be/src/olap/schema_change.cpp                    |   6 +-
 be/src/runtime/bufferpool/reservation_tracker.cc |   2 +-
 be/src/runtime/data_stream_recvr.cc              |   2 +-
 be/src/runtime/data_stream_sender.cpp            |   2 +-
 be/src/runtime/memory/mem_tracker.cpp            | 106 ++++++++++++-----------
 be/src/runtime/memory/mem_tracker.h              |  48 ++++------
 be/src/runtime/memory/mem_tracker_limiter.cpp    |  53 +++++-------
 be/src/runtime/memory/mem_tracker_limiter.h      |  23 +++--
 be/src/runtime/memory/mem_tracker_task_pool.cpp  |  14 ++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h   |   3 +-
 be/src/runtime/plan_fragment_executor.cpp        |   2 +
 be/src/runtime/runtime_filter_mgr.cpp            |   4 +-
 be/src/runtime/runtime_filter_mgr.h              |   2 +-
 be/src/runtime/runtime_state.cpp                 |   3 -
 be/src/vec/exec/join/vhash_join_node.cpp         |   2 +-
 be/src/vec/runtime/vdata_stream_recvr.cpp        |   5 +-
 be/src/vec/sink/vdata_stream_sender.cpp          |   2 +-
 19 files changed, 139 insertions(+), 146 deletions(-)

diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 97afb5fa05..0a854ba89b 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -206,7 +206,7 @@ Status ExecNode::prepare(RuntimeState* state) {
             std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
                                runtime_profile()->total_time_counter()),
             "");
-    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), nullptr,
+    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
                                                 _runtime_profile.get());
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index a1ade071d0..a9675fba7d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -144,7 +144,9 @@ MemTable::~MemTable() {
     _mem_tracker->release(_mem_usage);
     _buffer_mem_pool->free_all();
     _table_mem_pool->free_all();
-    _mem_tracker->memory_leak_check();
+    DCHECK_EQ(_mem_tracker->consumption(), 0)
+            << std::endl
+            << MemTracker::log_usage(_mem_tracker->make_snapshot(0));
 }
 
 MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ee9653203f..ac203e858f 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1364,10 +1364,8 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_bl
         : _changer(row_block_changer),
           _memory_limitation(memory_limitation),
           _temp_delta_versions(Version::mock()) {
-    _mem_tracker =
-            std::make_unique<MemTracker>(fmt::format("VSchemaChangeWithSorting:changer={}",
-                                                     std::to_string(int64(&row_block_changer))),
-                                         StorageEngine::instance()->schema_change_mem_tracker());
+    _mem_tracker = std::make_unique<MemTracker>(fmt::format(
+            "VSchemaChangeWithSorting:changer={}", std::to_string(int64(&row_block_changer))));
 }
 
 Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
diff --git a/be/src/runtime/bufferpool/reservation_tracker.cc b/be/src/runtime/bufferpool/reservation_tracker.cc
index 6bcd0bf38a..6985edaef7 100644
--- a/be/src/runtime/bufferpool/reservation_tracker.cc
+++ b/be/src/runtime/bufferpool/reservation_tracker.cc
@@ -73,7 +73,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile, ReservationTr
         MemTracker* parent_mem_tracker = GetParentMemTracker();
         if (parent_mem_tracker != nullptr) {
             // Make sure the parent links of the MemTrackers correspond to our parent links.
-            DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
+            // DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
         } else {
             // Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
             // shouldn't have a MemTracker.
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index ffd20c58d7..0d456ca74a 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -461,7 +461,7 @@ DataStreamRecvr::DataStreamRecvr(
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
     _mem_tracker = std::make_unique<MemTracker>(
-            "DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
+            "DataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
 
     // Create one queue per sender if is_merging is true.
     int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 2731587e45..13b2308d7b 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -403,7 +403,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
     _profile = _pool->add(new RuntimeProfile(title.str()));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker = std::make_unique<MemTracker>(
-            "DataStreamSender:" + print_id(state->fragment_instance_id()), nullptr, _profile);
+            "DataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 0f4e185e46..ca7e4943ff 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -21,37 +21,28 @@
 #include "runtime/memory/mem_tracker.h"
 
 #include <fmt/format.h>
-#include <parallel_hashmap/phmap.h>
 
-#include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
+#include "util/string_util.h"
 #include "util/time.h"
 
 namespace doris {
 
 const std::string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
 
-using StaticTrackersMap = phmap::parallel_flat_hash_map<
-        std::string, MemTracker*, phmap::priv::hash_default_hash<std::string>,
-        phmap::priv::hash_default_eq<std::string>,
-        std::allocator<std::pair<const std::string, MemTracker*>>, 12, std::mutex>;
+struct TrackerGroup {
+    std::list<MemTracker*> trackers;
+    std::mutex group_lock;
+};
 
-static StaticTrackersMap _static_mem_trackers;
+// Save all MemTrackers in use to maintain the weak relationship between MemTracker and MemTrackerLimiter.
+// When MemTrackerLimiter prints statistics, all MemTracker statistics with weak relationship will be printed together.
+// Each group corresponds to several MemTrackerLimiters and has a lock.
+// Multiple groups are used to reduce the impact of locks.
+static std::vector<TrackerGroup> mem_tracker_pool(1000);
 
-MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile,
-                       bool is_limiter) {
-    // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock when log_usage is called.
-    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
-    _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
-    DCHECK(_parent || label == "Process");
-    if (_parent && _parent->label().find("queryId=") != _parent->label().npos) {
-        // Add the queryId suffix to the tracker below the query.
-        _label = fmt::format("{}#{}", label,
-                             _parent->label().substr(_parent->label().find("queryId="), -1));
-    } else {
-        _label = label;
-    }
+MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, bool is_limiter) {
     if (profile == nullptr) {
         _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
     } else {
@@ -66,36 +57,43 @@ MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent, Runt
         // release().
         _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
     }
+
     _is_limiter = is_limiter;
-    if (_parent && !_is_limiter) _parent->add_child(this);
-}
+    if (!_is_limiter) {
+        if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
+            _label = fmt::format(
+                    "{} | {}", label,
+                    thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
+        } else {
+            _label = label + " | ";
+        }
 
-MemTracker::~MemTracker() {
-    if (_parent && !_is_limiter) _parent->remove_child(this);
+        _bind_group_num =
+                thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
+        {
+            std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
+            _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
+                    mem_tracker_pool[_bind_group_num].trackers.end(), this);
+        }
+    } else {
+        _label = label;
+    }
 }
 
-// Count the memory in the scope to a temporary tracker with the specified label name.
-// This is very useful when debugging. You can find the position where the tracker statistics are
-// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots.
-MemTracker* MemTracker::get_static_mem_tracker(const std::string& label) {
-    // First time this label registered, make a new object, otherwise do nothing.
-    // Avoid using locks to resolve erase conflicts.
-    MemTracker* tracker;
-    _static_mem_trackers.lazy_emplace_l(
-            label, [&](MemTracker* v) { tracker = v; },
-            [&](const auto& ctor) {
-                tracker = new MemTracker(fmt::format("[Static]-{}", label));
-                ctor(label, tracker);
-            });
-    return tracker;
+MemTracker::~MemTracker() {
+    if (!_is_limiter) {
+        std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
+        if (_tracker_group_it != mem_tracker_pool[_bind_group_num].trackers.end()) {
+            mem_tracker_pool[_bind_group_num].trackers.erase(_tracker_group_it);
+            _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.end();
+        }
+    }
 }
 
 MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const {
     Snapshot snapshot;
-    snapshot.label = _label;
-    if (_parent != nullptr) {
-        snapshot.parent = _parent->label();
-    }
+    snapshot.label = split(_label, " | ")[0];
+    snapshot.parent = split(_label, " | ")[1];
     snapshot.level = level;
     snapshot.limit = -1;
     snapshot.cur_consumption = _consumption->current_value();
@@ -104,15 +102,21 @@ MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const {
     return snapshot;
 }
 
-std::string MemTracker::log_usage() {
-    // Make sure the consumption is up to date.
-    int64_t curr_consumption = consumption();
-    int64_t peak_consumption = _consumption->value();
-    if (curr_consumption == 0) return "";
-    std::string detail = "MemTracker Label={}, Total={}, Peak={}";
-    detail = fmt::format(detail, _label, PrettyPrinter::print(curr_consumption, TUnit::BYTES),
-                         PrettyPrinter::print(peak_consumption, TUnit::BYTES));
-    return detail;
+void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshots, size_t level,
+                                     int64_t group_num, std::string related_label) {
+    std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
+    for (auto tracker : mem_tracker_pool[group_num].trackers) {
+        if (split(tracker->label(), " | ")[1] == related_label) {
+            snapshots->push_back(tracker->make_snapshot(level));
+        }
+    }
+}
+
+std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
+    return fmt::format("MemTracker Label={}, Parent Label={}, Used={}, Peak={}", snapshot.label,
+                       snapshot.parent,
+                       PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
+                       PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES));
 }
 
 } // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 9f6e021a3c..4a12375344 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -23,23 +23,18 @@
 
 namespace doris {
 
-class MemTrackerLimiter;
-
 // Used to track memory usage.
 //
 // MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER,
 // which will automatically track all memory usage of the code segment where it is located.
 //
-// MemTracker's parent can only be MemTrackerLimiter, which is only used to print tree-like statistics.
-// Consuming MemTracker will not consume its parent synchronously.
-// Usually, it is not necessary to specify the parent. by default, the MemTrackerLimiter in the thread context
-// is used as the parent, which is specified when the thread starts.
-//
 // This class is thread-safe.
 class MemTracker {
 public:
     struct Snapshot {
         std::string label;
+        // For MemTracker, it is only weakly related to its parent through the label, keeping MemTracker independent.
+        // For MemTrackerLimiter, it is strongly related to its parent: parent and child hold pointers to each other.
         std::string parent = "";
         size_t level = 0;
         int64_t limit = 0;
@@ -48,24 +43,18 @@ public:
         size_t child_count = 0;
     };
 
-    // Creates and adds the tracker to the tree.
-    MemTracker(const std::string& label = std::string(), MemTrackerLimiter* parent = nullptr,
-               RuntimeProfile* profile = nullptr, bool is_limiter = false);
+    // Creates and adds the tracker to the mem_tracker_pool.
+    MemTracker(const std::string& label = std::string(), RuntimeProfile* profile = nullptr,
+               bool is_limiter = false);
 
     ~MemTracker();
 
-    // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get.
-    // Temporary trackers are not automatically destructed, which is usually used for debugging.
-    static MemTracker* get_static_mem_tracker(const std::string& label);
-
 public:
     const std::string& label() const { return _label; }
-    MemTrackerLimiter* parent() const { return _parent; }
     // Returns the memory consumed in bytes.
     int64_t consumption() const { return _consumption->current_value(); }
     int64_t peak_consumption() const { return _consumption->value(); }
 
-public:
     void consume(int64_t bytes);
     void release(int64_t bytes) { consume(-bytes); }
     // Transfer 'bytes' of consumption from this tracker to 'dst'.
@@ -78,17 +67,12 @@ public:
         return limit >= 0 && limit > consumption() + bytes;
     }
 
-    // Usually, a negative values means that the statistics are not accurate,
-    // 1. The released memory is not consumed.
-    // 2. The same block of memory, tracker A calls consume, and tracker B calls release.
-    // 3. Repeated releases of MemTacker. When the consume is called on the child MemTracker,
-    //    after the release is called on the parent MemTracker,
-    //    the child ~MemTracker will cause repeated releases.
-    void memory_leak_check() { DCHECK_EQ(consumption(), 0) << std::endl << log_usage(); }
-
     Snapshot make_snapshot(size_t level) const;
-
-    std::string log_usage();
+    // Appends snapshots of the trackers in group `group_num` of mem_tracker_pool whose label's
+    // parent part (the text after " | ") equals `related_label`.
+    static void make_group_snapshot(std::vector<Snapshot>* snapshots, size_t level,
+                                    int64_t group_num, std::string related_label);
+    static std::string log_usage(MemTracker::Snapshot snapshot);
 
     std::string debug_string() {
         std::stringstream msg;
@@ -98,20 +82,22 @@ public:
         return msg.str();
     }
 
-    // Iterator into parent_->_child_trackers for this object. Stored to have O(1) remove.
-    std::list<MemTracker*>::iterator _child_tracker_it;
-
     static const std::string COUNTER_NAME;
 
 protected:
-    // label used in the usage string (log_usage())
+    // Label used when making a snapshot; not guaranteed to be unique.
     std::string _label;
 
     std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in bytes
 
-    MemTrackerLimiter* _parent; // The parent of this tracker.
+    // Index of the group in mem_tracker_pool holding this tracker (set only for non-limiter trackers).
+    int64_t _bind_group_num;
 
+    // Whether this tracker is a MemTrackerLimiter.
     bool _is_limiter;
+
+    // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove.
+    std::list<MemTracker*>::iterator _tracker_group_it;
 };
 
 inline void MemTracker::consume(int64_t bytes) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 4bdc2a2781..2223fce5a5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -30,10 +30,14 @@ namespace doris {
 
 MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label,
                                      MemTrackerLimiter* parent, RuntimeProfile* profile)
-        : MemTracker(label, parent, profile, true) {
-    // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors
+        : MemTracker(label, profile, true) {
     DCHECK_GE(byte_limit, -1);
     _limit = byte_limit;
+    _group_num = GetCurrentTimeMicros() % 1000;
+    _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
+    DCHECK(_parent || label == "Process");
+
+    // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors
     MemTrackerLimiter* tracker = this;
     while (tracker != nullptr) {
         _all_ancestors.push_back(tracker);
@@ -59,12 +63,6 @@ void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) {
     _had_child_count++;
 }
 
-void MemTrackerLimiter::add_child(MemTracker* tracker) {
-    std::lock_guard<std::mutex> l(_child_tracker_lock);
-    tracker->_child_tracker_it = _child_trackers.insert(_child_trackers.end(), tracker);
-    _had_child_count++;
-}
-
 void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
     std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
     if (tracker->_child_tracker_it != _child_tracker_limiters.end()) {
@@ -73,19 +71,21 @@ void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
     }
 }
 
-void MemTrackerLimiter::remove_child(MemTracker* tracker) {
-    std::lock_guard<std::mutex> l(_child_tracker_lock);
-    if (tracker->_child_tracker_it != _child_trackers.end()) {
-        _child_trackers.erase(tracker->_child_tracker_it);
-        tracker->_child_tracker_it = _child_trackers.end();
-    }
+MemTracker::Snapshot MemTrackerLimiter::make_snapshot(size_t level) const {
+    Snapshot snapshot;
+    snapshot.label = _label;
+    snapshot.parent = _parent != nullptr ? _parent->label() : "Root";
+    snapshot.level = level;
+    snapshot.limit = _limit;
+    snapshot.cur_consumption = _consumption->current_value();
+    snapshot.peak_consumption = _consumption->value();
+    snapshot.child_count = remain_child_count();
+    return snapshot;
 }
 
 void MemTrackerLimiter::make_snapshot(std::vector<MemTracker::Snapshot>* snapshots,
                                       size_t cur_level, size_t upper_level) const {
-    Snapshot snapshot = MemTracker::make_snapshot(cur_level);
-    snapshot.limit = _limit;
-    snapshot.child_count = remain_child_count();
+    Snapshot snapshot = MemTrackerLimiter::make_snapshot(cur_level);
     (*snapshots).emplace_back(snapshot);
     if (cur_level < upper_level) {
         {
@@ -94,12 +94,7 @@ void MemTrackerLimiter::make_snapshot(std::vector<MemTracker::Snapshot>* snapsho
                 child->make_snapshot(snapshots, cur_level + 1, upper_level);
             }
         }
-        {
-            std::lock_guard<std::mutex> l(_child_tracker_lock);
-            for (const auto& child : _child_trackers) {
-                (*snapshots).emplace_back(child->make_snapshot(cur_level + 1));
-            }
-        }
+        MemTracker::make_group_snapshot(snapshots, cur_level + 1, _group_num, _label);
     }
 }
 
@@ -183,8 +178,7 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge
     int64_t peak_consumption = _consumption->value();
     if (logged_consumption != nullptr) *logged_consumption = curr_consumption;
 
-    std::string detail =
-            "MemTrackerLimiter log_usage Label={}, Limit={}, Total={}, Peak={}, Exceeded={}";
+    std::string detail = "MemTrackerLimiter Label={}, Limit={}, Used={}, Peak={}, Exceeded={}";
     detail = fmt::format(detail, _label, PrettyPrinter::print(_limit, TUnit::BYTES),
                          PrettyPrinter::print(curr_consumption, TUnit::BYTES),
                          PrettyPrinter::print(peak_consumption, TUnit::BYTES),
@@ -201,11 +195,10 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge
         child_trackers_usage =
                 log_usage(max_recursive_depth - 1, _child_tracker_limiters, &child_consumption);
     }
-    {
-        std::lock_guard<std::mutex> l(_child_tracker_lock);
-        for (const auto& child : _child_trackers) {
-            child_trackers_usage += "\n" + child->log_usage();
-        }
+    std::vector<MemTracker::Snapshot> snapshots;
+    MemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label);
+    for (const auto& snapshot : snapshots) {
+        child_trackers_usage += MemTracker::log_usage(snapshot);
     }
     if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage;
     return detail;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 3852cbe52d..5c41ce7cda 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -45,19 +45,21 @@ public:
     MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label = std::string(),
                       MemTrackerLimiter* parent = nullptr, RuntimeProfile* profile = nullptr);
 
+    // If the final consumption is not as expected, this usually means that consume and release were
+    // called on different trackers for the same memory. If the two trackers have a parent-child
+    // relationship, the parent's consumption is correct and the child's is wrong; if the two trackers
+    // have no parent-child relationship, both trackers' consumptions are wrong.
     ~MemTrackerLimiter();
 
+    MemTrackerLimiter* parent() const { return _parent; }
+
     void add_child(MemTrackerLimiter* tracker);
-    void add_child(MemTracker* tracker);
     void remove_child(MemTrackerLimiter* tracker);
-    void remove_child(MemTracker* tracker);
 
-    // Leaf tracker, without any child
-    size_t remain_child_count() const {
-        return _child_tracker_limiters.size() + _child_trackers.size();
-    }
+    size_t remain_child_count() const { return _child_tracker_limiters.size(); }
     size_t had_child_count() const { return _had_child_count; }
 
+    Snapshot make_snapshot(size_t level) const;
     // Returns a list of all the valid tracker snapshots.
     void make_snapshot(std::vector<MemTracker::Snapshot>* snapshots, size_t cur_level,
                        size_t upper_level) const;
@@ -76,6 +78,7 @@ public:
         return Status::OK();
     }
 
+    int64_t group_num() const { return _group_num; }
     bool has_limit() const { return _limit >= 0; }
     int64_t limit() const { return _limit; }
     void update_limit(int64_t limit) {
@@ -181,6 +184,11 @@ private:
     // Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。
     int64_t _limit;
 
+    // Group number in MemTracker::mem_tracker_pool, generated by the timestamp.
+    int64_t _group_num;
+
+    MemTrackerLimiter* _parent; // The parent of this tracker.
+
     // this tracker limiter plus all of its ancestors
     std::vector<MemTrackerLimiter*> _all_ancestors;
     // _all_ancestors with valid limits
@@ -192,9 +200,6 @@ private:
     mutable std::mutex _child_tracker_limiter_lock;
     std::list<MemTrackerLimiter*> _child_tracker_limiters;
 
-    mutable std::mutex _child_tracker_lock;
-    std::list<MemTracker*> _child_trackers;
-
     // The number of child trackers that have been added.
     std::atomic_size_t _had_child_count = 0;
 
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 02b38acdb5..24a8c95180 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -68,11 +68,12 @@ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& t
 }
 
 void MemTrackerTaskPool::logout_task_mem_tracker() {
-    for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();) {
+    std::vector<std::string> expired_task_ids;
+    for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) {
         if (!it->second) {
             // Unknown exception case with high concurrency, after _task_mem_trackers.erase,
             // the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006
-            _task_mem_trackers._erase(it++);
+            expired_task_ids.emplace_back(it->first);
         } else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) {
             // No RuntimeState uses this task MemTracker, it is only referenced by this map,
             // and tracker was not created soon, delete it.
@@ -90,7 +91,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
             // the negative number of the current value of consume.
             it->second->parent()->consumption_revise(-it->second->consumption());
             LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first;
-            _task_mem_trackers._erase(it++);
+            expired_task_ids.emplace_back(it->first);
         } else {
             // Log limit exceeded query tracker.
             if (it->second->limit_exceeded()) {
@@ -99,9 +100,14 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
                         fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
                         0, Status::OK());
             }
-            ++it;
         }
     }
+    for (auto tid : expired_task_ids) {
+        // Verify the condition again to make sure the tracker is not being used again.
+        _task_mem_trackers.erase_if(tid, [&](std::shared_ptr<MemTrackerLimiter> v) {
+            return !v || v->remain_child_count() == 0;
+        });
+    }
 }
 
 // TODO(zxy) More observable methods
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 1862c2830a..2d23388a81 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -124,7 +124,8 @@ public:
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
         for (const auto& v : _consumer_tracker_stack) {
-            fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage());
+            fmt::format_to(consumer_tracker_buf, "{}, ",
+                           MemTracker::log_usage(v->make_snapshot(0)));
         }
         return fmt::format(
                 "ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f098de7541..5b5aaae861 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -36,6 +36,7 @@
 #include "runtime/result_buffer_mgr.h"
 #include "runtime/result_queue_mgr.h"
 #include "runtime/row_batch.h"
+#include "runtime/runtime_filter_mgr.h"
 #include "runtime/thread_context.h"
 #include "util/container_util.hpp"
 #include "util/defer_op.h"
@@ -99,6 +100,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
 
     RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
     SCOPED_ATTACH_TASK(_runtime_state.get());
+    _runtime_state->runtime_filter_mgr()->init();
     _runtime_state->set_be_number(request.backend_num);
     if (request.__isset.backend_id) {
         _runtime_state->set_backend_id(request.backend_id);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index d231cd75c8..dd58feb1e2 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -44,9 +44,9 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
 
 RuntimeFilterMgr::~RuntimeFilterMgr() {}
 
-Status RuntimeFilterMgr::init(MemTrackerLimiter* parent_tracker) {
+Status RuntimeFilterMgr::init() {
     DCHECK(_state->instance_mem_tracker() != nullptr);
-    _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr", parent_tracker);
+    _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr");
     return Status::OK();
 }
 
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 340e9b27c7..1471f664f7 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -57,7 +57,7 @@ public:
 
     ~RuntimeFilterMgr();
 
-    Status init(MemTrackerLimiter* parent_tracker);
+    Status init();
 
     // get a consumer filter by filter-id
     Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index f3284d05f6..4c3d114447 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -258,9 +258,6 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
         _instance_buffer_reservation->InitChildTracker(&_profile, _buffer_reservation,
                                                        std::numeric_limits<int64_t>::max());
     }
-
-    // filter manager depends _instance_mem_tracker
-    _runtime_filter_mgr->init(_instance_mem_tracker.get());
     return Status::OK();
 }
 
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index f076f251ec..c88fbbb683 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -855,7 +855,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
             std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
                                runtime_profile()->total_time_counter()),
             "");
-    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), nullptr,
+    _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
                                                 _runtime_profile.get());
     SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
 
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index bce11043ac..020f188cac 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -263,10 +263,9 @@ VDataStreamRecvr::VDataStreamRecvr(
           _num_buffered_bytes(0),
           _profile(profile),
           _sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
-    // DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker`
-    // will be a null pointer, and remove_child fails when _mem_tracker is destructed.
+    // DataStreamRecvr may be destructed after the instance execution thread ends.
     _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
+            "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     // Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index c378b3c33d..e137d3ca18 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -397,7 +397,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
     _profile = _pool->add(new RuntimeProfile(std::move(title)));
     SCOPED_TIMER(_profile->total_time_counter());
     _mem_tracker = std::make_unique<MemTracker>(
-            "VDataStreamSender:" + print_id(state->fragment_instance_id()), nullptr, _profile);
+            "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
     SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
 
     if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org