You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2022/07/12 10:09:47 UTC

[doris] branch master updated: mem_tracker_factor_v2 (#10743)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 41f9ee2f9e mem_tracker_factor_v2 (#10743)
41f9ee2f9e is described below

commit 41f9ee2f9e07864bfa5d2a1512db336c83802496
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Tue Jul 12 18:09:41 2022 +0800

    mem_tracker_factor_v2 (#10743)
---
 be/src/runtime/memory/mem_tracker_base.cpp      |  53 ++++
 be/src/runtime/memory/mem_tracker_base.h        |  78 ++++++
 be/src/runtime/memory/mem_tracker_limiter.cpp   | 333 +++++++++++++++++++++++
 be/src/runtime/memory/mem_tracker_limiter.h     | 348 ++++++++++++++++++++++++
 be/src/runtime/memory/mem_tracker_observe.cpp   |  87 ++++++
 be/src/runtime/memory/mem_tracker_observe.h     |  91 +++++++
 be/src/runtime/memory/mem_tracker_task_pool.cpp | 153 +++++++++++
 be/src/runtime/memory/mem_tracker_task_pool.h   |  58 ++++
 8 files changed, 1201 insertions(+)

diff --git a/be/src/runtime/memory/mem_tracker_base.cpp b/be/src/runtime/memory/mem_tracker_base.cpp
new file mode 100644
index 0000000000..bb407e2bf8
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_base.cpp
@@ -0,0 +1,53 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.cpp
+// and modified by Doris
+
+#include "runtime/memory/mem_tracker_base.h"
+
+#include "util/time.h"
+
+namespace doris {
+
+// Name under which the consumption counter is registered on a RuntimeProfile.
+const std::string MemTrackerBase::COUNTER_NAME("PeakMemoryUsage");
+
+// Builds a tracker named `label` whose parent is `parent` (may be null).
+// When `profile` is given, the consumption counter is registered on it as COUNTER_NAME;
+// otherwise a standalone high-water-mark counter is allocated.
+MemTrackerBase::MemTrackerBase(const std::string& label, MemTrackerLimiter* parent,
+                               RuntimeProfile* profile)
+        : _label(label),
+          // The id mixes the current time in microseconds with the label length. It is cheap
+          // to compute (no hashing) but is not strictly guaranteed to be unique.
+          _id((GetCurrentTimeMicros() % 1000000) * 100 + _label.length()),
+          _parent(parent) {
+    if (profile != nullptr) {
+        // Register the byte counter on the caller's profile.
+        _consumption = profile->AddSharedHighWaterMarkCounter(COUNTER_NAME, TUnit::BYTES);
+    } else {
+        // No profile supplied: use a counter that is not attached to any profile.
+        _consumption = std::make_shared<RuntimeProfile::HighWaterMarkCounter>(TUnit::BYTES);
+    }
+}
+
+// Builds a tracker that has neither a parent nor a runtime profile.
+MemTrackerBase::MemTrackerBase(const std::string& label)
+        : MemTrackerBase(label, /*parent=*/nullptr, /*profile=*/nullptr) {}
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_base.h b/be/src/runtime/memory/mem_tracker_base.h
new file mode 100644
index 0000000000..10d554839b
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_base.h
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+// This file is copied from
+// https://github.com/apache/impala/blob/branch-2.9.0/be/src/runtime/mem-tracker.h
+// and modified by Doris
+
+#pragma once
+
+#include "util/runtime_profile.h"
+
+namespace doris {
+
+class MemTrackerLimiter;
+
+// A MemTracker tracks memory consumption.
+// This class is thread-safe.
+class MemTrackerBase {
+public:
+    // Label given at construction.
+    const std::string& label() const { return _label; }
+
+    // Returns the memory consumed in bytes.
+    int64_t consumption() const { return _consumption->current_value(); }
+    // Returns value() of the high-water-mark consumption counter, in bytes.
+    int64_t peak_consumption() const { return _consumption->value(); }
+
+    // `parent` may be nullptr. When `profile` is nullptr the tracker allocates its own
+    // consumption counter instead of registering one on the profile.
+    MemTrackerBase(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile);
+
+    // this is used for creating an orphan mem tracker, or for unit test.
+    // If a mem tracker has parent, it should be created by `create_tracker()`
+    MemTrackerBase(const std::string& label = std::string());
+
+    MemTrackerLimiter* parent() const { return _parent; }
+    int64_t id() const { return _id; }
+    bool is_limited() const { return _is_limited; }   // true for a MemTrackerLimiter
+    bool is_observed() const { return _is_observed; } // true for a MemTrackerObserve
+    void set_is_limited() { _is_limited = true; }
+    void set_is_observed() { _is_observed = true; }
+
+    // Usually, a negative values means that the statistics are not accurate,
+    // 1. The released memory is not consumed.
+    // 2. The same block of memory, tracker A calls consume, and tracker B calls release.
+    // 3. Repeated releases of MemTracker. When the consume is called on the child MemTracker,
+    //    after the release is called on the parent MemTracker,
+    //    the child ~MemTracker will cause repeated releases.
+    void memory_leak_check() const { DCHECK_EQ(_consumption->current_value(), 0); }
+
+    static const std::string COUNTER_NAME;
+
+protected:
+    // label used in the usage string (log_usage())
+    std::string _label;
+
+    // Automatically generated from the creation time and the label length.
+    // Intended to be unique per tracker, but uniqueness is not strictly guaranteed.
+    int64_t _id;
+
+    std::shared_ptr<RuntimeProfile::HighWaterMarkCounter> _consumption; // in bytes
+
+    bool _is_limited = false; // is MemTrackerLimiter
+
+    bool _is_observed = false; // is MemTrackerObserve
+
+    MemTrackerLimiter* _parent; // The parent of this tracker.
+};
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
new file mode 100644
index 0000000000..6193558ab0
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -0,0 +1,333 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/mem_tracker_limiter.h"
+
+#include <fmt/format.h>
+
+#include "gutil/once.h"
+#include "runtime/memory/mem_tracker_observe.h"
+#include "runtime/thread_context.h"
+#include "service/backend_options.h"
+#include "util/pretty_printer.h"
+#include "util/string_util.h"
+
+namespace doris {
+
+// Root of the tracker tree: create_tracker() attaches a new tracker here when the caller
+// passes no parent. Created lazily by get_process_tracker(), guarded by process_tracker_once.
+static GoogleOnceType process_tracker_once = GOOGLE_ONCE_INIT;
+static MemTrackerLimiter* process_tracker = nullptr;
+
+// Allocates a limiter tracker labelled "[Limit]-<label>", links it under `parent` (the
+// process tracker when `parent` is null), marks it as limited and applies `byte_limit`.
+// Returns a raw pointer to the newly allocated tracker.
+MemTrackerLimiter* MemTrackerLimiter::create_tracker(int64_t byte_limit, const std::string& label,
+                                                     MemTrackerLimiter* parent,
+                                                     RuntimeProfile* profile) {
+    // Do not check limit exceed when add_child_tracker, otherwise it will cause deadlock when log_usage is called.
+    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+    MemTrackerLimiter* owner =
+            (parent != nullptr) ? parent : MemTrackerLimiter::get_process_tracker();
+    auto* created = new MemTrackerLimiter("[Limit]-" + label, owner, profile);
+    owner->add_child_tracker(created);
+    created->set_is_limited();
+    created->init(byte_limit);
+    return created;
+}
+
+// Stores `limit` (-1 or greater) and records the chain from this tracker up to the root:
+// every tracker goes into _ancestor_all_trackers, and those that have a limit also go into
+// _ancestor_limiter_trackers. Both lists start with this tracker itself.
+void MemTrackerLimiter::init(int64_t limit) {
+    DCHECK_GE(limit, -1);
+    _limit = limit;
+    for (MemTrackerLimiter* node = this; node != nullptr; node = node->_parent) {
+        _ancestor_all_trackers.push_back(node);
+        if (node->has_limit()) {
+            _ancestor_limiter_trackers.push_back(node);
+        }
+    }
+    DCHECK_GT(_ancestor_all_trackers.size(), 0);
+    DCHECK_EQ(_ancestor_all_trackers[0], this);
+}
+
+// Flushes any untracked memory, unlinks this tracker from its parent's list of limiter
+// children and deletes the observe trackers attached to it.
+MemTrackerLimiter::~MemTrackerLimiter() {
+    // TCMalloc hook will be triggered during destructor memtracker, may cause crash.
+    if (_label == "Process") doris::thread_local_ctx._init = false;
+    flush_untracked_mem();
+    if (parent()) {
+        // Do not call release on the parent tracker to avoid repeated releases.
+        // Ensure that all consume/release are triggered by TCMalloc new/delete hook.
+        std::lock_guard<SpinLock> l(_parent->_child_trackers_lock);
+        if (_child_tracker_it != _parent->_child_limiter_trackers.end()) {
+            _parent->_child_limiter_trackers.erase(_child_tracker_it);
+            _child_tracker_it = _parent->_child_limiter_trackers.end();
+        }
+    }
+    // The child observe tracker life cycle is controlled by its parent limiter tracker.
+    // NOTE(review): if ~MemTrackerObserve unlinks itself through remove_child_tracker(), this
+    // loop would modify the list while iterating over it — confirm against that destructor.
+    for (auto tracker : _child_observe_trackers) {
+        delete tracker;
+    }
+    DCHECK_EQ(_untracked_mem, 0);
+}
+
+// Appends `tracker` to the limiter children and remembers its list position on the child,
+// so it can be erased later without searching. Takes _child_trackers_lock.
+void MemTrackerLimiter::add_child_tracker(MemTrackerLimiter* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    auto position = _child_limiter_trackers.insert(_child_limiter_trackers.end(), tracker);
+    tracker->_child_tracker_it = position;
+}
+
+// Appends `tracker` to the observe children and remembers its list position on the child,
+// so it can be erased later without searching. Takes _child_trackers_lock.
+void MemTrackerLimiter::add_child_tracker(MemTrackerObserve* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    auto position = _child_observe_trackers.insert(_child_observe_trackers.end(), tracker);
+    tracker->_child_tracker_it = position;
+}
+
+// Unlinks `tracker` from the limiter children. Does nothing when the child's stored
+// position is already end(), i.e. it has been unlinked before. Takes _child_trackers_lock.
+void MemTrackerLimiter::remove_child_tracker(MemTrackerLimiter* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    if (tracker->_child_tracker_it == _child_limiter_trackers.end()) {
+        return;
+    }
+    _child_limiter_trackers.erase(tracker->_child_tracker_it);
+    // Mark the child as unlinked so a second removal is a no-op.
+    tracker->_child_tracker_it = _child_limiter_trackers.end();
+}
+
+// Unlinks `tracker` from the observe children. Does nothing when the child's stored
+// position is already end(), i.e. it has been unlinked before. Takes _child_trackers_lock.
+void MemTrackerLimiter::remove_child_tracker(MemTrackerObserve* tracker) {
+    std::lock_guard<SpinLock> guard(_child_trackers_lock);
+    if (tracker->_child_tracker_it == _child_observe_trackers.end()) {
+        return;
+    }
+    _child_observe_trackers.erase(tracker->_child_tracker_it);
+    // Mark the child as unlinked so a second removal is a no-op.
+    tracker->_child_tracker_it = _child_observe_trackers.end();
+}
+
+// Allocates the root tracker, labelled "Process", with no parent, no profile and no limit
+// (-1), then publishes it through the file-level process_tracker pointer.
+void MemTrackerLimiter::create_process_tracker() {
+    auto* root = new MemTrackerLimiter("Process", nullptr, nullptr);
+    root->init(-1);
+    process_tracker = root;
+}
+
+// Returns the process-wide root tracker.
+// GoogleOnceInit with process_tracker_once invokes create_process_tracker() to build it.
+MemTrackerLimiter* MemTrackerLimiter::get_process_tracker() {
+    GoogleOnceInit(&process_tracker_once, &MemTrackerLimiter::create_process_tracker);
+    return process_tracker;
+}
+
+// Replaces the contents of `trackers` with every limiter tracker reachable from the process
+// tracker. Observe trackers are included only when config::show_observe_tracker is set.
+// Each node's child lists are copied under that node's lock and then walked without it.
+void MemTrackerLimiter::list_process_trackers(std::vector<MemTrackerBase*>* trackers) {
+    trackers->clear();
+    // Used as a stack: the most recently added limiter is visited next.
+    std::deque<MemTrackerLimiter*> pending;
+    pending.push_back(get_process_tracker());
+    while (!pending.empty()) {
+        MemTrackerLimiter* current = pending.back();
+        pending.pop_back();
+        trackers->push_back(current);
+
+        std::list<MemTrackerLimiter*> limiters;
+        std::list<MemTrackerObserve*> observers;
+        {
+            std::lock_guard<SpinLock> guard(current->_child_trackers_lock);
+            limiters = current->_child_limiter_trackers;
+            observers = current->_child_observe_trackers;
+        }
+        pending.insert(pending.end(), limiters.begin(), limiters.end());
+        if (!config::show_observe_tracker) {
+            continue;
+        }
+        for (MemTrackerObserve* observer : observers) {
+            trackers->push_back(observer);
+        }
+    }
+}
+
+// Returns the closest tracker that is an ancestor of (or equal to) both this tracker and
+// `dst`. Both trackers must hang off the same root.
+MemTrackerLimiter* MemTrackerLimiter::common_ancestor(MemTrackerLimiter* dst) {
+    // Compare by identity rather than by id(): ids are derived from a timestamp and the
+    // label length and are not guaranteed to be unique, so two different trackers may share one.
+    if (this == dst) return dst;
+    DCHECK_EQ(_ancestor_all_trackers.back(), dst->_ancestor_all_trackers.back())
+            << "Must have same ancestor";
+    // Each list starts with the tracker itself and ends with the root. Start at the root
+    // and step towards the trackers while the next entries are still shared.
+    int ancestor_idx = _ancestor_all_trackers.size() - 1;
+    int dst_ancestor_idx = dst->_ancestor_all_trackers.size() - 1;
+    while (ancestor_idx > 0 && dst_ancestor_idx > 0 &&
+           _ancestor_all_trackers[ancestor_idx - 1] ==
+                   dst->_ancestor_all_trackers[dst_ancestor_idx - 1]) {
+        --ancestor_idx;
+        --dst_ancestor_idx;
+    }
+    return _ancestor_all_trackers[ancestor_idx];
+}
+
+// Scans _ancestor_limiter_trackers in order and returns the first tracker whose limit is
+// exceeded, or nullptr when none is.
+MemTrackerLimiter* MemTrackerLimiter::limit_exceeded_tracker() const {
+    for (MemTrackerLimiter* candidate : _ancestor_limiter_trackers) {
+        if (!candidate->limit_exceeded()) {
+            continue;
+        }
+        return candidate;
+    }
+    return nullptr;
+}
+
+// Returns the smallest (limit - consumption) over _ancestor_limiter_trackers, or
+// int64_t max when that list is empty. The result is negative if a limit is already exceeded.
+int64_t MemTrackerLimiter::spare_capacity() const {
+    int64_t tightest = std::numeric_limits<int64_t>::max();
+    for (const auto& limiter : _ancestor_limiter_trackers) {
+        const int64_t headroom = limiter->limit() - limiter->consumption();
+        if (headroom < tightest) {
+            tightest = headroom;
+        }
+    }
+    return tightest;
+}
+
+// Returns the smallest limit over _ancestor_limiter_trackers, or -1 when that list is empty.
+int64_t MemTrackerLimiter::get_lowest_limit() const {
+    if (_ancestor_limiter_trackers.empty()) {
+        return -1;
+    }
+    int64_t lowest = std::numeric_limits<int64_t>::max();
+    for (const auto& limiter : _ancestor_limiter_trackers) {
+        DCHECK(limiter->has_limit());
+        const int64_t candidate = limiter->limit();
+        if (candidate < lowest) {
+            lowest = candidate;
+        }
+    }
+    return lowest;
+}
+
+// Calls the registered GC functions, in registration order, to bring consumption below
+// `max_consumption` with some headroom to spare.
+// Returns true if consumption is still above `max_consumption` afterwards (and always when
+// `max_consumption` is negative); returns false otherwise. Holds _gc_lock while running.
+bool MemTrackerLimiter::gc_memory(int64_t max_consumption) {
+    if (max_consumption < 0) return true;
+    std::lock_guard<std::mutex> l(_gc_lock);
+    int64_t pre_gc_consumption = consumption();
+    // Check if someone gc'd before us
+    if (pre_gc_consumption < max_consumption) return false;
+
+    int64_t curr_consumption = pre_gc_consumption;
+    // Free some extra memory to avoid frequent GC.
+    // NOTE(review): this constant evaluates to 4 GiB, while the comment it replaced described
+    // it as "4M ... an empirical value" — confirm which of the two is intended.
+    const int64_t EXTRA_BYTES_TO_FREE = 4L * 1024L * 1024L * 1024L;
+    // Try to free up some memory
+    for (const auto& gc_function : _gc_functions) {
+        // Try to free up the amount we are over plus some extra so that we don't have to
+        // immediately GC again. Don't free all the memory since that can be unnecessarily
+        // expensive.
+        int64_t bytes_to_free = curr_consumption - max_consumption + EXTRA_BYTES_TO_FREE;
+        gc_function(bytes_to_free);
+        curr_consumption = consumption();
+        // Stop only once the requested headroom is reached. The previous `<=` test broke
+        // out exactly when the target had NOT been met, so later GC functions never ran.
+        if (max_consumption - curr_consumption >= EXTRA_BYTES_TO_FREE) break;
+    }
+
+    return curr_consumption > max_consumption;
+}
+
+// Runs gc_memory() with a target of (_limit - bytes). Returns MemoryLimitExceeded when the
+// target is still exceeded after GC, and OK otherwise.
+Status MemTrackerLimiter::try_gc_memory(int64_t bytes) {
+    const int64_t target_consumption = _limit - bytes;
+    const bool still_over_target = gc_memory(target_consumption);
+    if (UNLIKELY(still_over_target)) {
+        std::string reason =
+                fmt::format("label={} TryConsume failed size={}, used={}, limit={}", label(), bytes,
+                            _consumption->current_value(), _limit);
+        return Status::MemoryLimitExceeded(reason);
+    }
+    VLOG_NOTICE << "GC succeeded, TryConsume bytes=" << bytes
+                << " consumption=" << _consumption->current_value() << " limit=" << _limit;
+    return Status::OK();
+}
+
+// Calling this on the query tracker results in output like:
+//
+//  Query(4a4c81fedaed337d:4acadfda00000000) Limit=10.00 GB Total=508.28 MB Peak=508.45 MB
+//    Fragment 4a4c81fedaed337d:4acadfda00000000: Total=8.00 KB Peak=8.00 KB
+//      EXCHANGE_NODE (id=4): Total=0 Peak=0
+//      DataStreamRecvr: Total=0 Peak=0
+//    Block Manager: Limit=6.68 GB Total=394.00 MB Peak=394.00 MB
+//    Fragment 4a4c81fedaed337d:4acadfda00000006: Total=233.72 MB Peak=242.24 MB
+//      AGGREGATION_NODE (id=1): Total=139.21 MB Peak=139.84 MB
+//      HDFS_SCAN_NODE (id=0): Total=93.94 MB Peak=102.24 MB
+//      DataStreamSender (dst_id=2): Total=45.99 KB Peak=85.99 KB
+//    Fragment 4a4c81fedaed337d:4acadfda00000003: Total=274.55 MB Peak=274.62 MB
+//      AGGREGATION_NODE (id=3): Total=274.50 MB Peak=274.50 MB
+//      EXCHANGE_NODE (id=2): Total=0 Peak=0
+//      DataStreamRecvr: Total=45.91 KB Peak=684.07 KB
+//      DataStreamSender (dst_id=4): Total=680.00 B Peak=680.00 B
+//
+// If 'reservation_metrics_' are set, we get a more granular breakdown:
+//   TrackerName: Limit=5.00 MB Reservation=5.00 MB OtherMemory=1.04 MB
+//                Total=6.04 MB Peak=6.45 MB
+//
+// Formats one summary line for this tracker (label, limit, current and peak consumption,
+// whether the limit is exceeded). Unless `max_recursive_depth` is 0, appends one
+// newline-separated entry per child: limiter children recurse with the depth reduced by
+// one, observe children are logged directly.
+// When `logged_consumption` is non-null it receives this tracker's current consumption.
+std::string MemTrackerLimiter::log_usage(int max_recursive_depth, int64_t* logged_consumption) {
+    const int64_t current_bytes = consumption();
+    const int64_t peak_bytes = _consumption->value();
+    if (logged_consumption != nullptr) {
+        *logged_consumption = current_bytes;
+    }
+
+    std::string summary = fmt::format(
+            "MemTracker log_usage Label: {}, Limit: {}, Total: {}, Peak: {}, Exceeded: {}", _label,
+            PrettyPrinter::print(_limit, TUnit::BYTES),
+            PrettyPrinter::print(current_bytes, TUnit::BYTES),
+            PrettyPrinter::print(peak_bytes, TUnit::BYTES), limit_exceeded() ? "true" : "false");
+
+    // Depth exhausted: the caller only wants this tracker's own line.
+    if (max_recursive_depth == 0) {
+        return summary;
+    }
+
+    // Copy the child lists under the lock, then format them without holding it.
+    std::list<MemTrackerLimiter*> limiters;
+    std::list<MemTrackerObserve*> observers;
+    {
+        std::lock_guard<SpinLock> guard(_child_trackers_lock);
+        limiters = _child_limiter_trackers;
+        observers = _child_observe_trackers;
+    }
+
+    int64_t children_bytes;
+    std::string children_report = log_usage(max_recursive_depth - 1, limiters, &children_bytes);
+    for (const auto& observer : observers) {
+        children_report += "\n" + observer->log_usage(&children_bytes);
+    }
+    if (children_report.empty()) {
+        return summary;
+    }
+    summary += "\n" + children_report;
+    return summary;
+}
+
+// Logs every non-null tracker in `trackers` with the given depth and joins the non-empty
+// results with newlines. `logged_consumption` receives the sum of the consumptions the
+// trackers reported.
+std::string MemTrackerLimiter::log_usage(int max_recursive_depth,
+                                         const std::list<MemTrackerLimiter*>& trackers,
+                                         int64_t* logged_consumption) {
+    *logged_consumption = 0;
+    std::vector<std::string> lines;
+    for (MemTrackerLimiter* child : trackers) {
+        if (child == nullptr) {
+            continue;
+        }
+        int64_t child_bytes;
+        std::string line = child->log_usage(max_recursive_depth, &child_bytes);
+        if (!line.empty()) {
+            lines.push_back(line);
+        }
+        *logged_consumption += child_bytes;
+    }
+    return join(lines, "\n");
+}
+
+// Builds a MemoryLimitExceeded status describing the failed allocation, records the message
+// on `state` when it is non-null, and writes a warning with tracker usage to the BE log.
+//   state                  - optional; supplies the fragment instance id and receives the error.
+//   details                - free-form text embedded in the message.
+//   failed_allocation_size - size reported in the message when `failed_alloc` tests true.
+//   failed_alloc           - when it tests false, its text is reported instead of the size.
+Status MemTrackerLimiter::mem_limit_exceeded(RuntimeState* state, const std::string& details,
+                                             int64_t failed_allocation_size, Status failed_alloc) {
+    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+    MemTrackerLimiter* process_tracker = MemTrackerLimiter::get_process_tracker();
+    std::string detail = fmt::format(
+            "Memory exceed limit. fragment={}, details={}, on backend={}. Memory left in process "
+            "limit={}.",
+            state != nullptr ? print_id(state->fragment_instance_id()) : "", details,
+            BackendOptions::get_localhost(),
+            PrettyPrinter::print(process_tracker->spare_capacity(), TUnit::BYTES));
+    // Format each suffix on its own and append it. Re-using the already expanded message as
+    // a format string would make fmt interpret any '{' or '}' contained in `details` (or
+    // other substituted text) as replacement fields.
+    if (!failed_alloc) {
+        detail += fmt::format(" failed alloc=<{}>. current tracker={}.", failed_alloc.to_string(),
+                              _label);
+    } else {
+        detail += fmt::format(
+                " current tracker <label={}, used={}, limit={}, failed alloc size={}>.", _label,
+                _consumption->current_value(), _limit,
+                PrettyPrinter::print(failed_allocation_size, TUnit::BYTES));
+    }
+    detail += " If this is a query, can change the limit by session variable exec_mem_limit.";
+    Status status = Status::MemoryLimitExceeded(detail);
+    if (state != nullptr) state->log_error(detail);
+
+    // only print the tracker log_usage in be log.
+    if (process_tracker->spare_capacity() < failed_allocation_size) {
+        // Dumping the process MemTracker is expensive. Limiting the recursive depth to two
+        // levels limits the level of detail to a one-line summary for each query MemTracker.
+        detail += "\n" + process_tracker->log_usage(2);
+    }
+    detail += "\n" + log_usage();
+
+    LOG(WARNING) << detail;
+    return status;
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
new file mode 100644
index 0000000000..d5d937523f
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -0,0 +1,348 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "common/config.h"
+#include "runtime/memory/mem_tracker_base.h"
+#include "runtime/runtime_state.h"
+#include "util/mem_info.h"
+
+namespace doris {
+
+class MemTrackerObserve;
+
+// A tracker that carries a limit and can be arranged into a tree structure, such that the
+// consumption tracked by a MemTracker is also tracked by its ancestors.
+// Used for:
+// 1. Track and limit the memory usage of process and query.
+//    Automatic memory consume based on system memory allocation (Currently, based on TCMalloc hook).
+// 2. Execution logic that requires memory size to participate in control.
+//    Manual consumption, but will not affect the overall statistics of the process.
+//
+// We use a five-level hierarchy of mem trackers: process, query pool, query, instance,
+// node. Specific parts of the fragment (exec nodes, sinks, etc) will add a
+// fifth level when they are initialized.
+//
+// GcFunctions can be attached to a MemTracker in order to free up memory if the limit is
+// reached. If limit_exceeded() is called and the limit is exceeded, it will first call
+// the GcFunctions to try to free memory and recheck the limit. For example, the process
+// tracker has a GcFunction that releases any unused memory still held by tcmalloc, so
+// this will be called before the process limit is reported as exceeded. GcFunctions are
+// called in the order they are added, so expensive functions should be added last.
+// GcFunctions are called with a global lock held, so should be non-blocking and not
+// call back into MemTrackers, except to release memory.
+class MemTrackerLimiter final : public MemTrackerBase {
+public:
+    // Creates and adds the tracker to the tree
+    static MemTrackerLimiter* create_tracker(int64_t byte_limit, const std::string& label,
+                                             MemTrackerLimiter* parent = nullptr,
+                                             RuntimeProfile* profile = nullptr);
+
+    // Walks the MemTrackerLimiter hierarchy and populates _ancestor_all_trackers and _ancestor_limiter_trackers
+    void init(int64_t limit);
+
+    ~MemTrackerLimiter();
+
+    // Adds tracker to _child_trackers
+    void add_child_tracker(MemTrackerLimiter* tracker);
+    void add_child_tracker(MemTrackerObserve* tracker);
+    // Remove tracker from _child_trackers
+    void remove_child_tracker(MemTrackerLimiter* tracker);
+    void remove_child_tracker(MemTrackerObserve* tracker);
+
+    // Leaf tracker, without any child
+    // True when this tracker has neither limiter nor observe children.
+    // Reads the child lists without taking _child_trackers_lock.
+    bool is_leaf() {
+        return _child_limiter_trackers.size() + _child_observe_trackers.size() == 0;
+    }
+
+    // Gets a "process" tracker, creating it if necessary.
+    static MemTrackerLimiter* get_process_tracker();
+
+    // Returns a list of all the valid trackers.
+    static void list_process_trackers(std::vector<MemTrackerBase*>* trackers);
+
+public:
+    // The following func, for execution logic that requires memory size to participate in control.
+    // this does not change the value of process tracker.
+
+    // only consume self, will not sync to parent. Usually used to manually record the specified memory,
+    // It is independent of the automatically recording of thread local tracker, so the same block of memory
+    // will be recorded in the thread local tracker and the current tracker at the same time.
+    void consume_self(int64_t bytes);
+    // Counterpart of consume_self(): a release is recorded as a negative consumption.
+    void release_self(int64_t bytes) {
+        const int64_t delta = -bytes;
+        consume_self(delta);
+    }
+
+    // up to (but not including) end_tracker.
+    // This is useful if we want to move tracking between trackers that share a common (i.e. end_tracker)
+    // ancestor. This happens when we want to update tracking on a particular mem tracker but the consumption
+    // against the limit recorded in one of its ancestors already happened.
+    void consume_local(int64_t bytes, MemTrackerLimiter* end_tracker);
+    // Counterpart of consume_local(): forwards the negated amount with the same end_tracker.
+    void release_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
+        const int64_t delta = -bytes;
+        consume_local(delta, end_tracker);
+    }
+
+    // Transfer 'bytes' of consumption from this tracker to 'dst'.
+    // Forced transfer, 'dst' may limit exceed, and more ancestor trackers will be updated.
+    void transfer_to(MemTrackerLimiter* dst, int64_t bytes);
+
+    // When the accumulated untracked memory value exceeds the upper limit,
+    // the current value is returned and set to 0.
+    // Thread safety.
+    int64_t add_untracked_mem(int64_t bytes);
+
+    // In most cases, no need to call flush_untracked_mem on the child tracker,
+    // because when it is destructed, theoretically all its children have been destructed.
+    // Resets the untracked-memory accumulator to 0 and consumes the amount it held.
+    void flush_untracked_mem() {
+        const auto pending = _untracked_mem.exchange(0);
+        consume(pending);
+    }
+
+    // Find the common ancestor and update trackers between 'this'/'dst' and
+    // the common ancestor. This logic handles all cases, including the
+    // two trackers being the same or being ancestors of each other because
+    // '_ancestor_all_trackers' includes the current tracker.
+    MemTrackerLimiter* common_ancestor(MemTrackerLimiter* dst);
+
+public:
+    // The following func, for mem limit.
+
+    // Returns MemoryLimitExceeded when MemInfo is initialized and the process's current
+    // memory plus `bytes` would reach MemInfo::mem_limit(); returns OK otherwise.
+    Status check_sys_mem_info(int64_t bytes) {
+        // TODO add mmap
+        if (!MemInfo::initialized()) {
+            return Status::OK();
+        }
+        if (MemInfo::current_mem() + bytes < MemInfo::mem_limit()) {
+            return Status::OK();
+        }
+        return Status::MemoryLimitExceeded(fmt::format(
+                "{}: TryConsume failed, bytes={} process whole consumption={}  mem limit={}",
+                _label, bytes, MemInfo::current_mem(), MemInfo::mem_limit()));
+    }
+
+    // A negative _limit means "no limit".
+    bool has_limit() const {
+        return !(_limit < 0);
+    }
+    // Returns the configured limit as stored (negative when there is none).
+    int64_t limit() const {
+        return _limit;
+    }
+    // Replaces the limit. Debug builds assert that the tracker already had a limit.
+    void update_limit(int64_t limit) {
+        DCHECK(has_limit());
+        this->_limit = limit;
+    }
+    // True when this tracker has a limit and its own consumption is above it.
+    bool limit_exceeded() const {
+        if (_limit < 0) {
+            return false;
+        }
+        return consumption() > _limit;
+    }
+    // True when limit_exceeded_tracker() finds a tracker over its limit.
+    bool any_limit_exceeded() const {
+        const MemTrackerLimiter* offender = limit_exceeded_tracker();
+        return offender != nullptr;
+    }
+
+    // Returns the first tracker, among this tracker and its ancestors, whose valid limit is
+    // exceeded, or nullptr if no limit is exceeded.
+    MemTrackerLimiter* limit_exceeded_tracker() const;
+
+    Status check_limit(int64_t bytes);
+
+    // Returns the maximum consumption that can be made without exceeding the limit on
+    // this tracker or any of its parents. Returns int64_t::max() if there are no
+    // limits and a negative value if any limit is already exceeded.
+    int64_t spare_capacity() const;
+
+    // Returns the lowest limit for this tracker and its ancestors. Returns -1 if there is no limit.
+    int64_t get_lowest_limit() const;
+
+    typedef std::function<void(int64_t bytes_to_free)> GcFunction;
+    /// Add a function 'f' to be called if the limit is reached, if none of the other
+    /// previously-added GC functions were successful at freeing up enough memory.
+    /// 'f' does not need to be thread-safe as long as it is added to only one MemTrackerLimiter.
+    /// Note that 'f' must be valid for the lifetime of this MemTrackerLimiter.
+    // Appends 'f' to the list of GC functions.
+    void add_gc_function(GcFunction f) {
+        _gc_functions.push_back(f);
+    }
+
+    // If consumption is higher than max_consumption, attempts to free memory by calling
+    // any added GC functions.  Returns true if max_consumption is still exceeded. Takes gc_lock.
+    // Note: If the cache of segment/chunk is released due to insufficient query memory at a certain moment,
+    // the performance of subsequent queries may be degraded, so the use of gc function should be careful enough.
+    bool gc_memory(int64_t max_consumption);
+    Status try_gc_memory(int64_t bytes);
+
+    /// Logs the usage of this tracker and optionally its children (recursively).
+    /// If 'logged_consumption' is non-nullptr, sets the consumption value logged.
+    /// 'max_recursive_depth' specifies the maximum number of levels of children
+    /// to include in the dump. If it is zero, then no children are dumped.
+    /// Limiting the recursive depth reduces the cost of dumping, particularly
+    /// for the process MemTracker.
+    std::string log_usage(int max_recursive_depth = INT_MAX, int64_t* logged_consumption = nullptr);
+
+    // Log the memory usage when memory limit is exceeded and return a status object with
+    // details of the allocation which caused the limit to be exceeded.
+    // If 'failed_allocation_size' is greater than zero, logs the allocation size. If
+    // 'failed_allocation_size' is zero, nothing about the allocation size is logged.
+    // If 'state' is non-nullptr, logs the error to 'state'.
+    Status mem_limit_exceeded(RuntimeState* state, const std::string& details = std::string(),
+                              int64_t failed_allocation = -1, Status failed_alloc = Status::OK());
+
+    // Renders a one-line summary of this tracker (limit, current consumption, label,
+    // sizes of both ancestor lists and whether a parent is set) for diagnostics.
+    std::string debug_string() {
+        const char* parent_is_null = (_parent == nullptr) ? "true" : "false";
+        std::stringstream out;
+        out << "limit: " << _limit << "; ";
+        out << "consumption: " << _consumption->current_value() << "; ";
+        out << "label: " << _label << "; ";
+        out << "all tracker size: " << _ancestor_all_trackers.size() << "; ";
+        out << "limit trackers size: " << _ancestor_limiter_trackers.size() << "; ";
+        out << "parent is null: " << parent_is_null << "; ";
+        return out.str();
+    }
+
+private:
+    // The following func, for automatic memory tracking and limiting based on system memory allocation.
+    friend class ThreadMemTrackerMgr;
+
+    // Private constructor: forwards 'label', 'parent' and 'profile' to MemTrackerBase.
+    // _limit has no default member initializer, so start it at -1 (documented as
+    // "no consumption limit") instead of leaving it indeterminate until it is assigned.
+    MemTrackerLimiter(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile)
+            : MemTrackerBase(label, parent, profile), _limit(-1) {}
+
+    // Creates the process tracker.
+    static void create_process_tracker();
+
+    // Increases consumption of this tracker and its ancestors by 'bytes'.
+    void consume(int64_t bytes);
+
+    // Decreases consumption of this tracker and its ancestors by 'bytes'
+    // (implemented as a consume() of the negated amount).
+    void release(int64_t bytes) {
+        const int64_t delta = -bytes;
+        consume(delta);
+    }
+
+    // Increases consumption of this tracker and its ancestors by 'bytes' only if
+    // they can all consume 'bytes' without exceeding their limits. If a limit would be exceeded,
+    // no MemTrackers are updated. Returns Status::OK() if the consumption was successfully
+    // updated, and an error status otherwise.
+    WARN_UNUSED_RESULT
+    Status try_consume(int64_t bytes);
+
+    /// Log consumption of all the trackers provided. Returns the sum of consumption in
+    /// 'logged_consumption'. 'max_recursive_depth' specifies the maximum number of levels
+    /// of children to include in the dump. If it is zero, then no children are dumped.
+    static std::string log_usage(int max_recursive_depth,
+                                 const std::list<MemTrackerLimiter*>& trackers,
+                                 int64_t* logged_consumption);
+
+private:
+    // Limit on memory consumption, in bytes. If _limit == -1, there is no consumption limit. Used in log_usage.
+    int64_t _limit;
+
+    // Consume size smaller than mem_tracker_consume_min_size_bytes will continue to accumulate
+    // to avoid frequent calls to consume/release of MemTracker.
+    std::atomic<int64_t> _untracked_mem = 0;
+
+    // All the child trackers of this tracker. Used for error reporting and
+    // listing only (i.e. updating the consumption of a parent tracker does not
+    // update that of its children).
+    SpinLock _child_trackers_lock;
+    std::list<MemTrackerLimiter*> _child_limiter_trackers;
+    std::list<MemTrackerObserve*> _child_observe_trackers;
+    // Iterator into _parent->_child_limiter_trackers for this object. Stored to have O(1) remove.
+    std::list<MemTrackerLimiter*>::iterator _child_tracker_it;
+
+    // this tracker plus all of its ancestors
+    std::vector<MemTrackerLimiter*> _ancestor_all_trackers;
+    // _ancestor_all_trackers with valid limits
+    std::vector<MemTrackerLimiter*> _ancestor_limiter_trackers;
+
+    // Lock to protect gc_memory(). This prevents many GCs from occurring at once.
+    std::mutex _gc_lock;
+    // Functions to call after the limit is reached to free memory.
+    std::vector<GcFunction> _gc_functions;
+};
+
+// Adds 'bytes' (which may be negative) to the consumption counter of every tracker in
+// _ancestor_all_trackers, i.e. this tracker and all of its ancestors. No limit is checked.
+inline void MemTrackerLimiter::consume(int64_t bytes) {
+    if (bytes == 0) return;
+    for (MemTrackerLimiter* each : _ancestor_all_trackers) {
+        each->_consumption->add(bytes);
+    }
+}
+
+// Tries to add 'bytes' to this tracker and all of its ancestors while honoring every limit.
+// Non-positive 'bytes' are applied unconditionally and always return Status::OK().
+// For positive 'bytes', check_sys_mem_info() is consulted first; then the trackers in
+// _ancestor_all_trackers are updated from the last index down to index 0. If a limited
+// tracker cannot take the bytes and try_gc_memory() fails, the trackers that were already
+// updated are rolled back and the failing status is returned.
+inline Status MemTrackerLimiter::try_consume(int64_t bytes) {
+    if (bytes <= 0) {
+        // release(-bytes) is consume(bytes): a zero or negative delta needs no limit check.
+        release(-bytes);
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(check_sys_mem_info(bytes));
+    int i;
+    // Walk the tracker tree top-down.
+    for (i = _ancestor_all_trackers.size() - 1; i >= 0; --i) {
+        MemTrackerLimiter* tracker = _ancestor_all_trackers[i];
+        if (tracker->limit() < 0) {
+            tracker->_consumption->add(bytes); // No limit at this tracker.
+        } else {
+            // If TryConsume fails, we can try to GC, but we may need to try several times if
+            // there are concurrent consumers because we don't take a lock before trying to
+            // update _consumption.
+            while (true) {
+                // Leave the retry loop as soon as the counter accepted 'bytes' under the limit.
+                if (LIKELY(tracker->_consumption->try_add(bytes, tracker->limit()))) break;
+                Status st = tracker->try_gc_memory(bytes);
+                if (!st) {
+                    // Failed for this mem tracker. Roll back the ones that succeeded.
+                    // Only indices greater than i were updated before reaching this tracker.
+                    for (int j = _ancestor_all_trackers.size() - 1; j > i; --j) {
+                        _ancestor_all_trackers[j]->_consumption->add(-bytes);
+                    }
+                    return st;
+                }
+            }
+        }
+    }
+    // Everyone succeeded, return.
+    DCHECK_EQ(i, -1);
+    return Status::OK();
+}
+
+// Buffers 'bytes' through add_untracked_mem() and applies whatever amount it hands back
+// to this tracker's own consumption counter only (ancestors are not touched here).
+inline void MemTrackerLimiter::consume_self(int64_t bytes) {
+    const int64_t flushed = add_untracked_mem(bytes);
+    if (flushed == 0) return;
+    _consumption->add(flushed);
+}
+
+// Applies consume_self(bytes) to the trackers in _ancestor_all_trackers, in order, and
+// stops as soon as 'end_tracker' is reached; 'end_tracker' itself is not updated.
+inline void MemTrackerLimiter::consume_local(int64_t bytes, MemTrackerLimiter* end_tracker) {
+    DCHECK(end_tracker);
+    if (bytes == 0) {
+        return;
+    }
+    for (MemTrackerLimiter* current : _ancestor_all_trackers) {
+        if (current == end_tracker) {
+            break;
+        }
+        current->consume_self(bytes);
+    }
+}
+
+// Moves 'bytes' of consumption from this tracker to 'dst': releases locally here and
+// consumes locally on 'dst', both bounded by the process tracker. Transferring to the
+// tracker with the same id is a no-op.
+inline void MemTrackerLimiter::transfer_to(MemTrackerLimiter* dst, int64_t bytes) {
+    DCHECK(dst->is_limited());
+    if (id() != dst->id()) {
+        release_local(bytes, MemTrackerLimiter::get_process_tracker());
+        dst->consume_local(bytes, MemTrackerLimiter::get_process_tracker());
+    }
+}
+
+// Accumulates 'bytes' into _untracked_mem. Once the magnitude of the accumulated amount
+// reaches config::mem_tracker_consume_min_size_bytes, the whole pending amount is taken
+// atomically (the counter is reset to 0) and returned so the caller can apply it in one
+// step. Returns 0 while the accumulated amount is still below the threshold.
+inline int64_t MemTrackerLimiter::add_untracked_mem(int64_t bytes) {
+    // Decide on the value produced by this thread's own atomic add rather than re-loading
+    // the counter, which another thread may have changed in between.
+    const int64_t accumulated = (_untracked_mem += bytes);
+    if (std::abs(accumulated) >= config::mem_tracker_consume_min_size_bytes) {
+        // exchange() hands back everything pending at this instant, so no bytes are lost
+        // even if other threads added more after the check above.
+        return _untracked_mem.exchange(0);
+    }
+    return 0;
+}
+
+// Checks, without changing any consumption counter, whether 'bytes' more could be consumed.
+// Non-positive 'bytes' always pass. Otherwise check_sys_mem_info() is consulted, and then
+// every tracker with a positive limit is visited from the last entry of
+// _ancestor_all_trackers to the first; while a tracker has no room for 'bytes',
+// try_gc_memory() is invoked and its error, if any, is returned.
+inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
+    if (bytes <= 0) {
+        return Status::OK();
+    }
+    RETURN_IF_ERROR(check_sys_mem_info(bytes));
+    // Walk the tracker tree top-down.
+    for (auto it = _ancestor_all_trackers.rbegin(); it != _ancestor_all_trackers.rend(); ++it) {
+        MemTrackerLimiter* tracker = *it;
+        if (tracker->limit() <= 0) {
+            continue;
+        }
+        while (!LIKELY(tracker->_consumption->current_value() + bytes < tracker->limit())) {
+            RETURN_IF_ERROR(tracker->try_gc_memory(bytes));
+        }
+    }
+    return Status::OK();
+}
+
+// Helper macros that return from the enclosing function with the status produced by
+// mem_limit_exceeded(). Macro arguments are parenthesized so that non-trivial expressions
+// (e.g. a conditional expression) can be passed as 'tracker' or 'state'.
+// NOTE: each expansion already ends in ';' and the *_IF_* forms expand to a bare 'if'
+// statement, so avoid using them as the body of an if/else without braces.
+#define RETURN_LIMIT_EXCEEDED(tracker, ...) return (tracker)->mem_limit_exceeded(__VA_ARGS__);
+#define RETURN_IF_LIMIT_EXCEEDED(tracker, state, msg) \
+    if ((tracker)->any_limit_exceeded()) RETURN_LIMIT_EXCEEDED(tracker, state, msg);
+#define RETURN_IF_INSTANCE_LIMIT_EXCEEDED(state, msg)          \
+    if ((state)->instance_mem_tracker()->any_limit_exceeded()) \
+        RETURN_LIMIT_EXCEEDED((state)->instance_mem_tracker(), state, msg);
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_observe.cpp b/be/src/runtime/memory/mem_tracker_observe.cpp
new file mode 100644
index 0000000000..f696df2f94
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_observe.cpp
@@ -0,0 +1,87 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/mem_tracker_observe.h"
+
+#include <fmt/format.h>
+#include <parallel_hashmap/phmap.h>
+
+#include "runtime/memory/mem_tracker_limiter.h"
+#include "runtime/thread_context.h"
+#include "util/pretty_printer.h"
+
+namespace doris {
+
+// Map from a label to its temporary MemTrackerObserve, built on
+// phmap::parallel_flat_hash_map with std::mutex as the map's mutex type.
+// NOTE(review): the literal 12 is presumably phmap's submap-count exponent — confirm
+// against the phmap documentation.
+using TemporaryTrackersMap = phmap::parallel_flat_hash_map<
+        std::string, MemTrackerObserve*, phmap::priv::hash_default_hash<std::string>,
+        phmap::priv::hash_default_eq<std::string>,
+        std::allocator<std::pair<const std::string, MemTrackerObserve*>>, 12, std::mutex>;
+
+// File-local registry used by MemTrackerObserve::get_temporary_mem_tracker().
+// Nothing in this file erases entries from it.
+static TemporaryTrackersMap _temporary_mem_trackers;
+
+// Creates a MemTrackerObserve whose parent is the limiter tracker attached to the current
+// thread, registers it as a child of that parent, marks it as observed and returns it.
+// The label becomes "[Observe]-<label>", followed by '#' and the part of the parent's label
+// starting at its first '#', if the parent's label contains one.
+// NOTE(review): that suffix keeps its own leading '#', so the resulting label contains "##".
+// Confirm whether this is intended.
+MemTrackerObserve* MemTrackerObserve::create_tracker(const std::string& label,
+                                                     RuntimeProfile* profile) {
+    STOP_CHECK_THREAD_MEM_TRACKER_LIMIT();
+    MemTrackerLimiter* parent = tls_ctx()->_thread_mem_tracker_mgr->limiter_mem_tracker();
+    DCHECK(parent);
+    const std::string parent_label = parent->label();
+    const auto hash_pos = parent_label.find_first_of("#");
+    const std::string reset_label =
+            (hash_pos == parent_label.npos)
+                    ? fmt::format("[Observe]-{}", label)
+                    : fmt::format("[Observe]-{}#{}", label, parent_label.substr(hash_pos));
+    MemTrackerObserve* tracker = new MemTrackerObserve(reset_label, parent, profile);
+    parent->add_child_tracker(tracker);
+    tracker->set_is_observed();
+    return tracker;
+}
+
+// Unregisters this tracker from its parent's child list, if it has a parent.
+MemTrackerObserve::~MemTrackerObserve() {
+    if (!parent()) {
+        return;
+    }
+    parent()->remove_child_tracker(this);
+}
+
+// Count the memory in the scope to a temporary tracker with the specified label name.
+// This is very useful when debugging. You can find the position where the tracker statistics are
+// inaccurate through the temporary tracker layer by layer. As well as finding memory hotspots.
+// TODO(zxy) track specifies the memory for each line in the code segment, instead of manually adding
+// a switch temporary tracker to each line. Maybe there are open source tools to do this?
+//
+// Returns the tracker registered under 'label', creating and registering it on first use.
+// A new tracker is only created when the label is not registered yet: passing
+// create_tracker(...) directly as an argument of try_emplace_l would evaluate it on every
+// call and leak one tracker (still linked under its parent) per lookup of an existing label.
+MemTrackerObserve* MemTrackerObserve::get_temporary_mem_tracker(const std::string& label) {
+    MemTrackerObserve* tracker = nullptr;
+    bool found = false;
+    const auto remember = [&tracker, &found](MemTrackerObserve* existing) {
+        tracker = existing;
+        found = true;
+    };
+    // Fast path: the label has been registered before.
+    _temporary_mem_trackers.if_contains(label, remember);
+    if (found) {
+        return tracker;
+    }
+    // First time this label is registered: make a new object and try to publish it.
+    MemTrackerObserve* created =
+            MemTrackerObserve::create_tracker(fmt::format("[Temporary]-{}", label));
+    _temporary_mem_trackers.try_emplace_l(label, remember, created);
+    if (found) {
+        // Another thread registered the label in the meantime; keep its tracker and
+        // discard ours (the destructor unregisters it from its parent).
+        delete created;
+        return tracker;
+    }
+    return created;
+}
+
+// Formats this tracker's label, current consumption ("Total") and _consumption->value()
+// ("Peak") into a single line. If 'logged_consumption' is non-null it receives the current
+// consumption. Returns an empty string when the current consumption is 0.
+std::string MemTrackerObserve::log_usage(int64_t* logged_consumption) {
+    const int64_t current = consumption();
+    const int64_t peak = _consumption->value();
+    if (logged_consumption != nullptr) {
+        *logged_consumption = current;
+    }
+    if (current == 0) {
+        return "";
+    }
+    return fmt::format("MemTracker log_usage Label: {}, Total: {}, Peak: {}", _label,
+                       PrettyPrinter::print(current, TUnit::BYTES),
+                       PrettyPrinter::print(peak, TUnit::BYTES));
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_observe.h b/be/src/runtime/memory/mem_tracker_observe.h
new file mode 100644
index 0000000000..3213319207
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_observe.h
@@ -0,0 +1,91 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include "runtime/memory/mem_tracker_base.h"
+
+namespace doris {
+
+class MemTrackerLimiter;
+
+// Used to manually track memory usage at specified locations, including all exec node trackers.
+//
+// There is no parent-child relationship between MemTrackerObserves. Their parents are all
+// fragment instance trackers, but consuming on a MemTrackerObserve does not synchronously
+// consume on the fragment instance tracker. Therefore, errors in these statistics
+// will not affect the memory tracking and restrictions of processes and queries.
+class MemTrackerObserve final : public MemTrackerBase {
+public:
+    // Creates the tracker and adds it to the tracker tree.
+    static MemTrackerObserve* create_tracker(const std::string& label,
+                                             RuntimeProfile* profile = nullptr);
+
+    ~MemTrackerObserve();
+
+    // Returns the temporary tracker registered under 'label'; it is created the first time
+    // the label is requested. Temporary trackers are not automatically destructed and are
+    // usually used for debugging.
+    static MemTrackerObserve* get_temporary_mem_tracker(const std::string& label);
+
+public:
+    // Adds 'bytes' to this tracker's consumption.
+    void consume(int64_t bytes);
+
+    // Subtracts 'bytes' from this tracker's consumption.
+    void release(int64_t bytes) { consume(-bytes); }
+
+    // Consumes 'bytes' on every tracker in 'trackers'.
+    static void batch_consume(int64_t bytes, const std::vector<MemTrackerObserve*>& trackers) {
+        for (MemTrackerObserve* each : trackers) {
+            each->consume(bytes);
+        }
+    }
+
+    // Transfer 'bytes' of consumption from this tracker to 'dst'.
+    void transfer_to(MemTrackerObserve* dst, int64_t bytes);
+
+    // True when 'limit' is non-negative and the current consumption is above it.
+    bool limit_exceeded(int64_t limit) const { return !(limit < 0 || consumption() <= limit); }
+
+    std::string log_usage(int64_t* logged_consumption = nullptr);
+
+    // Renders label, current consumption and whether a parent is set, for diagnostics.
+    std::string debug_string() {
+        const char* parent_is_null = (_parent == nullptr) ? "true" : "false";
+        std::stringstream out;
+        out << "label: " << _label << "; ";
+        out << "consumption: " << _consumption->current_value() << "; ";
+        out << "parent is null: " << parent_is_null << "; ";
+        return out.str();
+    }
+
+    // Iterator into _parent->_child_observe_trackers for this object. Stored to have O(1) remove.
+    std::list<MemTrackerObserve*>::iterator _child_tracker_it;
+
+private:
+    // Private: instances are obtained through create_tracker().
+    MemTrackerObserve(const std::string& label, MemTrackerLimiter* parent, RuntimeProfile* profile)
+            : MemTrackerBase(label, parent, profile) {}
+};
+
+// Adds 'bytes' (which may be negative) to this tracker's consumption counter;
+// a zero delta is skipped.
+inline void MemTrackerObserve::consume(int64_t bytes) {
+    if (bytes != 0) {
+        _consumption->add(bytes);
+    }
+}
+
+// Moves 'bytes' of consumption from this tracker to 'dst'. Transferring to the tracker
+// with the same id is a no-op.
+inline void MemTrackerObserve::transfer_to(MemTrackerObserve* dst, int64_t bytes) {
+    if (id() != dst->id()) {
+        release(bytes);
+        dst->consume(bytes);
+    }
+}
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
new file mode 100644
index 0000000000..d643acdc4b
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -0,0 +1,153 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "runtime/memory/mem_tracker_task_pool.h"
+
+#include "common/config.h"
+#include "runtime/exec_env.h"
+#include "util/pretty_printer.h"
+
+namespace doris {
+
+// Returns the tracker registered for 'task_id', creating it (with 'mem_limit', 'label' and
+// 'parent') and registering it if the task id is not known yet.
+// A tracker is only created when the id is not registered: passing create_tracker(...)
+// directly as an argument of try_emplace_l would evaluate it on every call and leak one
+// tracker per re-registration of an existing task id.
+MemTrackerLimiter* MemTrackerTaskPool::register_task_mem_tracker_impl(const std::string& task_id,
+                                                                      int64_t mem_limit,
+                                                                      const std::string& label,
+                                                                      MemTrackerLimiter* parent) {
+    DCHECK(!task_id.empty());
+    // Fast path: this task_id is already registered with a tracker.
+    MemTrackerLimiter* tracker = get_task_mem_tracker(task_id);
+    if (tracker != nullptr) {
+        return tracker;
+    }
+    // First time this task_id is registered: make a new object and try to publish it.
+    MemTrackerLimiter* created = MemTrackerLimiter::create_tracker(mem_limit, label, parent);
+    bool already_registered = false;
+    _task_mem_trackers.try_emplace_l(
+            task_id,
+            [&tracker, &already_registered](MemTrackerLimiter* existing) {
+                tracker = existing;
+                already_registered = true;
+            },
+            created);
+    if (!already_registered) {
+        return created;
+    }
+    // The id was registered concurrently (or maps to a null entry): keep the stored value,
+    // as before, and discard the tracker that was just created instead of leaking it.
+    delete created;
+    return tracker;
+}
+
+// Registers (or returns the already registered) tracker for 'query_id', labelled
+// "Query#queryId=<query_id>" and parented to ExecEnv's query pool tracker.
+MemTrackerLimiter* MemTrackerTaskPool::register_query_mem_tracker(const std::string& query_id,
+                                                                  int64_t mem_limit) {
+    VLOG_FILE << "Register Query memory tracker, query id: " << query_id
+              << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
+    const std::string label = fmt::format("Query#queryId={}", query_id);
+    MemTrackerLimiter* pool_tracker = ExecEnv::GetInstance()->query_pool_mem_tracker();
+    return register_task_mem_tracker_impl(query_id, mem_limit, label, pool_tracker);
+}
+
+// Registers (or returns the already registered) tracker for 'load_id', labelled
+// "Load#loadId=<load_id>" and parented to ExecEnv's load pool tracker.
+MemTrackerLimiter* MemTrackerTaskPool::register_load_mem_tracker(const std::string& load_id,
+                                                                 int64_t mem_limit) {
+    // In load, the query id of the fragment is executed, which is the same as the load id of the load channel.
+    VLOG_FILE << "Register Load memory tracker, load id: " << load_id
+              << " limit: " << PrettyPrinter::print(mem_limit, TUnit::BYTES);
+    const std::string label = fmt::format("Load#loadId={}", load_id);
+    MemTrackerLimiter* pool_tracker = ExecEnv::GetInstance()->load_pool_mem_tracker();
+    return register_task_mem_tracker_impl(load_id, mem_limit, label, pool_tracker);
+}
+
+// Looks up the tracker stored for 'task_id'. Returns nullptr when the id is not
+// registered (or when the stored entry itself is null).
+MemTrackerLimiter* MemTrackerTaskPool::get_task_mem_tracker(const std::string& task_id) {
+    DCHECK(!task_id.empty());
+    MemTrackerLimiter* found = nullptr;
+    // Avoid using locks to resolve erase conflicts
+    const auto capture = [&found](MemTrackerLimiter* stored) { found = stored; };
+    _task_mem_trackers.if_contains(task_id, capture);
+    return found;
+}
+
+// Scans all registered task trackers and removes the ones that are no longer needed.
+// An entry is collected as expired when it is null, or when its tracker is a leaf with a
+// peak consumption above 0. Before a tracker is removed, its remaining consumption is
+// subtracted from its parent chain up to the process tracker. Trackers that stay
+// registered but exceed their limit are reported via mem_limit_exceeded().
+// Expired entries are collected first and erased (and deleted) in a second pass.
+void MemTrackerTaskPool::logout_task_mem_tracker() {
+    std::vector<std::string> expired_tasks;
+    for (auto it = _task_mem_trackers.begin(); it != _task_mem_trackers.end(); ++it) {
+        if (!it->second) {
+            // https://github.com/apache/incubator-doris/issues/10006
+            expired_tasks.emplace_back(it->first);
+        } else if (it->second->is_leaf() == true && it->second->peak_consumption() > 0) {
+            // No RuntimeState uses this task MemTracker, it is only referenced by this map,
+            // and tracker was not created soon, delete it.
+            if (config::memory_leak_detection && it->second->consumption() != 0) {
+                // If consumption is not equal to 0 before query mem tracker is destructed,
+                // there are two possibilities in theory.
+                // 1. A memory leak occurs.
+                // 2. Some of the memory consumed/released on the query mem tracker is actually released/consume on
+                // other trackers such as the process mem tracker, and there is no manual transfer between the two trackers.
+                //
+                // The second case should be eliminated in theory, but it has not been done so far, so the query memory leak
+                // cannot be located, and the value of the query pool mem tracker statistics will be inaccurate.
+                LOG(WARNING) << "Task memory tracker memory leak:" << it->second->debug_string();
+            }
+            // In order to ensure that the query pool mem tracker is the sum of all currently running query mem trackers,
+            // the effect of the ended query mem tracker on the query pool mem tracker should be cleared, that is,
+            // the negative number of the current value of consume.
+            it->second->parent()->consume_local(-it->second->consumption(),
+                                                MemTrackerLimiter::get_process_tracker());
+            expired_tasks.emplace_back(it->first);
+        } else {
+            // Log limit exceeded query tracker.
+            if (it->second->limit_exceeded()) {
+                it->second->mem_limit_exceeded(
+                        nullptr,
+                        fmt::format("Task mem limit exceeded but no cancel, queryId:{}", it->first),
+                        0, Status::OK());
+            }
+        }
+    }
+    // Iterate by reference (no per-id string copy) and look each entry up only once.
+    for (const std::string& tid : expired_tasks) {
+        MemTrackerLimiter* tracker = _task_mem_trackers[tid];
+        if (tracker == nullptr) {
+            _task_mem_trackers.erase(tid);
+            VLOG_FILE << "Deregister null task mem tracker, task id: " << tid;
+        } else {
+            delete tracker;
+            _task_mem_trackers.erase(tid);
+            VLOG_FILE << "Deregister not used task mem tracker, task id: " << tid;
+        }
+    }
+}
+
+// TODO(zxy) More observable methods
+// /// Logs the usage of 'limit' number of queries based on maximum total memory
+// /// consumption.
+// std::string MemTracker::LogTopNQueries(int limit) {
+//     if (limit == 0) return "";
+//     priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
+//                    std::greater<pair<int64_t, string>>>
+//             min_pq;
+//     GetTopNQueries(min_pq, limit);
+//     std::vector<string> usage_strings(min_pq.size());
+//     while (!min_pq.empty()) {
+//         usage_strings.push_back(min_pq.top().second);
+//         min_pq.pop();
+//     }
+//     std::reverse(usage_strings.begin(), usage_strings.end());
+//     return join(usage_strings, "\n");
+// }
+
+// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy
+// /// and populates 'min_pq' with 'limit' number of elements (that contain state related
+// /// to query MemTrackers) based on maximum total memory consumption.
+// void MemTracker::GetTopNQueries(
+//         priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
+//                        greater<pair<int64_t, string>>>& min_pq,
+//         int limit) {
+//     list<weak_ptr<MemTracker>> children;
+//     {
+//         lock_guard<SpinLock> l(child_trackers_lock_);
+//         children = child_trackers_;
+//     }
+//     for (const auto& child_weak : children) {
+//         shared_ptr<MemTracker> child = child_weak.lock();
+//         if (child) {
+//             child->GetTopNQueries(min_pq, limit);
+//         }
+//     }
+// }
+
+} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
new file mode 100644
index 0000000000..ae7d82caf4
--- /dev/null
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -0,0 +1,58 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <parallel_hashmap/phmap.h>
+
+#include "runtime/memory/mem_tracker_limiter.h"
+
+namespace doris {
+
+// Map from a task id to its MemTrackerLimiter, built on phmap::parallel_flat_hash_map
+// with std::mutex as the map's mutex type.
+// NOTE(review): the literal 12 is presumably phmap's submap-count exponent — confirm
+// against the phmap documentation.
+using TaskTrackersMap = phmap::parallel_flat_hash_map<
+        std::string, MemTrackerLimiter*, phmap::priv::hash_default_hash<std::string>,
+        phmap::priv::hash_default_eq<std::string>,
+        std::allocator<std::pair<const std::string, MemTrackerLimiter*>>, 12, std::mutex>;
+
+// Global task pool for query MemTrackers. Owned by ExecEnv.
+class MemTrackerTaskPool {
+public:
+    // Construct a MemTracker object for 'task_id' with 'mem_limit' as the memory limit.
+    // The MemTracker is a child of the pool MemTracker. Calling this with the same
+    // 'task_id' will return the same MemTracker object. This is used to track the local
+    // memory usage of all tasks executing. The first time this is called for a task,
+    // a new MemTracker object is created with 'parent' (the pool tracker) as its parent.
+    // NOTE(review): an earlier version of this comment said newly created trackers always
+    // have a limit of -1, which contradicts the 'mem_limit' parameter — confirm.
+    MemTrackerLimiter* register_task_mem_tracker_impl(const std::string& task_id, int64_t mem_limit,
+                                                      const std::string& label,
+                                                      MemTrackerLimiter* parent);
+    // Register the tracker of a query / a load under the respective pool tracker.
+    MemTrackerLimiter* register_query_mem_tracker(const std::string& query_id, int64_t mem_limit);
+    MemTrackerLimiter* register_load_mem_tracker(const std::string& load_id, int64_t mem_limit);
+
+    // Returns the tracker registered for 'task_id', or nullptr if there is none.
+    MemTrackerLimiter* get_task_mem_tracker(const std::string& task_id);
+
+    // Remove the mem trackers of tasks (queries/loads) that have ended.
+    void logout_task_mem_tracker();
+
+private:
+    // All per-task MemTracker objects.
+    // The life cycle of a task MemTracker in the process is the same as the task's runtime state;
+    // MemTrackers will be removed from this map after the query finishes or is cancelled.
+    TaskTrackersMap _task_mem_trackers;
+};
+
+} // namespace doris
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org