You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/30 10:34:00 UTC
[doris] branch master updated: weak relationship between MemTracker and MemTrackerLimiter (#11347)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 18864ab7fe weak relationship between MemTracker and MemTrackerLimiter (#11347)
18864ab7fe is described below
commit 18864ab7fe1c9982f907466cc8b0567a37134074
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Sat Jul 30 18:33:54 2022 +0800
weak relationship between MemTracker and MemTrackerLimiter (#11347)
---
be/src/exec/exec_node.cpp | 2 +-
be/src/olap/memtable.cpp | 4 +-
be/src/olap/schema_change.cpp | 6 +-
be/src/runtime/bufferpool/reservation_tracker.cc | 2 +-
be/src/runtime/data_stream_recvr.cc | 2 +-
be/src/runtime/data_stream_sender.cpp | 2 +-
be/src/runtime/memory/mem_tracker.cpp | 106 ++++++++++++-----------
be/src/runtime/memory/mem_tracker.h | 48 ++++------
be/src/runtime/memory/mem_tracker_limiter.cpp | 53 +++++-------
be/src/runtime/memory/mem_tracker_limiter.h | 23 +++--
be/src/runtime/memory/mem_tracker_task_pool.cpp | 14 ++-
be/src/runtime/memory/thread_mem_tracker_mgr.h | 3 +-
be/src/runtime/plan_fragment_executor.cpp | 2 +
be/src/runtime/runtime_filter_mgr.cpp | 4 +-
be/src/runtime/runtime_filter_mgr.h | 2 +-
be/src/runtime/runtime_state.cpp | 3 -
be/src/vec/exec/join/vhash_join_node.cpp | 2 +-
be/src/vec/runtime/vdata_stream_recvr.cpp | 5 +-
be/src/vec/sink/vdata_stream_sender.cpp | 2 +-
19 files changed, 139 insertions(+), 146 deletions(-)
diff --git a/be/src/exec/exec_node.cpp b/be/src/exec/exec_node.cpp
index 97afb5fa05..0a854ba89b 100644
--- a/be/src/exec/exec_node.cpp
+++ b/be/src/exec/exec_node.cpp
@@ -206,7 +206,7 @@ Status ExecNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
- _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), nullptr,
+ _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
_runtime_profile.get());
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
diff --git a/be/src/olap/memtable.cpp b/be/src/olap/memtable.cpp
index a1ade071d0..a9675fba7d 100644
--- a/be/src/olap/memtable.cpp
+++ b/be/src/olap/memtable.cpp
@@ -144,7 +144,9 @@ MemTable::~MemTable() {
_mem_tracker->release(_mem_usage);
_buffer_mem_pool->free_all();
_table_mem_pool->free_all();
- _mem_tracker->memory_leak_check();
+ DCHECK_EQ(_mem_tracker->consumption(), 0)
+ << std::endl
+ << MemTracker::log_usage(_mem_tracker->make_snapshot(0));
}
MemTable::RowCursorComparator::RowCursorComparator(const Schema* schema) : _schema(schema) {}
diff --git a/be/src/olap/schema_change.cpp b/be/src/olap/schema_change.cpp
index ee9653203f..ac203e858f 100644
--- a/be/src/olap/schema_change.cpp
+++ b/be/src/olap/schema_change.cpp
@@ -1364,10 +1364,8 @@ VSchemaChangeWithSorting::VSchemaChangeWithSorting(const RowBlockChanger& row_bl
: _changer(row_block_changer),
_memory_limitation(memory_limitation),
_temp_delta_versions(Version::mock()) {
- _mem_tracker =
- std::make_unique<MemTracker>(fmt::format("VSchemaChangeWithSorting:changer={}",
- std::to_string(int64(&row_block_changer))),
- StorageEngine::instance()->schema_change_mem_tracker());
+ _mem_tracker = std::make_unique<MemTracker>(fmt::format(
+ "VSchemaChangeWithSorting:changer={}", std::to_string(int64(&row_block_changer))));
}
Status SchemaChangeWithSorting::_inner_process(RowsetReaderSharedPtr rowset_reader,
diff --git a/be/src/runtime/bufferpool/reservation_tracker.cc b/be/src/runtime/bufferpool/reservation_tracker.cc
index 6bcd0bf38a..6985edaef7 100644
--- a/be/src/runtime/bufferpool/reservation_tracker.cc
+++ b/be/src/runtime/bufferpool/reservation_tracker.cc
@@ -73,7 +73,7 @@ void ReservationTracker::InitChildTracker(RuntimeProfile* profile, ReservationTr
MemTracker* parent_mem_tracker = GetParentMemTracker();
if (parent_mem_tracker != nullptr) {
// Make sure the parent links of the MemTrackers correspond to our parent links.
- DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
+ // DCHECK_EQ(parent_mem_tracker, mem_tracker_->parent());
} else {
// Make sure we didn't leave a gap in the links. E.g. this tracker's grandparent
// shouldn't have a MemTracker.
diff --git a/be/src/runtime/data_stream_recvr.cc b/be/src/runtime/data_stream_recvr.cc
index ffd20c58d7..0d456ca74a 100644
--- a/be/src/runtime/data_stream_recvr.cc
+++ b/be/src/runtime/data_stream_recvr.cc
@@ -461,7 +461,7 @@ DataStreamRecvr::DataStreamRecvr(
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
_mem_tracker = std::make_unique<MemTracker>(
- "DataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
+ "DataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
// Create one queue per sender if is_merging is true.
int num_queues = is_merging ? num_senders : 1;
diff --git a/be/src/runtime/data_stream_sender.cpp b/be/src/runtime/data_stream_sender.cpp
index 2731587e45..13b2308d7b 100644
--- a/be/src/runtime/data_stream_sender.cpp
+++ b/be/src/runtime/data_stream_sender.cpp
@@ -403,7 +403,7 @@ Status DataStreamSender::prepare(RuntimeState* state) {
_profile = _pool->add(new RuntimeProfile(title.str()));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker = std::make_unique<MemTracker>(
- "DataStreamSender:" + print_id(state->fragment_instance_id()), nullptr, _profile);
+ "DataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 0f4e185e46..ca7e4943ff 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -21,37 +21,28 @@
#include "runtime/memory/mem_tracker.h"
#include <fmt/format.h>
-#include <parallel_hashmap/phmap.h>
-#include "runtime/memory/mem_tracker_limiter.h"
#include "runtime/thread_context.h"
#include "util/pretty_printer.h"
+#include "util/string_util.h"
#include "util/time.h"
namespace doris {
const std::string MemTracker::COUNTER_NAME = "PeakMemoryUsage";
-using StaticTrackersMap = phmap::parallel_flat_hash_map<
- std::string, MemTracker*, phmap::priv::hash_default_hash<std::string>,
- phmap::priv::hash_default_eq<std::string>,
- std::allocator<std::pair<const std::string, MemTracker*>>, 12, std::mutex>;
+struct TrackerGroup {
+ std::list<MemTracker*> trackers;
+ std::mutex group_lock;
+};
-static StaticTrackersMap _static_mem_trackers;
+// Saves all MemTrackers in use, to maintain the weak relationship between MemTracker and MemTrackerLimiter.
+// When a MemTrackerLimiter prints statistics, the statistics of all MemTrackers weakly related to it are printed together.
+// Each group is shared by several MemTrackerLimiters and has its own lock.
+// Multiple groups are used to reduce lock contention.
+static std::vector<TrackerGroup> mem_tracker_pool(1000);
-MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile,
- bool is_limiter) {
- // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock when log_usage is called.
- STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
- _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
- DCHECK(_parent || label == "Process");
- if (_parent && _parent->label().find("queryId=") != _parent->label().npos) {
- // Add the queryId suffix to the tracker below the query.
- _label = fmt::format("{}#{}", label,
- _parent->label().substr(_parent->label().find("queryId="), -1));
- } else {
- _label = label;
- }
+MemTracker::MemTracker(const std::string& label, RuntimeProfile* profile, bool is_limiter) {
if (profile == nullptr) {
_consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
} else {
@@ -66,36 +57,43 @@ MemTracker::MemTracker(const std::string& label, MemTrackerLimiter* parent, Runt
// release().
_consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
}
+
_is_limiter = is_limiter;
- if (_parent && !_is_limiter) _parent->add_child(this);
-}
+ if (!_is_limiter) {
+ if (thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()) {
+ _label = fmt::format(
+ "{} | {}", label,
+ thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->label());
+ } else {
+ _label = label + " | ";
+ }
-MemTracker::~MemTracker() {
- if (_parent && !_is_limiter) _parent->remove_child(this);
+ _bind_group_num =
+ thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker()->group_num();
+ {
+ std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
+ _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.insert(
+ mem_tracker_pool[_bind_group_num].trackers.end(), this);
+ }
+ } else {
+ _label = label;
+ }
}
-// Count the memory in the scope to a temporary tracker with the specified label name.
-// This is very useful when debugging. You can find the position where the tracker statistics are
-// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots.
-MemTracker* MemTracker::get_static_mem_tracker(const std::string& label) {
- // First time this label registered, make a new object, otherwise do nothing.
- // Avoid using locks to resolve erase conflicts.
- MemTracker* tracker;
- _static_mem_trackers.lazy_emplace_l(
- label, [&](MemTracker* v) { tracker = v; },
- [&](const auto& ctor) {
- tracker = new MemTracker(fmt::format("[Static]-{}", label));
- ctor(label, tracker);
- });
- return tracker;
+MemTracker::~MemTracker() {
+ if (!_is_limiter) {
+ std::lock_guard<std::mutex> l(mem_tracker_pool[_bind_group_num].group_lock);
+ if (_tracker_group_it != mem_tracker_pool[_bind_group_num].trackers.end()) {
+ mem_tracker_pool[_bind_group_num].trackers.erase(_tracker_group_it);
+ _tracker_group_it = mem_tracker_pool[_bind_group_num].trackers.end();
+ }
+ }
}
MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const {
Snapshot snapshot;
- snapshot.label = _label;
- if (_parent != nullptr) {
- snapshot.parent = _parent->label();
- }
+ snapshot.label = split(_label, " | ")[0];
+ snapshot.parent = split(_label, " | ")[1];
snapshot.level = level;
snapshot.limit = -1;
snapshot.cur_consumption = _consumption->current_value();
@@ -104,15 +102,21 @@ MemTracker::Snapshot MemTracker::make_snapshot(size_t level) const {
return snapshot;
}
-std::string MemTracker::log_usage() {
- // Make sure the consumption is up to date.
- int64_t curr_consumption = consumption();
- int64_t peak_consumption = _consumption->value();
- if (curr_consumption == 0) return "";
- std::string detail = "MemTracker Label={}, Total={}, Peak={}";
- detail = fmt::format(detail, _label, PrettyPrinter::print(curr_consumption, TUnit::BYTES),
- PrettyPrinter::print(peak_consumption, TUnit::BYTES));
- return detail;
+void MemTracker::make_group_snapshot(std::vector<MemTracker::Snapshot>* snapshots, size_t level,
+ int64_t group_num, std::string related_label) {
+ std::lock_guard<std::mutex> l(mem_tracker_pool[group_num].group_lock);
+ for (auto tracker : mem_tracker_pool[group_num].trackers) {
+ if (split(tracker->label(), " | ")[1] == related_label) {
+ snapshots->push_back(tracker->make_snapshot(level));
+ }
+ }
+}
+
+std::string MemTracker::log_usage(MemTracker::Snapshot snapshot) {
+ return fmt::format("MemTracker Label={}, Parent Label={}, Used={}, Peak={}", snapshot.label,
+ snapshot.parent,
+ PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
+ PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES));
}
} // namespace doris
\ No newline at end of file
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 9f6e021a3c..4a12375344 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -23,23 +23,18 @@
namespace doris {
-class MemTrackerLimiter;
-
// Used to track memory usage.
//
// MemTracker can be consumed manually by consume()/release(), or put into SCOPED_CONSUME_MEM_TRACKER,
// which will automatically track all memory usage of the code segment where it is located.
//
-// MemTracker's parent can only be MemTrackerLimiter, which is only used to print tree-like statistics.
-// Consuming MemTracker will not consume its parent synchronously.
-// Usually, it is not necessary to specify the parent. by default, the MemTrackerLimiter in the thread context
-// is used as the parent, which is specified when the thread starts.
-//
// This class is thread-safe.
class MemTracker {
public:
struct Snapshot {
std::string label;
+ // A MemTracker is only weakly related to its parent, through the label, which keeps the MemTracker independent.
+ // A MemTrackerLimiter is strongly related to its parent: parent and child hold pointers to each other.
std::string parent = "";
size_t level = 0;
int64_t limit = 0;
@@ -48,24 +43,18 @@ public:
size_t child_count = 0;
};
- // Creates and adds the tracker to the tree.
- MemTracker(const std::string& label = std::string(), MemTrackerLimiter* parent = nullptr,
- RuntimeProfile* profile = nullptr, bool is_limiter = false);
+ // Creates and adds the tracker to the mem_tracker_pool.
+ MemTracker(const std::string& label = std::string(), RuntimeProfile* profile = nullptr,
+ bool is_limiter = false);
~MemTracker();
- // Get a temporary tracker with a specified label, and the tracker will be created when the label is first get.
- // Temporary trackers are not automatically destructed, which is usually used for debugging.
- static MemTracker* get_static_mem_tracker(const std::string& label);
-
public:
const std::string& label() const { return _label; }
- MemTrackerLimiter* parent() const { return _parent; }
// Returns the memory consumed in bytes.
int64_t consumption() const { return _consumption->current_value(); }
int64_t peak_consumption() const { return _consumption->value(); }
-public:
void consume(int64_t bytes);
void release(int64_t bytes) { consume(-bytes); }
// Transfer 'bytes' of consumption from this tracker to 'dst'.
@@ -78,17 +67,12 @@ public:
return limit >= 0 && limit > consumption() + bytes;
}
- // Usually, a negative values means that the statistics are not accurate,
- // 1. The released memory is not consumed.
- // 2. The same block of memory, tracker A calls consume, and tracker B calls release.
- // 3. Repeated releases of MemTacker. When the consume is called on the child MemTracker,
- // after the release is called on the parent MemTracker,
- // the child ~MemTracker will cause repeated releases.
- void memory_leak_check() { DCHECK_EQ(consumption(), 0) << std::endl << log_usage(); }
-
Snapshot make_snapshot(size_t level) const;
-
- std::string log_usage();
+ // Generates snapshots of the trackers in group group_num of mem_tracker_pool, keeping only the trackers
+ // whose label is related to the parameter related_label.
+ static void make_group_snapshot(std::vector<Snapshot>* snapshots, size_t level,
+ int64_t group_num, std::string related_label);
+ static std::string log_usage(MemTracker::Snapshot snapshot);
std::string debug_string() {
std::stringstream msg;
@@ -98,20 +82,22 @@ public:
return msg.str();
}
- // Iterator into parent_->_child_trackers for this object. Stored to have O(1) remove.
- std::list<MemTracker*>::iterator _child_tracker_it;
-
static const std::string COUNTER_NAME;
protected:
- // label used in the usage string (log_usage())
+ // Label used when making a snapshot; not guaranteed to be unique.
std::string _label;
std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in bytes
- MemTrackerLimiter* _parent; // The parent of this tracker.
+ // Number of the group in mem_tracker_pool in which this tracker is located.
+ int64_t _bind_group_num;
+ // Whether this tracker is a MemTrackerLimiter.
bool _is_limiter;
+
+ // Iterator into mem_tracker_pool for this object. Stored to have O(1) remove.
+ std::list<MemTracker*>::iterator _tracker_group_it;
};
inline void MemTracker::consume(int64_t bytes) {
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 4bdc2a2781..2223fce5a5 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -30,10 +30,14 @@ namespace doris {
MemTrackerLimiter::MemTrackerLimiter(int64_t byte_limit, const std::string& label,
MemTrackerLimiter* parent, RuntimeProfile* profile)
- : MemTracker(label, parent, profile, true) {
- // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors
+ : MemTracker(label, profile, true) {
DCHECK_GE(byte_limit, -1);
_limit = byte_limit;
+ _group_num = GetCurrentTimeMicros() % 1000;
+ _parent = parent ? parent : thread_context()->_thread_mem_tracker_mgr->limiter_mem_tracker();
+ DCHECK(_parent || label == "Process");
+
+ // Walks the MemTrackerLimiter hierarchy and populates _all_ancestors and _limited_ancestors
MemTrackerLimiter* tracker = this;
while (tracker != nullptr) {
_all_ancestors.push_back(tracker);
@@ -59,12 +63,6 @@ void MemTrackerLimiter::add_child(MemTrackerLimiter* tracker) {
_had_child_count++;
}
-void MemTrackerLimiter::add_child(MemTracker* tracker) {
- std::lock_guard<std::mutex> l(_child_tracker_lock);
- tracker->_child_tracker_it = _child_trackers.insert(_child_trackers.end(), tracker);
- _had_child_count++;
-}
-
void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
std::lock_guard<std::mutex> l(_child_tracker_limiter_lock);
if (tracker->_child_tracker_it != _child_tracker_limiters.end()) {
@@ -73,19 +71,21 @@ void MemTrackerLimiter::remove_child(MemTrackerLimiter* tracker) {
}
}
-void MemTrackerLimiter::remove_child(MemTracker* tracker) {
- std::lock_guard<std::mutex> l(_child_tracker_lock);
- if (tracker->_child_tracker_it != _child_trackers.end()) {
- _child_trackers.erase(tracker->_child_tracker_it);
- tracker->_child_tracker_it = _child_trackers.end();
- }
+MemTracker::Snapshot MemTrackerLimiter::make_snapshot(size_t level) const {
+ Snapshot snapshot;
+ snapshot.label = _label;
+ snapshot.parent = _parent != nullptr ? _parent->label() : "Root";
+ snapshot.level = level;
+ snapshot.limit = _limit;
+ snapshot.cur_consumption = _consumption->current_value();
+ snapshot.peak_consumption = _consumption->value();
+ snapshot.child_count = remain_child_count();
+ return snapshot;
}
void MemTrackerLimiter::make_snapshot(std::vector<MemTracker::Snapshot>* snapshots,
size_t cur_level, size_t upper_level) const {
- Snapshot snapshot = MemTracker::make_snapshot(cur_level);
- snapshot.limit = _limit;
- snapshot.child_count = remain_child_count();
+ Snapshot snapshot = MemTrackerLimiter::make_snapshot(cur_level);
(*snapshots).emplace_back(snapshot);
if (cur_level < upper_level) {
{
@@ -94,12 +94,7 @@ void MemTrackerLimiter::make_snapshot(std::vector<MemTracker::Snapshot>* snapsho
child->make_snapshot(snapshots, cur_level + 1, upper_level);
}
}
- {
- std::lock_guard<std::mutex> l(_child_tracker_lock);
- for (const auto& child : _child_trackers) {
- (*snapshots).emplace_back(child->make_snapshot(cur_level + 1));
- }
- }
+ MemTracker::make_group_snapshot(snapshots, cur_level + 1, _group_num, _label);
}
}
@@ -183,8 +178,7 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge
int64_t peak_consumption = _consumption->value();
if (logged_consumption != nullptr) *logged_consumption = curr_consumption;
- std::string detail =
- "MemTrackerLimiter log_usage Label={}, Limit={}, Total={}, Peak={}, Exceeded={}";
+ std::string detail = "MemTrackerLimiter Label={}, Limit={}, Used={}, Peak={}, Exceeded={}";
detail = fmt::format(detail, _label, PrettyPrinter::print(_limit, TUnit::BYTES),
PrettyPrinter::print(curr_consumption, TUnit::BYTES),
PrettyPrinter::print(peak_consumption, TUnit::BYTES),
@@ -201,11 +195,10 @@ std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logge
child_trackers_usage =
log_usage(max_recursive_depth - 1, _child_tracker_limiters, &child_consumption);
}
- {
- std::lock_guard<std::mutex> l(_child_tracker_lock);
- for (const auto& child : _child_trackers) {
- child_trackers_usage += "\n" + child->log_usage();
- }
+ std::vector<MemTracker::Snapshot> snapshots;
+ MemTracker::make_group_snapshot(&snapshots, 0, _group_num, _label);
+ for (const auto& snapshot : snapshots) {
+ child_trackers_usage += MemTracker::log_usage(snapshot);
}
if (!child_trackers_usage.empty()) detail += "\n" + child_trackers_usage;
return detail;
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 3852cbe52d..5c41ce7cda 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -45,19 +45,21 @@ public:
MemTrackerLimiter(int64_t byte_limit = -1, const std::string& label = std::string(),
MemTrackerLimiter* parent = nullptr, RuntimeProfile* profile = nullptr);
+ // If the final consumption is not as expected, this usually means that consume and release were called
+ // for the same memory on different trackers. If the two trackers have a parent-child relationship,
+ // the parent tracker's consumption is correct and the child tracker's is wrong; if the two trackers have
+ // no parent-child relationship, the consumptions of both trackers are wrong.
~MemTrackerLimiter();
+ MemTrackerLimiter* parent() const { return _parent; }
+
void add_child(MemTrackerLimiter* tracker);
- void add_child(MemTracker* tracker);
void remove_child(MemTrackerLimiter* tracker);
- void remove_child(MemTracker* tracker);
- // Leaf tracker, without any child
- size_t remain_child_count() const {
- return _child_tracker_limiters.size() + _child_trackers.size();
- }
+ size_t remain_child_count() const { return _child_tracker_limiters.size(); }
size_t had_child_count() const { return _had_child_count; }
+ Snapshot make_snapshot(size_t level) const;
// Returns a list of all the valid tracker snapshots.
void make_snapshot(std::vector<MemTracker::Snapshot>* snapshots, size_t cur_level,
size_t upper_level) const;
@@ -76,6 +78,7 @@ public:
return Status::OK();
}
+ int64_t group_num() const { return _group_num; }
bool has_limit() const { return _limit >= 0; }
int64_t limit() const { return _limit; }
void update_limit(int64_t limit) {
@@ -181,6 +184,11 @@ private:
// Limit on memory consumption, in bytes. If limit_ == -1, there is no consumption limit. Used in log_usage。
int64_t _limit;
+ // Group number in MemTracker::mem_tracker_pool, generated by the timestamp.
+ int64_t _group_num;
+
+ MemTrackerLimiter* _parent; // The parent of this tracker.
+
// this tracker limiter plus all of its ancestors
std::vector<MemTrackerLimiter*> _all_ancestors;
// _all_ancestors with valid limits
@@ -192,9 +200,6 @@ private:
mutable std::mutex _child_tracker_limiter_lock;
std::list<MemTrackerLimiter*> _child_tracker_limiters;
- mutable std::mutex _child_tracker_lock;
- std::list<MemTracker*> _child_trackers;
-
// The number of child trackers that have been added.
std::atomic_size_t _had_child_count = 0;
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index 02b38acdb5..24a8c95180 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -68,11 +68,12 @@ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& t
}
void MemTrackerTaskPool::logout_task_mem_tracker() {
- for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end();) {
+ std::vector<std::string> expired_task_ids;
+ for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); it++) {
if (!it->second) {
// Unknown exception case with high concurrency, after _task_mem_trackers.erase,
// the key still exists in _task_mem_trackers. https://github.com/apache/incubator-doris/issues/10006
- _task_mem_trackers._erase(it++);
+ expired_task_ids.emplace_back(it->first);
} else if (it->second->remain_child_count() == 0 && it->second->had_child_count() != 0) {
// No RuntimeState uses this task MemTracker, it is only referenced by this map,
// and tracker was not created soon, delete it.
@@ -90,7 +91,7 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
// the negative number of the current value of consume.
it->second->parent()->consumption_revise(-it->second->consumption());
LOG(INFO) << "Deregister query/load memory tracker, queryId/loadId: " << it->first;
- _task_mem_trackers._erase(it++);
+ expired_task_ids.emplace_back(it->first);
} else {
// Log limit exceeded query tracker.
if (it->second->limit_exceeded()) {
@@ -99,9 +100,14 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
0, Status::OK());
}
- ++it;
}
}
+ for (auto tid : expired_task_ids) {
+ // Verify the condition again to make sure the tracker is not being used again.
+ _task_mem_trackers.erase_if(tid, [&](std::shared_ptr<MemTrackerLimiter> v) {
+ return !v || v->remain_child_count() == 0;
+ });
+ }
}
// TODO(zxy) More observable methods
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 1862c2830a..2d23388a81 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -124,7 +124,8 @@ public:
std::string print_debug_string() {
fmt::memory_buffer consumer_tracker_buf;
for (const auto& v : _consumer_tracker_stack) {
- fmt::format_to(consumer_tracker_buf, "{}, ", v->log_usage());
+ fmt::format_to(consumer_tracker_buf, "{}, ",
+ MemTracker::log_usage(v->make_snapshot(0)));
}
return fmt::format(
"ThreadMemTrackerMgr debug, _untracked_mem:{}, _task_id:{}, "
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index f098de7541..5b5aaae861 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -36,6 +36,7 @@
#include "runtime/result_buffer_mgr.h"
#include "runtime/result_queue_mgr.h"
#include "runtime/row_batch.h"
+#include "runtime/runtime_filter_mgr.h"
#include "runtime/thread_context.h"
#include "util/container_util.hpp"
#include "util/defer_op.h"
@@ -99,6 +100,7 @@ Status PlanFragmentExecutor::prepare(const TExecPlanFragmentParams& request,
RETURN_IF_ERROR(_runtime_state->init_mem_trackers(_query_id));
SCOPED_ATTACH_TASK(_runtime_state.get());
+ _runtime_state->runtime_filter_mgr()->init();
_runtime_state->set_be_number(request.backend_num);
if (request.__isset.backend_id) {
_runtime_state->set_backend_id(request.backend_id);
diff --git a/be/src/runtime/runtime_filter_mgr.cpp b/be/src/runtime/runtime_filter_mgr.cpp
index d231cd75c8..dd58feb1e2 100644
--- a/be/src/runtime/runtime_filter_mgr.cpp
+++ b/be/src/runtime/runtime_filter_mgr.cpp
@@ -44,9 +44,9 @@ RuntimeFilterMgr::RuntimeFilterMgr(const UniqueId& query_id, RuntimeState* state
RuntimeFilterMgr::~RuntimeFilterMgr() {}
-Status RuntimeFilterMgr::init(MemTrackerLimiter* parent_tracker) {
+Status RuntimeFilterMgr::init() {
DCHECK(_state->instance_mem_tracker() != nullptr);
- _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr", parent_tracker);
+ _tracker = std::make_unique<MemTracker>("RuntimeFilterMgr");
return Status::OK();
}
diff --git a/be/src/runtime/runtime_filter_mgr.h b/be/src/runtime/runtime_filter_mgr.h
index 340e9b27c7..1471f664f7 100644
--- a/be/src/runtime/runtime_filter_mgr.h
+++ b/be/src/runtime/runtime_filter_mgr.h
@@ -57,7 +57,7 @@ public:
~RuntimeFilterMgr();
- Status init(MemTrackerLimiter* parent_tracker);
+ Status init();
// get a consumer filter by filter-id
Status get_consume_filter(const int filter_id, IRuntimeFilter** consumer_filter);
diff --git a/be/src/runtime/runtime_state.cpp b/be/src/runtime/runtime_state.cpp
index f3284d05f6..4c3d114447 100644
--- a/be/src/runtime/runtime_state.cpp
+++ b/be/src/runtime/runtime_state.cpp
@@ -258,9 +258,6 @@ Status RuntimeState::init_mem_trackers(const TUniqueId& query_id) {
_instance_buffer_reservation->InitChildTracker(&_profile, _buffer_reservation,
std::numeric_limits<int64_t>::max());
}
-
- // filter manager depends _instance_mem_tracker
- _runtime_filter_mgr->init(_instance_mem_tracker.get());
return Status::OK();
}
diff --git a/be/src/vec/exec/join/vhash_join_node.cpp b/be/src/vec/exec/join/vhash_join_node.cpp
index f076f251ec..c88fbbb683 100644
--- a/be/src/vec/exec/join/vhash_join_node.cpp
+++ b/be/src/vec/exec/join/vhash_join_node.cpp
@@ -855,7 +855,7 @@ Status HashJoinNode::prepare(RuntimeState* state) {
std::bind<int64_t>(&RuntimeProfile::units_per_second, _rows_returned_counter,
runtime_profile()->total_time_counter()),
"");
- _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(), nullptr,
+ _mem_tracker = std::make_unique<MemTracker>("ExecNode:" + _runtime_profile->name(),
_runtime_profile.get());
SCOPED_CONSUME_MEM_TRACKER(mem_tracker());
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index bce11043ac..020f188cac 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -263,10 +263,9 @@ VDataStreamRecvr::VDataStreamRecvr(
_num_buffered_bytes(0),
_profile(profile),
_sub_plan_query_statistics_recvr(sub_plan_query_statistics_recvr) {
- // DataStreamRecvr may be destructed after the instance execution thread ends, `instance_mem_tracker`
- // will be a null pointer, and remove_child fails when _mem_tracker is destructed.
+ // DataStreamRecvr may be destructed after the instance execution thread ends.
_mem_tracker = std::make_unique<MemTracker>(
- "VDataStreamRecvr:" + print_id(_fragment_instance_id), query_mem_tracker, _profile);
+ "VDataStreamRecvr:" + print_id(_fragment_instance_id), _profile);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
// Create one queue per sender if is_merging is true.
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index c378b3c33d..e137d3ca18 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -397,7 +397,7 @@ Status VDataStreamSender::prepare(RuntimeState* state) {
_profile = _pool->add(new RuntimeProfile(std::move(title)));
SCOPED_TIMER(_profile->total_time_counter());
_mem_tracker = std::make_unique<MemTracker>(
- "VDataStreamSender:" + print_id(state->fragment_instance_id()), nullptr, _profile);
+ "VDataStreamSender:" + print_id(state->fragment_instance_id()), _profile);
SCOPED_CONSUME_MEM_TRACKER(_mem_tracker.get());
if (_part_type == TPartitionType::UNPARTITIONED || _part_type == TPartitionType::RANDOM) {
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org