You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/12 10:09:47 UTC
[doris] branch master updated: mem_tracker_factor_v2 (#10743)
This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new 41f9ee2f9e mem_tracker_factor_v2 (#10743)
41f9ee2f9e is described below
commit 41f9ee2f9e07864bfa5d2a1512db336c83802496
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Tue Jul 12 18:09:41 2022 +0800
mem_tracker_factor_v2 (#10743)
---
be/src/runtime/memory/mem_tracker_base.cpp | 53 ++++
be/src/runtime/memory/mem_tracker_base.h | 78 ++++++
be/src/runtime/memory/mem_tracker_limiter.cpp | 333 +++++++++++++++++++++++
be/src/runtime/memory/mem_tracker_limiter.h | 348 ++++++++++++++++++++++++
be/src/runtime/memory/mem_tracker_observe.cpp | 87 ++++++
be/src/runtime/memory/mem_tracker_observe.h | 91 +++++++
be/src/runtime/memory/mem_tracker_task_pool.cpp | 153 +++++++++++
be/src/runtime/memory/mem_tracker_task_pool.h | 58 ++++
8 files changed, 1201 insertions(+)
diff --git a/be/src/runtime/memory/mem_tracker_base.cpp b/be/src/runtime/memory/mem_tracker_base.cpp
new file mode 100644
index 0000000000..bb407e2bf8
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_base.cpp
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.cpp
+// and modified by Doris
+
+#include "runtime/memory/mem_tracker_base.h"
+
+#include "util/time.h"
+
+namespace doris {
+
+// Name under which the consumption counter is registered in a RuntimeProfile.
+const std::string MemTrackerBase::COUNTER_NAME("PeakMemoryUsage");
+
+// Builds a tracker with the given label and parent link.
+// The consumption counter is either owned by this tracker alone (no profile) or
+// registered in 'profile' under COUNTER_NAME so that it shows up in the profile output.
+MemTrackerBase::MemTrackerBase(const std::string& label, MemTrackerLimiter* parent,
+                               RuntimeProfile* profile)
+        : _label(label),
+          // The id is derived from the current time and the label length. It is cheap to
+          // compute (no hashing) but is not guaranteed to be unique.
+          _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
+          _parent(parent) {
+    if (profile != nullptr) {
+        // Share the counter with the profile so the consumption is reported there as well.
+        _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
+        return;
+    }
+    // No profile supplied: keep a private high-water-mark counter.
+    _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+}
+
+// Orphan tracker: no parent and no runtime profile.
+MemTrackerBase::MemTrackerBase(const std::string& label)
+        : MemTrackerBase(label, /*parent=*/nullptr, /*profile=*/nullptr) {}
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_base.h b/be/src/runtime/memory/mem_tracker_base.h
new file mode 100644
index 0000000000..10d554839b
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_base.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h
+// and modified by Doris
+
+#pragma once
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+class MemTrackerLimiter;
+
+// A MemTracker tracks memory consumption.
+// Common base of the tracker kinds: it holds the label, a generated id, the consumption
+// counter and the link to the parent limiter.
+// NOTE(review): the original comment states "This class is thread-safe". The flag setters
+// below are plain bool writes, so presumably they are only called while the tracker is
+// being set up -- confirm against callers.
+class MemTrackerBase {
+public:
+    const std::string& label() const { return _label; }
+
+    // Returns the memory currently consumed, in bytes.
+    int64_t consumption() const { return _consumption->current_value(); }
+    // Returns the counter's value(), i.e. the peak consumption, in bytes.
+    int64_t peak_consumption() const { return _consumption->value(); }
+
+    // 'profile' may be nullptr, in which case the consumption counter is private to this
+    // tracker instead of being registered in a RuntimeProfile.
+    MemTrackerBase(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile);
+
+    // this is used for creating an orphan mem tracker, or for unit test.
+    // If a mem tracker has parent, it should be created by `create_tracker()`
+    MemTrackerBase(const std::string& label = std::string());
+
+    MemTrackerLimiter* parent() const { return _parent; }
+    int64_t id() const { return _id; }
+    // True once set_is_limited() has been called, i.e. this is a MemTrackerLimiter.
+    bool is_limited() const { return _is_limited; }
+    // True once set_is_observed() has been called, i.e. this is a MemTrackerObserve.
+    bool is_observed() const { return _is_observed; }
+    void set_is_limited() { _is_limited = true; }   // marks a MemTrackerLimiter
+    void set_is_observed() { _is_observed = true; } // marks a MemTrackerObserve
+
+    // Debug-build check that the current consumption is back to zero.
+    // Usually, a negative value means that the statistics are not accurate:
+    // 1. The released memory was never consumed.
+    // 2. For the same block of memory, tracker A calls consume and tracker B calls release.
+    // 3. Repeated releases of a MemTracker. When consume is called on the child MemTracker
+    //    after release has been called on the parent MemTracker,
+    //    the child ~MemTracker will cause repeated releases.
+    void memory_leak_check() { DCHECK_EQ(_consumption->current_value(), 0); }
+
+    // Name of the consumption counter when it is registered in a RuntimeProfile.
+    static const std::string COUNTER_NAME;
+
+protected:
+    // label used in the usage string (log_usage())
+    std::string _label;
+
+    // Automatically generated from the creation time and the label length.
+    // NOTE(review): the constructor comment says uniqueness is not guaranteed.
+    int64_t _id;
+
+    std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in bytes
+
+    bool _is_limited = false; // is MemTrackerLimiter
+
+    bool _is_observed = false; // is MemTrackerObserve
+
+    MemTrackerLimiter* _parent; // The parent of this tracker.
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
new file mode 100644
index 0000000000..6193558ab0
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/mem_tracker_limiter.h"
+
+#include <fmt/format.h>
+
+#include "gutil/once.h"
+#include "runtime/memory/mem_tracker_observe.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/pretty_printer.h"
+#include "util/string_util.h"
+
+namespace doris {
+
+// The ancestor for all trackers. Every tracker is visible from the process down.
+// All manually created trackers should specify the process tracker as the parent.
+// The pointer is assigned by create_process_tracker(), which get_process_tracker() runs
+// through GoogleOnceInit guarded by process_tracker_once.
+static MemTrackerLimiter* process_tracker;
+static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT;
+
+// Allocates a limiter labelled "[Limit]-<label>", registers it as a child of 'parent'
+// (the process tracker when 'parent' is null) and initialises it with 'byte_limit'.
+// The caller receives the raw pointer to the new tracker.
+MemTrackerLimiter* MemTrackerLimiter::create_tracker(int64_t byte_limit, const std::string& label,
+                                                     MemTrackerLimiter* parent,
+                                                     RuntimeProfile* profile) {
+    // Limit checks stay disabled while the child is being linked in; otherwise a
+    // log_usage call triggered from here could deadlock.
+    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+    if (parent == nullptr) {
+        parent = MemTrackerLimiter::get_process_tracker();
+    }
+    auto* new_tracker = new MemTrackerLimiter("[Limit]-" + label, parent, profile);
+    parent->add_child_tracker(new_tracker);
+    new_tracker->set_is_limited();
+    new_tracker->init(byte_limit);
+    return new_tracker;
+}
+
+// Stores the limit and records the ancestor chain. Starting at this tracker and
+// following _parent up to the root, every tracker is appended to _ancestor_all_trackers
+// and those with a limit are also appended to _ancestor_limiter_trackers.
+void MemTrackerLimiter::init(int64_t limit) {
+    DCHECK_GE(limit, -1);
+    _limit = limit;
+    for (MemTrackerLimiter* node = this; node != nullptr; node = node->_parent) {
+        _ancestor_all_trackers.push_back(node);
+        if (node->has_limit()) {
+            _ancestor_limiter_trackers.push_back(node);
+        }
+    }
+    // The chain always starts with this tracker itself.
+    DCHECK_GT(_ancestor_all_trackers.size(), 0);
+    DCHECK_EQ(_ancestor_all_trackers[0], this);
+}
+
+// Flushes any untracked memory, unlinks this tracker from its parent's limiter list and
+// deletes the observe trackers registered as its children.
+MemTrackerLimiter::~MemTrackerLimiter() {
+    // TCMalloc hook will be triggered while the mem tracker is being destructed, which may
+    // cause a crash, so thread-local tracking is switched off for the process tracker.
+    if (_label == "Process") doris::thread_local_ctx._init = false;
+    flush_untracked_mem();
+    if (parent()) {
+        // Do not call release on the parent tracker to avoid repeated releases.
+        // Ensure that all consume/release are triggered by TCMalloc new/delete hook.
+        std::lock_guard<SpinLock> l(_parent->_child_trackers_lock);
+        if (_child_tracker_it != _parent->_child_limiter_trackers.end()) {
+            _parent->_child_limiter_trackers.erase(_child_tracker_it);
+            // Mark the iterator as "not linked" so a second removal is a no-op.
+            _child_tracker_it = _parent->_child_limiter_trackers.end();
+        }
+    }
+    // The child observe tracker life cycle is controlled by its parent limiter tracker.
+    for (auto tracker : _child_observe_trackers) {
+        delete tracker;
+    }
+    DCHECK_EQ(_untracked_mem, 0);
+}
+
+// Appends a limiter child under the child-list lock and stores the list position in the
+// child so it can later be erased without searching.
+void MemTrackerLimiter::add_child_tracker(MemTrackerLimiter* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    auto position = _child_limiter_trackers.emplace(_child_limiter_trackers.end(), tracker);
+    tracker->_child_tracker_it = position;
+}
+
+// Appends an observe child under the child-list lock and stores the list position in the
+// child so it can later be erased without searching.
+void MemTrackerLimiter::add_child_tracker(MemTrackerObserve* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    auto position = _child_observe_trackers.emplace(_child_observe_trackers.end(), tracker);
+    tracker->_child_tracker_it = position;
+}
+
+// Unlinks a limiter child. A child whose stored iterator already equals end() is treated
+// as not linked, so calling this again is a no-op.
+void MemTrackerLimiter::remove_child_tracker(MemTrackerLimiter* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    const auto not_linked = _child_limiter_trackers.end();
+    if (tracker->_child_tracker_it == not_linked) {
+        return;
+    }
+    _child_limiter_trackers.erase(tracker->_child_tracker_it);
+    tracker->_child_tracker_it = _child_limiter_trackers.end();
+}
+
+// Unlinks an observe child. A child whose stored iterator already equals end() is treated
+// as not linked, so calling this again is a no-op.
+void MemTrackerLimiter::remove_child_tracker(MemTrackerObserve* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    const auto not_linked = _child_observe_trackers.end();
+    if (tracker->_child_tracker_it == not_linked) {
+        return;
+    }
+    _child_observe_trackers.erase(tracker->_child_tracker_it);
+    tracker->_child_tracker_it = _child_observe_trackers.end();
+}
+
+// Builds the root "Process" tracker: no parent, no profile and no limit.
+void MemTrackerLimiter::create_process_tracker() {
+    const int64_t no_limit = -1;
+    process_tracker = new MemTrackerLimiter("Process", /*parent=*/nullptr, /*profile=*/nullptr);
+    process_tracker->init(no_limit);
+}
+
+// Returns the process tracker. create_process_tracker() is invoked through GoogleOnceInit
+// with process_tracker_once before the pointer is returned.
+MemTrackerLimiter* MemTrackerLimiter::get_process_tracker() {
+ GoogleOnceInit(&process_tracker_once, &MemTrackerLimiter::create_process_tracker);
+ return process_tracker;
+}
+
+// Fills 'trackers' (cleared first) with every limiter reachable from the process tracker.
+// When config::show_observe_tracker is set, the observe children of each visited limiter
+// are listed right after that limiter.
+void MemTrackerLimiter::list_process_trackers(std::vector<MemTrackerBase*>* trackers) {
+    trackers->clear();
+    // Used as a stack: the next limiter to visit is always taken from the back.
+    std::deque<MemTrackerLimiter*> pending;
+    pending.push_front(get_process_tracker());
+    while (!pending.empty()) {
+        MemTrackerLimiter* current = pending.back();
+        pending.pop_back();
+        trackers->push_back(current);
+
+        // Copy both child lists while holding the lock; the copies are walked afterwards.
+        std::list<MemTrackerLimiter*> limiter_snapshot;
+        std::list<MemTrackerObserve*> observe_snapshot;
+        {
+            std::lock_guard<SpinLock> guard(current->_child_trackers_lock);
+            limiter_snapshot = current->_child_limiter_trackers;
+            observe_snapshot = current->_child_observe_trackers;
+        }
+        for (MemTrackerLimiter* limiter_child : limiter_snapshot) {
+            pending.push_back(limiter_child);
+        }
+        if (!config::show_observe_tracker) {
+            continue;
+        }
+        for (MemTrackerObserve* observe_child : observe_snapshot) {
+            trackers->push_back(observe_child);
+        }
+    }
+}
+
+// Returns the deepest tracker that appears in both this tracker's and 'dst's ancestor
+// chains. Both chains include the tracker itself, so the result may be 'this' or 'dst'.
+// Both trackers must share the same root (checked in debug builds).
+MemTrackerLimiter* MemTrackerLimiter::common_ancestor(MemTrackerLimiter* dst) {
+    // Identity is decided by address: the generated id is not guaranteed to be unique, so
+    // comparing ids could short-circuit for two distinct trackers.
+    if (this == dst) return dst;
+    DCHECK_EQ(_ancestor_all_trackers.back(), dst->_ancestor_all_trackers.back())
+            << "Must have same ancestor";
+    // Both chains end at the root. Walk them from the root towards the leaves for as long
+    // as the next (deeper) entries are still the same tracker.
+    int ancestor_idx = _ancestor_all_trackers.size() - 1;
+    int dst_ancestor_idx = dst->_ancestor_all_trackers.size() - 1;
+    while (ancestor_idx > 0 && dst_ancestor_idx > 0 &&
+           _ancestor_all_trackers[ancestor_idx - 1] ==
+                   dst->_ancestor_all_trackers[dst_ancestor_idx - 1]) {
+        --ancestor_idx;
+        --dst_ancestor_idx;
+    }
+    return _ancestor_all_trackers[ancestor_idx];
+}
+
+// Scans this tracker and its limited ancestors (nearest first) and returns the first one
+// whose limit is exceeded, or nullptr when none is.
+MemTrackerLimiter* MemTrackerLimiter::limit_exceeded_tracker() const {
+    const size_t limiter_count = _ancestor_limiter_trackers.size();
+    for (size_t pos = 0; pos < limiter_count; ++pos) {
+        MemTrackerLimiter* candidate = _ancestor_limiter_trackers[pos];
+        if (candidate->limit_exceeded()) return candidate;
+    }
+    return nullptr;
+}
+
+// Returns the smallest (limit - consumption) over this tracker and its limited ancestors.
+// With no limited tracker in the chain the result is int64 max; the result is negative
+// when some limit is already exceeded.
+int64_t MemTrackerLimiter::spare_capacity() const {
+    int64_t smallest_left = std::numeric_limits<int64_t>::max();
+    for (const MemTrackerLimiter* limiter : _ancestor_limiter_trackers) {
+        const int64_t left = limiter->limit() - limiter->consumption();
+        if (left < smallest_left) {
+            smallest_left = left;
+        }
+    }
+    return smallest_left;
+}
+
+// Returns the lowest limit among this tracker and its limited ancestors, or -1 when no
+// tracker in the chain has a limit.
+int64_t MemTrackerLimiter::get_lowest_limit() const {
+    if (_ancestor_limiter_trackers.empty()) {
+        return -1;
+    }
+    int64_t lowest = std::numeric_limits<int64_t>::max();
+    for (const MemTrackerLimiter* limiter : _ancestor_limiter_trackers) {
+        DCHECK(limiter->has_limit());
+        const int64_t candidate = limiter->limit();
+        if (candidate < lowest) {
+            lowest = candidate;
+        }
+    }
+    return lowest;
+}
+
+// Tries to bring consumption down to 'max_consumption' by running the registered GC
+// functions in the order they were added, holding _gc_lock for the whole call.
+// Returns true when consumption is still above 'max_consumption' afterwards (and also
+// when 'max_consumption' is negative); returns false when no GC was needed or GC freed
+// enough.
+bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
+    if (max_consumption < 0) return true;
+    std::lock_guard<std::mutex> l(_gc_lock);
+    int64_t pre_gc_consumption = consumption();
+    // Check if someone gc'd before us
+    if (pre_gc_consumption < max_consumption) return false;
+
+    int64_t curr_consumption = pre_gc_consumption;
+    // Free some extra memory to avoid frequent GC. The value below evaluates to 4 GiB.
+    // NOTE(review): the previous comment described this as "4M"; confirm which is intended.
+    const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L;
+    // Try to free up some memory
+    for (const auto& gc_function : _gc_functions) {
+        // Try to free up the amount we are over plus some extra so that we don't have to
+        // immediately GC again. Don't free all the memory since that can be unnecessarily
+        // expensive.
+        int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE;
+        gc_function(bytes_to_free);
+        curr_consumption = consumption();
+        // NOTE(review): this stops as soon as the headroom is at most EXTRA_BYTES_TO_FREE,
+        // which includes the case where consumption is still above max_consumption, so
+        // later GC functions are then not tried. Behaviour kept as is; confirm intent.
+        if (max_consumption - curr_consumption <= EXTRA_BYTES_TO_FREE) break;
+    }
+
+    return curr_consumption > max_consumption;
+}
+
+// Runs GC so that 'bytes' more would fit under this tracker's limit.
+// Returns MemoryLimitExceeded when gc_memory() reports the target is still exceeded,
+// otherwise logs the success at VLOG_NOTICE level and returns OK.
+Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
+    const bool still_exceeded = gc_memory(_limit - bytes);
+    if (UNLIKELY(still_exceeded)) {
+        std::string reason =
+                fmt::format("label={} TryConsume failed size={}, used={}, limit={}", label(),
+                            bytes, _consumption->current_value(), _limit);
+        return Status::MemoryLimitExceeded(reason);
+    }
+    VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
+                << " consumption=" << _consumption->current_value() << " limit=" << _limit;
+    return Status::OK();
+}
+
+// Calling this on the query tracker results in output like:
+//
+// Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB
+// Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB
+// EXCHANGE_NODE (id=4): Total=0 Peak=0
+// DataStreamRecvr: Total=0 Peak=0
+// Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB
+// Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB
+// AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB
+// HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB
+// DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99 KB
+// Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB
+// AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB
+// EXCHANGE_NODE (id=2): Total=0 Peak=0
+// DataStreamRecvr: Total=45.91 KB Peak=684.07 KB
+// DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B
+//
+// If 'reservation_metrics_' are set, we get a more granular breakdown:
+// TrackerName: Limit=5.00 MB Reservation=5.00 MB OtherMemory=1.04 MB
+// Total=6.04 MB Peak=6.45 MB
+//
+// Produces a one-line usage summary for this tracker and, while 'max_recursive_depth' is
+// non-zero, appends the summaries of its limiter children (recursively) and of its
+// observe children, one per line.
+// When 'logged_consumption' is non-null it receives this tracker's current consumption.
+std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logged_consumption) {
+    // Read the counters once so the summary line is built from one set of values.
+    const int64_t current_bytes = consumption();
+    const int64_t peak_bytes = _consumption->value();
+    if (logged_consumption != nullptr) {
+        *logged_consumption = current_bytes;
+    }
+
+    std::string summary = fmt::format(
+            "MemTracker log_usage Label: {}, Limit: {}, Total: {}, Peak: {}, Exceeded: {}", _label,
+            PrettyPrinter::print(_limit, TUnit::BYTES),
+            PrettyPrinter::print(current_bytes, TUnit::BYTES),
+            PrettyPrinter::print(peak_bytes, TUnit::BYTES), limit_exceeded() ? "true" : "false");
+
+    // Depth exhausted: the children are not reported.
+    if (max_recursive_depth == 0) {
+        return summary;
+    }
+
+    // Copy the child lists under the lock, then report from the copies.
+    std::list<MemTrackerLimiter*> limiter_snapshot;
+    std::list<MemTrackerObserve*> observe_snapshot;
+    {
+        std::lock_guard<SpinLock> guard(_child_trackers_lock);
+        limiter_snapshot = _child_limiter_trackers;
+        observe_snapshot = _child_observe_trackers;
+    }
+    int64_t children_bytes;
+    std::string children_usage =
+            log_usage(max_recursive_depth - 1, limiter_snapshot, &children_bytes);
+    for (MemTrackerObserve* observe_child : observe_snapshot) {
+        children_usage += "\n" + observe_child->log_usage(&children_bytes);
+    }
+    if (!children_usage.empty()) {
+        summary += "\n" + children_usage;
+    }
+    return summary;
+}
+
+// Logs every non-null tracker in 'trackers' with the given depth and joins the non-empty
+// results with newlines. '*logged_consumption' is reset to zero and then receives the sum
+// of the consumption reported by each tracker.
+std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
+                                         const std::list<MemTrackerLimiter*>& trackers,
+                                         int64_t* logged_consumption) {
+    *logged_consumption = 0;
+    std::vector<std::string> lines;
+    for (MemTrackerLimiter* entry : trackers) {
+        if (entry == nullptr) {
+            continue;
+        }
+        int64_t entry_bytes;
+        std::string line = entry->log_usage(max_recursive_depth, &entry_bytes);
+        if (!line.empty()) {
+            lines.push_back(line);
+        }
+        *logged_consumption += entry_bytes;
+    }
+    return join(lines, "\n");
+}
+
+// Builds a MemoryLimitExceeded status describing the failed allocation, records the
+// message in 'state' (when non-null) and writes it, together with tracker usage dumps,
+// to the backend log as a warning.
+//   state:                  optional runtime state; supplies the fragment instance id.
+//   details:                caller-supplied free text included verbatim in the message.
+//   failed_allocation_size: size of the allocation that failed, in bytes.
+//   failed_alloc:           when not OK, its text is reported instead of the size.
+// Returns the MemoryLimitExceeded status (without the usage dumps).
+Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details,
+                                             int64_t failed_allocation_size, Status failed_alloc) {
+    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+    MemTrackerLimiter* proc_tracker = MemTrackerLimiter::get_process_tracker();
+    std::string detail = fmt::format(
+            "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process "
+            "limit={}.",
+            state != nullptr ? print_id(state->fragment_instance_id()) : "", details,
+            BackendOptions::get_localhost(),
+            PrettyPrinter::print(proc_tracker->spare_capacity(), TUnit::BYTES));
+    // Each suffix is formatted on its own and appended. The text built so far embeds
+    // 'details', so it must never be passed to fmt::format as a format string again:
+    // a '{' or '}' in it would be parsed as a replacement field.
+    if (!failed_alloc) {
+        detail += fmt::format(" failed alloc=<{}>. current tracker={}.", failed_alloc.to_string(),
+                              _label);
+    } else {
+        detail += fmt::format(
+                " current tracker <label={}, used={}, limit={}, failed alloc size={}>.", _label,
+                _consumption->current_value(), _limit,
+                PrettyPrinter::print(failed_allocation_size, TUnit::BYTES));
+    }
+    detail += " If this is a query, can change the limit by session variable exec_mem_limit.";
+    Status status = Status::MemoryLimitExceeded(detail);
+    if (state != nullptr) state->log_error(detail);
+
+    // only print the tracker log_usage in be log.
+    if (proc_tracker->spare_capacity() < failed_allocation_size) {
+        // Dumping the process MemTracker is expensive. Limiting the recursive depth to two
+        // levels limits the level of detail to a one-line summary for each query MemTracker.
+        detail += "\n" + proc_tracker->log_usage(2);
+    }
+    detail += "\n" + log_usage();
+
+    LOG(WARNING) << detail;
+    return status;
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
new file mode 100644
index 0000000000..d5d937523f
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/config.h"
+#include "runtime/memory/mem_tracker_base.h"
+#include "runtime/runtime_state.h"
+#include "util/mem_info.h"
+
+namespace doris {
+
+class MemTrackerObserve;
+
+// Tracker contains a limit, and can be arranged into a tree structure such that the consumption
+// tracked by a MemTracker is also tracked by its ancestors.
+// Used for:
+// 1. Track and limit the memory usage of process and query.
+// Automatic memory consume based on system memory allocation (Currently, based on TCMalloc hook).
+// 2. Execution logic that requires memory size to participate in control.
+// Manual consumption, but will not affect the overall statistics of the process.
+//
+// We use a five-level hierarchy of mem trackers: process, query pool, query, instance,
+// node. Specific parts of the fragment (exec nodes, sinks, etc) will add a
+// fifth level when they are initialized.
+//
+// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
+// reached. If limit_exceeded() is called and the limit is exceeded, it will first call
+// the GcFunctions to try to free memory and recheck the limit. For example, the process
+// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
+// this will be called before the process limit is reported as exceeded. GcFunctions are
+// called in the order they are added, so expensive functions should be added last.
+// GcFunctions are called with a global lock held, so should be non-blocking and not
+// call back into MemTrackers, except to release memory.
+class MemTrackerLimiter final : public MemTrackerBase {
+public:
+    // Creates a tracker labelled "[Limit]-<label>" and adds it to the tree. When 'parent'
+    // is nullptr the new tracker is attached under the process tracker.
+    static MemTrackerLimiter* create_tracker(int64_t byte_limit, const std::string& label,
+                                             MemTrackerLimiter* parent = nullptr,
+                                             RuntimeProfile* profile = nullptr);
+
+    // Sets the limit, then walks the MemTrackerLimiter hierarchy and populates
+    // _ancestor_all_trackers and _ancestor_limiter_trackers.
+    void init(int64_t limit);
+
+    ~MemTrackerLimiter();
+
+    // Adds tracker to _child_limiter_trackers / _child_observe_trackers
+    void add_child_tracker(MemTrackerLimiter* tracker);
+    void add_child_tracker(MemTrackerObserve* tracker);
+    // Removes tracker from _child_limiter_trackers / _child_observe_trackers
+    void remove_child_tracker(MemTrackerLimiter* tracker);
+    void remove_child_tracker(MemTrackerObserve* tracker);
+
+    // Leaf tracker, without any child.
+    // NOTE(review): the child lists are read here without taking _child_trackers_lock.
+    bool is_leaf() {
+        return _child_limiter_trackers.size() + _child_observe_trackers.size() == 0;
+    }
+
+    // Gets a "process" tracker, creating it if necessary.
+    static MemTrackerLimiter* get_process_tracker();
+
+    // Returns a list of all the valid trackers.
+    static void list_process_trackers(std::vector<MemTrackerBase*>* trackers);
+
+public:
+    // The following func, for execution logic that requires memory size to participate in control.
+    // this does not change the value of process tracker.
+
+    // only consume self, will not sync to parent. Usually used to manually record the specified memory,
+    // It is independent of the automatically recording of thread local tracker, so the same block of memory
+    // will be recorded in the thread local tracker and the current tracker at the same time.
+    void consume_self(int64_t bytes);
+    void release_self(int64_t bytes) { consume_self(-bytes); }
+
+    // up to (but not including) end_tracker.
+    // This is useful if we want to move tracking between trackers that share a common (i.e. end_tracker)
+    // ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption
+    // against the limit recorded in one of its ancestors already happened.
+    void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker);
+    void release_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
+        consume_local(-bytes, end_tracker);
+    }
+
+    // Transfer 'bytes' of consumption from this tracker to 'dst'.
+    // Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated.
+    void transfer_to(MemTrackerLimiter* dst, int64_t bytes);
+
+    // When the accumulated untracked memory value exceeds the upper limit,
+    // the current value is returned and set to 0.
+    // Thread safety.
+    int64_t add_untracked_mem(int64_t bytes);
+
+    // Consumes whatever has accumulated in _untracked_mem and resets it to 0.
+    // In most cases, no need to call flush_untracked_mem on the child tracker,
+    // because when it is destructed, theoretically all its children have been destructed.
+    void flush_untracked_mem() { consume(_untracked_mem.exchange(0)); }
+
+    // Find the common ancestor and update trackers between 'this'/'dst' and
+    // the common ancestor. This logic handles all cases, including the
+    // two trackers being the same or being ancestors of each other because
+    // '_ancestor_all_trackers' includes the current tracker.
+    MemTrackerLimiter* common_ancestor(MemTrackerLimiter* dst);
+
+public:
+    // The following func, for mem limit.
+
+    // Returns MemoryLimitExceeded when MemInfo is initialized and the process-wide memory
+    // reported by MemInfo plus 'bytes' would reach MemInfo::mem_limit(); OK otherwise.
+    Status check_sys_mem_info(int64_t bytes) {
+        // TODO add mmap
+        if (MemInfo::initialized() && MemInfo::current_mem() + bytes >= MemInfo::mem_limit()) {
+            return Status::MemoryLimitExceeded(fmt::format(
+                    "{}: TryConsume failed, bytes={} process whole consumption={} mem limit={}",
+                    _label, bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
+        }
+        return Status::OK();
+    }
+
+    // A negative limit (-1) means "no limit".
+    bool has_limit() const { return _limit >= 0; }
+    int64_t limit() const { return _limit; }
+    // Only valid on a tracker that already has a limit (checked in debug builds).
+    void update_limit(int64_t limit) {
+        DCHECK(has_limit());
+        _limit = limit;
+    }
+    bool limit_exceeded() const { return _limit >= 0 && _limit < consumption(); }
+    bool any_limit_exceeded() const { return limit_exceeded_tracker() != nullptr; }
+
+    // Returns the first tracker, among this tracker and its ancestors with a valid limit,
+    // whose limit is exceeded, or nullptr if none is exceeded.
+    MemTrackerLimiter* limit_exceeded_tracker() const;
+
+    Status check_limit(int64_t bytes);
+
+    // Returns the maximum consumption that can be made without exceeding the limit on
+    // this tracker or any of its parents. Returns int64_t::max() if there are no
+    // limits and a negative value if any limit is already exceeded.
+    int64_t spare_capacity() const;
+
+    // Returns the lowest limit for this tracker and its ancestors. Returns -1 if there is no limit.
+    int64_t get_lowest_limit() const;
+
+    typedef std::function<void(int64_t bytes_to_free)> GcFunction;
+    /// Add a function 'f' to be called if the limit is reached, if none of the other
+    /// previously-added GC functions were successful at freeing up enough memory.
+    /// 'f' does not need to be thread-safe as long as it is added to only one MemTrackerLimiter.
+    /// Note that 'f' must be valid for the lifetime of this MemTrackerLimiter.
+    void add_gc_function(GcFunction f) { _gc_functions.push_back(f); }
+
+    // If consumption is higher than max_consumption, attempts to free memory by calling
+    // any added GC functions. Returns true if max_consumption is still exceeded. Takes _gc_lock.
+    // Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,
+    // the performance of subsequent queries may be degraded, so the use of gc function should be careful enough.
+    bool gc_memory(int64_t max_consumption);
+    Status try_gc_memory(int64_t bytes);
+
+    /// Logs the usage of this tracker and optionally its children (recursively).
+    /// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
+    /// 'max_recursive_depth' specifies the maximum number of levels of children
+    /// to include in the dump. If it is zero, then no children are dumped.
+    /// Limiting the recursive depth reduces the cost of dumping, particularly
+    /// for the process MemTracker.
+    std::string log_usage(int max_recursive_depth = INT_MAX, int64_t* logged_consumption = nullptr);
+
+    // Log the memory usage when memory limit is exceeded and return a status object with
+    // details of the allocation which caused the limit to be exceeded.
+    // 'failed_allocation' is the size in bytes of the allocation that failed; it is
+    // reported when 'failed_alloc' is OK, otherwise the text of 'failed_alloc' is reported.
+    // If 'state' is non-nullptr, logs the error to 'state'.
+    Status mem_limit_exceeded(RuntimeState* state, const std::string& details = std::string(),
+                              int64_t failed_allocation = -1, Status failed_alloc = Status::OK());
+
+    // One-line dump of the limit, consumption, label and ancestor bookkeeping.
+    std::string debug_string() {
+        std::stringstream msg;
+        msg << "limit: " << _limit << "; "
+            << "consumption: " << _consumption->current_value() << "; "
+            << "label: " << _label << "; "
+            << "all tracker size: " << _ancestor_all_trackers.size() << "; "
+            << "limit trackers size: " << _ancestor_limiter_trackers.size() << "; "
+            << "parent is null: " << ((_parent == nullptr) ? "true" : "false") << "; ";
+        return msg.str();
+    }
+
+private:
+    // The following func, for automatic memory tracking and limiting based on system memory allocation.
+    friend class ThreadMemTrackerMgr;
+
+    // Private: trackers are created through create_tracker() / create_process_tracker().
+    MemTrackerLimiter(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile)
+            : MemTrackerBase(label, parent, profile) {}
+
+    // Creates the process tracker.
+    static void create_process_tracker();
+
+    // Increases consumption of this tracker and its ancestors by 'bytes'.
+    void consume(int64_t bytes);
+
+    // Decreases consumption of this tracker and its ancestors by 'bytes'.
+    void release(int64_t bytes) { consume(-bytes); }
+
+    // Increases consumption of this tracker and its ancestors by 'bytes' only if
+    // they can all consume 'bytes' without exceeding limit. If limit would be exceed,
+    // no MemTrackers are updated. Returns OK if the consumption was successfully updated.
+    WARN_UNUSED_RESULT
+    Status try_consume(int64_t bytes);
+
+    /// Log consumption of all the trackers provided. Returns the sum of consumption in
+    /// 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels
+    /// of children to include in the dump. If it is zero, then no children are dumped.
+    static std::string log_usage(int max_recursive_depth,
+                                 const std::list<MemTrackerLimiter*>& trackers,
+                                 int64_t* logged_consumption);
+
+private:
+    // Limit on memory consumption, in bytes. If _limit == -1, there is no consumption limit.
+    // Used in log_usage.
+    int64_t _limit;
+
+    // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
+    // to avoid frequent calls to consume/release of MemTracker.
+    std::atomic<int64_t> _untracked_mem = 0;
+
+    // All the child trackers of this tracker. Used for error reporting and
+    // listing only (i.e. updating the consumption of a parent tracker does not
+    // update that of its children).
+    SpinLock _child_trackers_lock;
+    std::list<MemTrackerLimiter*> _child_limiter_trackers;
+    std::list<MemTrackerObserve*> _child_observe_trackers;
+    // Iterator into _parent->_child_limiter_trackers for this object. Stored to have O(1) remove.
+    std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
+
+    // this tracker plus all of its ancestors
+    std::vector<MemTrackerLimiter*> _ancestor_all_trackers;
+    // _ancestor_all_trackers with valid limits
+    std::vector<MemTrackerLimiter*> _ancestor_limiter_trackers;
+
+    // Lock to protect gc_memory(). This prevents many GCs from occurring at once.
+    std::mutex _gc_lock;
+    // Functions to call after the limit is reached to free memory.
+    std::vector<GcFunction> _gc_functions;
+};
+
+ // Adds 'bytes' (negative values release) to the consumption of every tracker in
+ // _ancestor_all_trackers, i.e. this tracker and all of its ancestors. No limit is checked.
+ // A value of zero is a no-op.
+ inline void MemTrackerLimiter::consume(int64_t bytes) {
+     if (bytes == 0) return;
+     for (MemTrackerLimiter* node : _ancestor_all_trackers) {
+         node->_consumption->add(bytes);
+     }
+ }
+
+ // Charges 'bytes' to this tracker and all of its ancestors, provided every limited tracker on
+ // the path accepts it. If one of them refuses, the charges already made are rolled back and the
+ // failing Status is returned. Non-positive 'bytes' is handled as a release and returns OK.
+ inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
+     if (bytes <= 0) {
+         release(-bytes);
+         return Status::OK();
+     }
+     RETURN_IF_ERROR(check_sys_mem_info(bytes));
+     const int root_idx = _ancestor_all_trackers.size() - 1;
+     int idx = root_idx;
+     // The vector starts at this tracker, so iterating backwards walks the tree top-down.
+     for (; idx >= 0; --idx) {
+         MemTrackerLimiter* node = _ancestor_all_trackers[idx];
+         if (node->limit() < 0) {
+             node->_consumption->add(bytes); // No limit at this tracker.
+             continue;
+         }
+         // _consumption is updated without taking a lock, so with concurrent consumers the add
+         // may have to be retried several times, running GC between attempts.
+         while (true) {
+             if (LIKELY(node->_consumption->try_add(bytes, node->limit()))) break;
+             Status gc_status = node->try_gc_memory(bytes);
+             if (!gc_status) {
+                 // This tracker refused: undo the trackers above it that already accepted.
+                 for (int done = root_idx; done > idx; --done) {
+                     _ancestor_all_trackers[done]->_consumption->add(-bytes);
+                 }
+                 return gc_status;
+             }
+         }
+     }
+     // Every tracker accepted the charge.
+     DCHECK_EQ(idx, -1);
+     return Status::OK();
+ }
+
+ // Routes 'bytes' through add_untracked_mem() and adds whatever it hands back to this
+ // tracker's own _consumption; nothing is added while it returns 0.
+ inline void MemTrackerLimiter::consume_self(int64_t bytes) {
+     const int64_t flushed = add_untracked_mem(bytes);
+     if (flushed == 0) return;
+     _consumption->add(flushed);
+ }
+
+ // Calls consume_self(bytes) on this tracker and then on each ancestor in turn, stopping as
+ // soon as 'end_tracker' is reached ('end_tracker' itself is not updated).
+ inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
+     DCHECK(end_tracker);
+     if (bytes == 0) return;
+     auto node = _ancestor_all_trackers.begin();
+     for (; node != _ancestor_all_trackers.end(); ++node) {
+         if (*node == end_tracker) break;
+         (*node)->consume_self(bytes);
+     }
+ }
+
+ // Moves 'bytes' of consumption from this tracker to 'dst': release_local() here, then
+ // consume_local() on 'dst', both bounded by the process tracker. Does nothing when 'dst' has
+ // the same id() as this tracker.
+ inline void MemTrackerLimiter::transfer_to(MemTrackerLimiter* dst, int64_t bytes) {
+     DCHECK(dst->is_limited());
+     const bool same_tracker = (id() == dst->id());
+     if (!same_tracker) {
+         release_local(bytes, MemTrackerLimiter::get_process_tracker());
+         dst->consume_local(bytes, MemTrackerLimiter::get_process_tracker());
+     }
+ }
+
+ // Accumulates 'bytes' into _untracked_mem. Once the absolute accumulated value reaches
+ // config::mem_tracker_consume_min_size_bytes, the accumulator is swapped back to 0 and the
+ // amount taken out is returned; otherwise 0 is returned.
+ inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
+     _untracked_mem.fetch_add(bytes);
+     const bool below_threshold =
+             std::abs(_untracked_mem.load()) < config::mem_tracker_consume_min_size_bytes;
+     if (below_threshold) {
+         return 0;
+     }
+     return _untracked_mem.exchange(0);
+ }
+
+ // Checks whether 'bytes' more could be consumed without charging anything. For every tracker
+ // on the path with limit() > 0, root first, it waits until current consumption plus 'bytes'
+ // is below the limit, calling try_gc_memory() between checks and returning its error if it
+ // fails. Non-positive 'bytes' returns OK immediately.
+ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
+     if (bytes <= 0) return Status::OK();
+     RETURN_IF_ERROR(check_sys_mem_info(bytes));
+     // Reverse iteration walks the tracker tree top-down.
+     for (auto it = _ancestor_all_trackers.rbegin(); it != _ancestor_all_trackers.rend(); ++it) {
+         MemTrackerLimiter* node = *it;
+         if (node->limit() <= 0) continue;
+         while (!LIKELY(node->_consumption->current_value() + bytes < node->limit())) {
+             RETURN_IF_ERROR(node->try_gc_memory(bytes));
+         }
+     }
+     return Status::OK();
+ }
+
+ // Helpers that return from the enclosing function with the result of
+ // tracker->mem_limit_exceeded(...). Macro arguments are parenthesized before '->' so that an
+ // expression argument (e.g. a cast or a conditional) binds as a whole. The trailing ';' inside
+ // the macros is kept so existing call sites keep compiling unchanged.
+ #define RETURN_LIMIT_EXCEEDED(tracker, ...) return (tracker)->mem_limit_exceeded(__VA_ARGS__);
+ #define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \
+     if ((tracker)->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg);
+ #define RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, msg) \
+     if ((state)->instance_mem_tracker()->any_limit_exceeded()) \
+         RETURN_LIMIT_EXCEEDED((state)->instance_mem_tracker(), state, msg);
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_observe.cpp b/be/src/runtime/memory/mem_tracker_observe.cpp
new file mode 100644
index 0000000000..f696df2f94
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_observe.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/mem_tracker_observe.h"
+
+#include <fmt/format.h>
+#include <parallel_hashmap/phmap.h>
+
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+#include "util/pretty_printer.h"
+
+namespace doris {
+
+using TemporaryTrackersMap = phmap::parallel_flat_hash_map<
+ std::string, MemTrackerObserve*, phmap::priv::hash_default_hash<std::string>,
+ phmap::priv::hash_default_eq<std::string>,
+ std::allocator<std::pair<const std::string, MemTrackerObserve*>>, 12, std::mutex>;
+
+static TemporaryTrackersMap _temporary_mem_trackers;
+
+ // Creates an observe tracker whose parent is the limiter tracker currently attached to the
+ // calling thread, registers it as a child of that parent and marks it as observed.
+ // The tracker is labelled "[Observe]-<label>"; if the parent's label contains '#', everything
+ // from that '#' on is appended, so the child carries the same "#..." suffix as its parent.
+ // Returns the new tracker as a raw pointer.
+ MemTrackerObserve* MemTrackerObserve::create_tracker(const std::string& label,
+                                                      RuntimeProfile* profile) {
+     STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+     MemTrackerLimiter* parent = tls_ctx()->_thread_mem_tracker_mgr->limiter_mem_tracker();
+     DCHECK(parent);
+     const std::string parent_label = parent->label();
+     const std::string::size_type hash_pos = parent_label.find('#');
+     std::string reset_label;
+     if (hash_pos != std::string::npos) {
+         // The suffix already begins with '#'; adding another separator here produced "##".
+         reset_label = fmt::format("[Observe]-{}{}", label, parent_label.substr(hash_pos));
+     } else {
+         reset_label = fmt::format("[Observe]-{}", label);
+     }
+     MemTrackerObserve* tracker = new MemTrackerObserve(reset_label, parent, profile);
+     parent->add_child_tracker(tracker);
+     tracker->set_is_observed();
+     return tracker;
+ }
+
+ // Unregisters this tracker from its parent, if it has one.
+ MemTrackerObserve::~MemTrackerObserve() {
+     if (!parent()) return;
+     parent()->remove_child_tracker(this);
+ }
+
+ // Count the memory in the scope to a temporary tracker with the specified label name.
+ // This is very useful when debugging. You can find the position where the tracker statistics are
+ // inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots.
+ // Returns the tracker registered under 'label' in _temporary_mem_trackers, creating and
+ // registering it the first time the label is requested.
+ // TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding
+ // a switch temporary tracker to each line. Maybe there are open source tools to do this?
+ MemTrackerObserve* MemTrackerObserve::get_temporary_mem_tracker(const std::string& label) {
+     MemTrackerObserve* existing = nullptr;
+     _temporary_mem_trackers.if_contains(label,
+                                         [&existing](MemTrackerObserve* v) { existing = v; });
+     if (existing != nullptr) return existing;
+     // Build the tracker only on a miss. Passing create_tracker(...) directly as a try_emplace_l
+     // argument evaluated it on every call, so each lookup of a known label allocated a new
+     // tracker, added it to the parent's children and then leaked it.
+     MemTrackerObserve* created = create_tracker(fmt::format("[Temporary]-{}", label));
+     const bool inserted = _temporary_mem_trackers.try_emplace_l(
+             label, [&existing](MemTrackerObserve* v) { existing = v; }, created);
+     if (inserted) return created;
+     // Another thread registered the label between the lookup and the insert: keep theirs.
+     delete created;
+     return existing;
+ }
+
+ // Formats this tracker's label, current consumption ("Total") and _consumption->value()
+ // ("Peak") for logging. Stores the current consumption in '*logged_consumption' when the
+ // pointer is non-null. Returns an empty string when the current consumption is 0.
+ std::string MemTrackerObserve::log_usage(int64_t* logged_consumption) {
+     const int64_t current = consumption();
+     const int64_t peak = _consumption->value();
+     if (logged_consumption != nullptr) {
+         *logged_consumption = current;
+     }
+     if (current == 0) {
+         return "";
+     }
+     return fmt::format("MemTracker log_usage Label: {}, Total: {}, Peak: {}", _label,
+                        PrettyPrinter::print(current, TUnit::BYTES),
+                        PrettyPrinter::print(peak, TUnit::BYTES));
+ }
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_observe.h b/be/src/runtime/memory/mem_tracker_observe.h
new file mode 100644
index 0000000000..3213319207
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_observe.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/memory/mem_tracker_base.h"
+
+namespace doris {
+
+class MemTrackerLimiter;
+
+ // Used to manually track memory usage at specified locations, including all exec node trackers.
+ //
+ // There is no parent-child relationship between MemTrackerObserves. Their parents are fragment
+ // instance trackers, but consuming an observe tracker does not synchronously consume the fragment
+ // instance tracker. Therefore, errors in these statistics do not affect the memory tracking and
+ // limits of the process and of queries.
+ class MemTrackerObserve final : public MemTrackerBase {
+ public:
+     // Creates the tracker and adds it to the tracker tree.
+     static MemTrackerObserve* create_tracker(const std::string& label,
+                                              RuntimeProfile* profile = nullptr);
+
+     ~MemTrackerObserve();
+
+     // Returns the temporary tracker with the given label; it is created the first time the
+     // label is requested. Temporary trackers are not destructed automatically and are mostly
+     // used for debugging.
+     static MemTrackerObserve* get_temporary_mem_tracker(const std::string& label);
+
+ public:
+     // Adds 'bytes' to this tracker's consumption (defined below).
+     void consume(int64_t bytes);
+
+     // Subtracts 'bytes' from this tracker's consumption.
+     void release(int64_t bytes) { consume(-bytes); }
+
+     // Consumes 'bytes' on every tracker in 'trackers'.
+     static void batch_consume(int64_t bytes, const std::vector<MemTrackerObserve*>& trackers) {
+         for (MemTrackerObserve* each : trackers) {
+             each->consume(bytes);
+         }
+     }
+
+     // Transfer 'bytes' of consumption from this tracker to 'dst'.
+     void transfer_to(MemTrackerObserve* dst, int64_t bytes);
+
+     // True when 'limit' is non-negative and the current consumption is above it.
+     bool limit_exceeded(int64_t limit) const {
+         if (limit < 0) return false;
+         return consumption() > limit;
+     }
+
+     std::string log_usage(int64_t* logged_consumption = nullptr);
+
+     // One-line summary of label, current consumption and whether a parent is set.
+     std::string debug_string() {
+         const char* parent_is_null = (_parent == nullptr) ? "true" : "false";
+         std::stringstream out;
+         out << "label: " << _label << "; ";
+         out << "consumption: " << _consumption->current_value() << "; ";
+         out << "parent is null: " << parent_is_null << "; ";
+         return out.str();
+     }
+
+     // Iterator into parent_->_child_observe_trackers for this object. Stored to have O(1) remove.
+     std::list<MemTrackerObserve*>::iterator _child_tracker_it;
+
+ private:
+     // Construction goes through create_tracker().
+     MemTrackerObserve(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile)
+             : MemTrackerBase(label, parent, profile) {}
+ };
+
+ // Adds 'bytes' (negative values release) to this tracker's own consumption only; the parent
+ // is not touched. Zero is a no-op.
+ inline void MemTrackerObserve::consume(int64_t bytes) {
+     if (bytes != 0) {
+         _consumption->add(bytes);
+     }
+ }
+
+ // Moves 'bytes' of consumption from this tracker to 'dst'. Does nothing when 'dst' has the
+ // same id() as this tracker.
+ inline void MemTrackerObserve::transfer_to(MemTrackerObserve* dst, int64_t bytes) {
+     if (id() != dst->id()) {
+         release(bytes);
+         dst->consume(bytes);
+     }
+ }
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
new file mode 100644
index 0000000000..d643acdc4b
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/mem_tracker_task_pool.h"
+
+#include "common/config.h"
+#include "runtime/exec_env.h"
+#include "util/pretty_printer.h"
+
+namespace doris {
+
+ // Returns the tracker registered for 'task_id'. If none is registered yet, one is created with
+ // 'mem_limit', 'label' and 'parent' and stored in _task_mem_trackers. 'task_id' must not be empty.
+ MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id,
+                                                                       int64_t mem_limit,
+                                                                       const std::string& label,
+                                                                       MemTrackerLimiter* parent) {
+     DCHECK(!task_id.empty());
+     MemTrackerLimiter* existing = get_task_mem_tracker(task_id);
+     if (existing != nullptr) return existing;
+     // Create the tracker only on a miss. Passing create_tracker(...) directly as a try_emplace_l
+     // argument evaluated it on every call, so registering an already known task allocated a
+     // tracker that was never stored or freed.
+     MemTrackerLimiter* created = MemTrackerLimiter::create_tracker(mem_limit, label, parent);
+     const bool inserted = _task_mem_trackers.try_emplace_l(
+             task_id, [&existing](MemTrackerLimiter* v) { existing = v; }, created);
+     if (inserted) return created;
+     // The key is already present (e.g. registered concurrently): drop ours, return the stored one.
+     delete created;
+     return existing;
+ }
+
+ // Registers (or returns the already registered) tracker for 'query_id', labelled
+ // "Query#queryId=<query_id>" and parented to ExecEnv's query pool tracker.
+ MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id,
+                                                                   int64_t mem_limit) {
+     VLOG_FILE << "Register Query memory tracker, query id: " << query_id
+               << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
+     const std::string tracker_label = fmt::format("Query#queryId={}", query_id);
+     return register_task_mem_tracker_impl(query_id, mem_limit, tracker_label,
+                                           ExecEnv::GetInstance()->query_pool_mem_tracker());
+ }
+
+ // Registers (or returns the already registered) tracker for 'load_id', labelled
+ // "Load#loadId=<load_id>" and parented to ExecEnv's load pool tracker.
+ MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const std::string& load_id,
+                                                                  int64_t mem_limit) {
+     // In load, the query id of the fragment being executed is the same as the load id of the
+     // load channel.
+     VLOG_FILE << "Register Load memory tracker, load id: " << load_id
+               << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
+     const std::string tracker_label = fmt::format("Load#loadId={}", load_id);
+     return register_task_mem_tracker_impl(load_id, mem_limit, tracker_label,
+                                           ExecEnv::GetInstance()->load_pool_mem_tracker());
+ }
+
+ // Returns the tracker stored for 'task_id', or nullptr when the id is not in the map.
+ // 'task_id' must not be empty.
+ MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) {
+     DCHECK(!task_id.empty());
+     MemTrackerLimiter* found = nullptr;
+     // Read through if_contains to avoid using locks to resolve erase conflicts.
+     auto remember = [&found](MemTrackerLimiter* value) { found = value; };
+     _task_mem_trackers.if_contains(task_id, remember);
+     return found;
+ }
+
+ // Scans all registered task trackers and removes the ones that are no longer used:
+ //  - entries whose tracker pointer is null;
+ //  - leaf trackers whose peak consumption is above 0; their remaining consumption is first
+ //    subtracted from the parent (locally, up to the process tracker).
+ // Trackers that stay registered and are over their limit are reported via mem_limit_exceeded().
+ void MemTrackerTaskPool::logout_task_mem_tracker() {
+     std::vector<std::string> expired_tasks;
+     for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); ++it) {
+         MemTrackerLimiter* tracker = it->second;
+         if (!tracker) {
+             // https://github.com/apache/incubator-doris/issues/10006
+             expired_tasks.emplace_back(it->first);
+         } else if (tracker->is_leaf() && tracker->peak_consumption() > 0) {
+             // No RuntimeState uses this task MemTracker, it is only referenced by this map,
+             // and the tracker was not created just now (it has already been used), so delete it.
+             if (config::memory_leak_detection && tracker->consumption() != 0) {
+                 // If consumption is not equal to 0 before query mem tracker is destructed,
+                 // there are two possibilities in theory.
+                 // 1. A memory leak occurs.
+                 // 2. Some of the memory consumed/released on the query mem tracker is actually
+                 //    released/consumed on other trackers such as the process mem tracker, and
+                 //    there is no manual transfer between the two trackers.
+                 //
+                 // The second case should be eliminated in theory, but it has not been done so far,
+                 // so the query memory leak cannot be located, and the value of the query pool mem
+                 // tracker statistics will be inaccurate.
+                 LOG(WARNING) << "Task memory tracker memory leak:" << tracker->debug_string();
+             }
+             // In order to ensure that the query pool mem tracker is the sum of all currently
+             // running query mem trackers, the effect of the ended query mem tracker on the query
+             // pool mem tracker should be cleared, that is, the negative of its current consumption.
+             tracker->parent()->consume_local(-tracker->consumption(),
+                                              MemTrackerLimiter::get_process_tracker());
+             expired_tasks.emplace_back(it->first);
+         } else {
+             // Log limit exceeded query tracker.
+             if (tracker->limit_exceeded()) {
+                 tracker->mem_limit_exceeded(
+                         nullptr,
+                         fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
+                         0, Status::OK());
+             }
+         }
+     }
+     for (const auto& tid : expired_tasks) {
+         // Single lookup that never inserts (operator[] would create a null entry on a miss).
+         MemTrackerLimiter* tracker = nullptr;
+         _task_mem_trackers.if_contains(tid, [&tracker](MemTrackerLimiter* v) { tracker = v; });
+         // Remove the entry before freeing the tracker, so the map never holds a pointer to an
+         // already deleted object.
+         _task_mem_trackers.erase(tid);
+         if (tracker == nullptr) {
+             VLOG_FILE << "Deregister null task mem tracker, task id: " << tid;
+         } else {
+             delete tracker;
+             VLOG_FILE << "Deregister not used task mem tracker, task id: " << tid;
+         }
+     }
+ }
+
+// TODO(zxy) More observable methods
+// /// Logs the usage of 'limit' number of queries based on maximum total memory
+// /// consumption.
+// std::string MemTracker::LogTopNQueries(int limit) {
+// if (limit == 0) return "";
+// priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
+// std::greater<pair<int64_t, string>>>
+// min_pq;
+// GetTopNQueries(min_pq, limit);
+// std::vector<string> usage_strings(min_pq.size());
+// while (!min_pq.empty()) {
+// usage_strings.push_back(min_pq.top().second);
+// min_pq.pop();
+// }
+// std::reverse(usage_strings.begin(), usage_strings.end());
+// return join(usage_strings, "\n");
+// }
+
+// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy
+// /// and populates 'min_pq' with 'limit' number of elements (that contain state related
+// /// to query MemTrackers) based on maximum total memory consumption.
+// void MemTracker::GetTopNQueries(
+// priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
+// greater<pair<int64_t, string>>>& min_pq,
+// int limit) {
+// list<weak_ptr<MemTracker>> children;
+// {
+// lock_guard<SpinLock> l(child_trackers_lock_);
+// children = child_trackers_;
+// }
+// for (const auto& child_weak : children) {
+// shared_ptr<MemTracker> child = child_weak.lock();
+// if (child) {
+// child->GetTopNQueries(min_pq, limit);
+// }
+// }
+// }
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
new file mode 100644
index 0000000000..ae7d82caf4
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <parallel_hashmap/phmap.h>
+
+#include "runtime/memory/mem_tracker_limiter.h"
+
+namespace doris {
+
+using TaskTrackersMap = phmap::parallel_flat_hash_map<
+ std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>,
+ phmap::priv::hash_default_eq<std::string>,
+ std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>;
+
+ // Global task pool for query MemTrackers. Owned by ExecEnv.
+ class MemTrackerTaskPool {
+ public:
+ // Returns the MemTracker registered for 'task_id'. The first time this is called for a task,
+ // a new MemTracker object is created with 'mem_limit' as its memory limit, 'label' as its
+ // label and 'parent' (a pool MemTracker) as its parent. Calling this again with the same
+ // 'task_id' returns the same MemTracker object. This is used to track the local
+ // memory usage of all tasks executing.
+ MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit,
+ const std::string& label,
+ MemTrackerLimiter* parent);
+ // Register a task tracker for a query / a load; the parent is the query pool tracker or the
+ // load pool tracker of ExecEnv respectively.
+ MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, int64_t mem_limit);
+ MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, int64_t mem_limit);
+
+ // Returns the tracker registered for 'task_id', or nullptr if there is none.
+ MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);
+
+ // Remove and delete the mem trackers of tasks that have ended.
+ void logout_task_mem_tracker();
+
+ private:
+ // All per-task MemTracker objects.
+ // The life cycle of task memtracker in the process is the same as task runtime state,
+ // MemTrackers will be removed from this map after query finish or cancel.
+ TaskTrackersMap _task_mem_trackers;
+ };
+
+} // namespace doris
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org