Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/26 05:24:08 UTC

[doris] 02/08: [enhancement](memory) Add Memory GC when the available memory of the BE process is lacking (#14712)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 5cacef1547d331ff6936f37634d107ace11c186a
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Dec 7 15:28:52 2022 +0800

    [enhancement](memory) Add Memory GC when the available memory of the BE process is lacking (#14712)
    
    When the system MemAvailable is less than the warning water mark, or the memory used by the BE process exceeds the soft mem limit, run a minor GC and try to release caches.
    
    When the system MemAvailable is less than the low water mark, or the memory used by the BE process exceeds the mem limit, run a full GC: try to release the caches, then cancel queries starting from the one with the largest memory usage, until mem_limit * 20% of memory has been released.
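    
    A minimal standalone sketch of the GC cascade described above, modeled on the daemon loop and MemInfo::process_full_gc() in the patch below. The limits and the stub cache/query functions (kMemLimit, prune_page_cache, clear_chunk_allocator, cancel_top_queries) are hypothetical placeholders; only the control flow mirrors the actual change.
    
        #include <cstdint>
        #include <iostream>
        
        // Hypothetical limits; the patch derives them from mem_limit and the
        // process_minor_gc_size ("10%") / process_full_gc_size ("20%") configs.
        constexpr int64_t kMemLimit = 8LL * 1024 * 1024 * 1024;  // 8 GiB
        constexpr int64_t kSoftMemLimit = kMemLimit * 9 / 10;    // soft_mem_limit_frac = 0.9
        constexpr int64_t kMinorGcTarget = kMemLimit / 10;       // 10% of mem limit
        constexpr int64_t kFullGcTarget = kMemLimit / 5;         // 20% of mem limit
        
        // Stand-ins for StoragePageCache, ChunkAllocator and free_top_query().
        int64_t prune_page_cache() { return 1LL * 1024 * 1024 * 1024; }    // pretend 1 GiB freed
        int64_t clear_chunk_allocator() { return 512LL * 1024 * 1024; }    // pretend 512 MiB freed
        int64_t cancel_top_queries(int64_t min_free) { return min_free; }  // pretend enough is freed
        
        // Mirrors the full-GC order: caches first, query cancellation last.
        int64_t full_gc() {
            int64_t freed = prune_page_cache();
            if (freed > kFullGcTarget) return freed;
            freed += clear_chunk_allocator();
            if (freed > kFullGcTarget) return freed;
            // Cancel queries, largest memory consumer first, for the remainder.
            freed += cancel_top_queries(kFullGcTarget - freed);
            return freed;
        }
        
        int main() {
            // Pretend the process currently uses ~7.8 GiB.
            int64_t proc_mem = 7LL * 1024 * 1024 * 1024 + 800LL * 1024 * 1024;
            if (proc_mem >= kMemLimit) {
                std::cout << "full gc freed " << full_gc() << " bytes\n";
            } else if (proc_mem >= kSoftMemLimit) {
                // Minor GC only releases caches, up to kMinorGcTarget bytes.
                std::cout << "minor gc target " << kMinorGcTarget << " bytes\n";
            } else {
                std::cout << "memory is healthy, no gc\n";
            }
            return 0;
        }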
---
 be/src/common/config.h                           |   7 ++
 be/src/common/daemon.cpp                         |   2 +
 be/src/olap/lru_cache.cpp                        |   4 +
 be/src/olap/lru_cache.h                          |   3 +
 be/src/olap/page_cache.cpp                       |   5 ++
 be/src/olap/page_cache.h                         |   6 ++
 be/src/runtime/fragment_mgr.cpp                  |  16 ++++
 be/src/runtime/fragment_mgr.h                    |   3 +
 be/src/runtime/memory/chunk_allocator.cpp        |  20 +++++
 be/src/runtime/memory/chunk_allocator.h          |   4 +
 be/src/runtime/memory/mem_tracker_limiter.cpp    | 110 +++++++++++++++--------
 be/src/runtime/memory/mem_tracker_limiter.h      |  15 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.cpp |  17 +++-
 be/src/runtime/memory/thread_mem_tracker_mgr.h   |  10 ++-
 be/src/runtime/plan_fragment_executor.cpp        |   2 +
 be/src/runtime/query_fragments_ctx.h             |   2 +
 be/src/runtime/thread_context.h                  |   2 +
 be/src/util/mem_info.cpp                         |  45 ++++++++++
 be/src/util/mem_info.h                           |  18 +++-
 19 files changed, 244 insertions(+), 47 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 707222447b..86b278b76b 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -68,6 +68,13 @@ CONF_Double(soft_mem_limit_frac, "0.9");
 // Turn down max. will use as much memory as possible.
 CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
 
+// The amount of memory that each GC run tries to release, as a percentage of the mem limit.
+CONF_mString(process_minor_gc_size, "10%");
+CONF_mString(process_full_gc_size, "20%");
+
+// The maximum time a thread waits for a full GC. Currently only query threads wait for a full GC.
+CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
+
 // the port heartbeat service used
 CONF_Int32(heartbeat_service_port, "9050");
 // the count of heart beat service
diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index fb306c21a0..c0d3fee21f 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -228,11 +228,13 @@ void Daemon::memory_maintenance_thread() {
                     doris::MemInfo::sys_mem_available_low_water_mark() ||
             doris::MemInfo::proc_mem_no_allocator_cache() >= doris::MemInfo::mem_limit()) {
             interval_milliseconds = 100;
+            doris::MemInfo::process_full_gc();
         } else if (doris::MemInfo::sys_mem_available() <
                            doris::MemInfo::sys_mem_available_warning_water_mark() ||
                    doris::MemInfo::proc_mem_no_allocator_cache() >=
                            doris::MemInfo::soft_mem_limit()) {
             interval_milliseconds = 200;
+            doris::MemInfo::process_minor_gc();
         } else {
             interval_milliseconds = config::memory_maintenance_sleep_time_ms;
         }
diff --git a/be/src/olap/lru_cache.cpp b/be/src/olap/lru_cache.cpp
index a6505d9bbe..484660e1bb 100644
--- a/be/src/olap/lru_cache.cpp
+++ b/be/src/olap/lru_cache.cpp
@@ -523,6 +523,10 @@ int64_t ShardedLRUCache::prune_if(CacheValuePredicate pred) {
     return num_prune;
 }
 
+int64_t ShardedLRUCache::mem_consumption() {
+    return _mem_tracker->consumption();
+}
+
 void ShardedLRUCache::update_cache_metrics() const {
     size_t total_capacity = 0;
     size_t total_usage = 0;
diff --git a/be/src/olap/lru_cache.h b/be/src/olap/lru_cache.h
index 7ae79ef3ba..480a61c712 100644
--- a/be/src/olap/lru_cache.h
+++ b/be/src/olap/lru_cache.h
@@ -217,6 +217,8 @@ public:
     // may hold lock for a long time to execute predicate.
     virtual int64_t prune_if(CacheValuePredicate pred) { return 0; }
 
+    virtual int64_t mem_consumption() = 0;
+
 private:
     DISALLOW_COPY_AND_ASSIGN(Cache);
 };
@@ -370,6 +372,7 @@ public:
     virtual uint64_t new_id() override;
     virtual int64_t prune() override;
     virtual int64_t prune_if(CacheValuePredicate pred) override;
+    int64_t mem_consumption() override;
 
 private:
     void update_cache_metrics() const;
diff --git a/be/src/olap/page_cache.cpp b/be/src/olap/page_cache.cpp
index fa9f9010be..2813f85dd3 100644
--- a/be/src/olap/page_cache.cpp
+++ b/be/src/olap/page_cache.cpp
@@ -76,4 +76,9 @@ void StoragePageCache::insert(const CacheKey& key, const Slice& data, PageCacheH
     *handle = PageCacheHandle(cache, lru_handle);
 }
 
+void StoragePageCache::prune(segment_v2::PageTypePB page_type) {
+    auto cache = _get_page_cache(page_type);
+    cache->prune();
+}
+
 } // namespace doris
diff --git a/be/src/olap/page_cache.h b/be/src/olap/page_cache.h
index c1f0b48da3..6313e931e0 100644
--- a/be/src/olap/page_cache.h
+++ b/be/src/olap/page_cache.h
@@ -91,6 +91,12 @@ public:
         return _get_page_cache(page_type) != nullptr;
     }
 
+    void prune(segment_v2::PageTypePB page_type);
+
+    int64_t get_page_cache_mem_consumption(segment_v2::PageTypePB page_type) {
+        return _get_page_cache(page_type)->mem_consumption();
+    }
+
 private:
     StoragePageCache();
     static StoragePageCache* _s_instance;
diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp
index 9bf0a361cd..5e0d4f4645 100644
--- a/be/src/runtime/fragment_mgr.cpp
+++ b/be/src/runtime/fragment_mgr.cpp
@@ -680,6 +680,7 @@ Status FragmentMgr::exec_plan_fragment(const TExecPlanFragmentParams& params, Fi
             }
         }
     }
+    fragments_ctx->fragment_ids.push_back(fragment_instance_id);
 
     exec_state.reset(new FragmentExecState(fragments_ctx->query_id,
                                            params.params.fragment_instance_id, params.backend_num,
@@ -792,6 +793,21 @@ Status FragmentMgr::cancel(const TUniqueId& fragment_id, const PPlanFragmentCanc
     return Status::OK();
 }
 
+void FragmentMgr::cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                               const std::string& msg) {
+    std::vector<TUniqueId> cancel_fragment_ids;
+    {
+        std::lock_guard<std::mutex> lock(_lock);
+        auto ctx = _fragments_ctx_map.find(query_id);
+        if (ctx != _fragments_ctx_map.end()) {
+            cancel_fragment_ids = ctx->second->fragment_ids;
+        }
+    }
+    for (auto it : cancel_fragment_ids) {
+        cancel(it, reason, msg);
+    }
+}
+
 void FragmentMgr::cancel_worker() {
     LOG(INFO) << "FragmentMgr cancel worker start working.";
     do {
diff --git a/be/src/runtime/fragment_mgr.h b/be/src/runtime/fragment_mgr.h
index 2246a42ac8..c5411eeed9 100644
--- a/be/src/runtime/fragment_mgr.h
+++ b/be/src/runtime/fragment_mgr.h
@@ -78,6 +78,9 @@ public:
     Status cancel(const TUniqueId& fragment_id, const PPlanFragmentCancelReason& reason,
                   const std::string& msg = "");
 
+    void cancel_query(const TUniqueId& query_id, const PPlanFragmentCancelReason& reason,
+                      const std::string& msg = "");
+
     void cancel_worker();
 
     virtual void debug(std::stringstream& ss);
diff --git a/be/src/runtime/memory/chunk_allocator.cpp b/be/src/runtime/memory/chunk_allocator.cpp
index 6ac8021648..a99b8b0873 100644
--- a/be/src/runtime/memory/chunk_allocator.cpp
+++ b/be/src/runtime/memory/chunk_allocator.cpp
@@ -120,6 +120,19 @@ public:
         _chunk_lists[idx].push_back(ptr);
     }
 
+    void clear() {
+        std::lock_guard<SpinLock> l(_lock);
+        for (int i = 0; i < 64; ++i) {
+            if (_chunk_lists[i].empty()) {
+                continue;
+            }
+            for (auto ptr : _chunk_lists[i]) {
+                ::free(ptr);
+            }
+            std::vector<uint8_t*>().swap(_chunk_lists[i]);
+        }
+    }
+
 private:
     SpinLock _lock;
     std::vector<std::vector<uint8_t*>> _chunk_lists;
@@ -256,4 +269,11 @@ void ChunkAllocator::free(uint8_t* data, size_t size) {
     free(chunk);
 }
 
+void ChunkAllocator::clear() {
+    for (int i = 0; i < _arenas.size(); ++i) {
+        _arenas[i]->clear();
+    }
+    THREAD_MEM_TRACKER_TRANSFER_FROM(_mem_tracker->consumption(), _mem_tracker.get());
+}
+
 } // namespace doris
diff --git a/be/src/runtime/memory/chunk_allocator.h b/be/src/runtime/memory/chunk_allocator.h
index de9ff70487..8602815a57 100644
--- a/be/src/runtime/memory/chunk_allocator.h
+++ b/be/src/runtime/memory/chunk_allocator.h
@@ -72,6 +72,10 @@ public:
     // otherwise the capacity of chunk allocator will be wrong.
     void free(uint8_t* data, size_t size);
 
+    void clear();
+
+    int64_t mem_consumption() { return _reserved_bytes; }
+
 private:
     ChunkAllocator(size_t reserve_limit);
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 767595d4cf..a228dcf5a7 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -20,8 +20,10 @@
 #include <fmt/format.h>
 
 #include <boost/stacktrace.hpp>
+#include <queue>
 
 #include "gutil/once.h"
+#include "runtime/fragment_mgr.h"
 #include "runtime/runtime_state.h"
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
@@ -238,42 +240,78 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
     return Status::MemoryLimitExceeded(failed_msg);
 }
 
-// TODO(zxy) More observable methods
-// /// Logs the usage of 'limit' number of queries based on maximum total memory
-// /// consumption.
-// std::string MemTracker::LogTopNQueries(int limit) {
-//     if (limit == 0) return "";
-//     priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
-//                    std::greater<pair<int64_t, string>>>
-//             min_pq;
-//     GetTopNQueries(min_pq, limit);
-//     std::vector<string> usage_strings(min_pq.size());
-//     while (!min_pq.empty()) {
-//         usage_strings.push_back(min_pq.top().second);
-//         min_pq.pop();
-//     }
-//     std::reverse(usage_strings.begin(), usage_strings.end());
-//     return join(usage_strings, "\n");
-// }
+int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
+    std::priority_queue<std::pair<int64_t, std::string>,
+                        std::vector<std::pair<int64_t, std::string>>,
+                        std::greater<std::pair<int64_t, std::string>>>
+            min_pq;
+    // Once prepare_free_mem exceeds min_free_mem, it is no longer modified.
+    int64_t prepare_free_mem = 0;
 
-// /// Helper function for LogTopNQueries that iterates through the MemTracker hierarchy
-// /// and populates 'min_pq' with 'limit' number of elements (that contain state related
-// /// to query MemTrackers) based on maximum total memory consumption.
-// void MemTracker::GetTopNQueries(
-//         priority_queue<pair<int64_t, string>, std::vector<pair<int64_t, string>>,
-//                        greater<pair<int64_t, string>>>& min_pq,
-//         int limit) {
-//     list<weak_ptr<MemTracker>> children;
-//     {
-//         lock_guard<SpinLock> l(child_trackers_lock_);
-//         children = child_trackers_;
-//     }
-//     for (const auto& child_weak : children) {
-//         shared_ptr<MemTracker> child = child_weak.lock();
-//         if (child) {
-//             child->GetTopNQueries(min_pq, limit);
-//         }
-//     }
-// }
+    auto label_to_queryid = [&](const std::string& label) -> TUniqueId {
+        auto queryid = split(label, "#Id=")[1];
+        TUniqueId querytid;
+        parse_id(queryid, &querytid);
+        return querytid;
+    };
+
+    auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t {
+        std::vector<std::string> usage_strings;
+        bool had_cancel = false;
+        int64_t freed_mem = 0;
+        while (!min_pq.empty()) {
+            TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
+            ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                    cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+                    fmt::format("Process has no memory available, cancel top memory usage query: "
+                                "query memory tracker <{}> consumption {}, backend {} "
+                                "process memory used {} exceed limit {} or sys mem available {} "
+                                "less than low water mark {}. Execute again after enough memory, "
+                                "details see be.INFO.",
+                                min_pq.top().second, print_bytes(min_pq.top().first),
+                                BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
+                                MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
+                                print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+
+            freed_mem += min_pq.top().first;
+            usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
+                                                min_pq.top().first));
+            had_cancel = true;
+            min_pq.pop();
+        }
+        if (had_cancel) {
+            LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
+        }
+        return freed_mem;
+    };
+
+    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
+        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
+        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+            if (tracker->type() == Type::QUERY) {
+                if (tracker->consumption() > min_free_mem) {
+                    std::priority_queue<std::pair<int64_t, std::string>,
+                                        std::vector<std::pair<int64_t, std::string>>,
+                                        std::greater<std::pair<int64_t, std::string>>>
+                            min_pq_null;
+                    std::swap(min_pq, min_pq_null);
+                    min_pq.push(
+                            pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
+                    return cancel_top_query(min_pq, label_to_queryid);
+                } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
+                    min_pq.push(
+                            pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
+                    prepare_free_mem += tracker->consumption();
+                } else if (tracker->consumption() > min_pq.top().first) {
+                    // No need to modify prepare_free_mem, prepare_free_mem will always be greater than min_free_mem.
+                    min_pq.push(
+                            pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
+                    min_pq.pop();
+                }
+            }
+        }
+    }
+    return cancel_top_query(min_pq, label_to_queryid);
+}
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index dfba608144..6415510315 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -144,14 +144,18 @@ public:
     Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
                                        int64_t failed_allocation_size = 0);
 
+    // Start canceling from the query with the largest memory usage until at least min_free_mem bytes are released.
+    static int64_t free_top_query(int64_t min_free_mem);
+
     static std::string process_mem_log_str() {
         return fmt::format(
                 "physical memory {}, process memory used {} limit {}, sys mem available {} low "
-                "water mark {}",
+                "water mark {}, refresh interval memory growth {} B",
                 PrettyPrinter::print(MemInfo::physical_mem(), TUnit::BYTES),
                 PerfCounters::get_vm_rss_str(), MemInfo::mem_limit_str(),
                 MemInfo::sys_mem_available_str(),
-                PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES));
+                PrettyPrinter::print(MemInfo::sys_mem_available_low_water_mark(), TUnit::BYTES),
+                MemInfo::refresh_interval_memory_growth);
     }
 
     std::string debug_string() {
@@ -169,7 +173,7 @@ private:
     // Increases consumption of this tracker by 'bytes' only if will not exceeding limit.
     // Returns true if the consumption was successfully updated.
     WARN_UNUSED_RESULT
-    bool try_consume(int64_t bytes, std::string& failed_msg);
+    bool try_consume(int64_t bytes, std::string& failed_msg, bool& is_process_exceed);
 
     // When the accumulated untracked memory value exceeds the upper limit,
     // the current value is returned and set to 0.
@@ -232,7 +236,8 @@ inline void MemTrackerLimiter::cache_consume(int64_t bytes) {
     consume(consume_bytes);
 }
 
-inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg) {
+inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_msg,
+                                           bool& is_process_exceed) {
     if (bytes <= 0) {
         release(-bytes);
         failed_msg = std::string();
@@ -245,6 +250,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
 
     if (sys_mem_exceed_limit_check(bytes)) {
         failed_msg = process_limit_exceeded_errmsg_str(bytes);
+        is_process_exceed = true;
         return false;
     }
 
@@ -253,6 +259,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
     } else {
         if (!_consumption->try_add(bytes, _limit)) {
             failed_msg = tracker_limit_exceeded_errmsg_str(bytes, this);
+            is_process_exceed = false;
             return false;
         }
     }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
index f197cad79a..d45d6b8cb5 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.cpp
@@ -17,6 +17,9 @@
 
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 
+#include <chrono>
+#include <thread>
+
 #include "runtime/exec_env.h"
 #include "runtime/fragment_mgr.h"
 #include "service/backend_options.h"
@@ -49,14 +52,24 @@ void ThreadMemTrackerMgr::cancel_fragment() {
     _check_limit = false; // Make sure it will only be canceled once
 }
 
-void ThreadMemTrackerMgr::exceeded() {
+void ThreadMemTrackerMgr::exceeded(int64_t size) {
     if (_cb_func != nullptr) {
         _cb_func();
     }
     _limiter_tracker_raw->print_log_usage(_exceed_mem_limit_msg);
 
     if (is_attach_query()) {
-        // TODO wait gc
+        if (_is_process_exceed && _wait_gc) {
+            int64_t wait_milliseconds = config::thread_wait_gc_max_milliseconds;
+            while (wait_milliseconds > 0) {
+                std::this_thread::sleep_for(std::chrono::milliseconds(100)); // Check every 100 ms.
+                if (!MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
+                    MemInfo::refresh_interval_memory_growth += size;
+                    return; // Process memory is sufficient again; no need to cancel the query.
+                }
+                wait_milliseconds -= 100;
+            }
+        }
         cancel_fragment();
     }
 }
diff --git a/be/src/runtime/memory/thread_mem_tracker_mgr.h b/be/src/runtime/memory/thread_mem_tracker_mgr.h
index 86a5c51d6e..151f79cfcb 100644
--- a/be/src/runtime/memory/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/memory/thread_mem_tracker_mgr.h
@@ -104,6 +104,7 @@ public:
     void set_check_limit(bool check_limit) { _check_limit = check_limit; }
     std::string exceed_mem_limit_msg() { return _exceed_mem_limit_msg; }
     void clear_exceed_mem_limit_msg() { _exceed_mem_limit_msg = ""; }
+    void disable_wait_gc() { _wait_gc = false; }
 
     std::string print_debug_string() {
         fmt::memory_buffer consumer_tracker_buf;
@@ -119,7 +120,7 @@ public:
 
 private:
     void cancel_fragment();
-    void exceeded();
+    void exceeded(int64_t size);
 
     void save_exceed_mem_limit_msg() {
         _exceed_mem_limit_msg = _limiter_tracker_raw->mem_limit_exceeded(
@@ -138,6 +139,8 @@ private:
 
     std::string _failed_consume_msg = std::string();
     std::string _exceed_mem_limit_msg = std::string();
+    bool _is_process_exceed = false;
+    bool _wait_gc = true;
 
     std::shared_ptr<MemTrackerLimiter> _limiter_tracker;
     MemTrackerLimiter* _limiter_tracker_raw = nullptr;
@@ -216,10 +219,11 @@ inline bool ThreadMemTrackerMgr::flush_untracked_mem() {
     old_untracked_mem = _untracked_mem;
     if (_count_scope_mem) _scope_mem += _untracked_mem;
     if (CheckLimit) {
-        if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg)) {
+        if (!_limiter_tracker_raw->try_consume(old_untracked_mem, _failed_consume_msg,
+                                               _is_process_exceed)) {
             if (Force) _limiter_tracker_raw->consume(old_untracked_mem);
             save_exceed_mem_limit_msg();
-            exceeded();
+            exceeded(old_untracked_mem);
             if (!Force) return false;
         }
     } else {
diff --git a/be/src/runtime/plan_fragment_executor.cpp b/be/src/runtime/plan_fragment_executor.cpp
index 3a9206f1fa..bc46027bec 100644
--- a/be/src/runtime/plan_fragment_executor.cpp
+++ b/be/src/runtime/plan_fragment_executor.cpp
@@ -276,6 +276,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         SCOPED_CPU_TIMER(_fragment_cpu_timer);
         SCOPED_TIMER(profile()->total_time_counter());
         RETURN_IF_ERROR(_plan->open(_runtime_state.get()));
+        RETURN_IF_CANCELLED(_runtime_state);
     }
     if (_sink == nullptr) {
         return Status::OK();
@@ -289,6 +290,7 @@ Status PlanFragmentExecutor::open_vectorized_internal() {
         auto sink_send_span_guard = Defer {[this]() { this->_sink->end_send_span(); }};
         while (true) {
             doris::vectorized::Block* block;
+            RETURN_IF_CANCELLED(_runtime_state);
 
             {
                 SCOPED_CPU_TIMER(_fragment_cpu_timer);
diff --git a/be/src/runtime/query_fragments_ctx.h b/be/src/runtime/query_fragments_ctx.h
index fe327d0e7c..f3565b9f30 100644
--- a/be/src/runtime/query_fragments_ctx.h
+++ b/be/src/runtime/query_fragments_ctx.h
@@ -128,6 +128,8 @@ public:
     // MemTracker that is shared by all fragment instances running on this host.
     std::shared_ptr<MemTrackerLimiter> query_mem_tracker;
 
+    std::vector<TUniqueId> fragment_ids;
+
 private:
     ExecEnv* _exec_env;
     DateTimeValue _start_time;
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index cdc8d483f2..8b7116aabc 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -205,6 +205,8 @@ static void pthread_attach_bthread() {
         // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
         // So tracker call reset 0 like reuses btls.
         bthread_context = new ThreadContext;
+        // The brpc server should respond as quickly as possible.
+        bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
         // set the data so that next time bthread_getspecific in the thread returns the data.
         CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 6a2f57cc4b..453c1cd2bd 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -34,6 +34,7 @@
 
 #include "common/config.h"
 #include "gutil/strings/split.h"
+#include "olap/page_cache.h"
 #include "util/cgroup_util.h"
 #include "util/parse_util.h"
 #include "util/pretty_printer.h"
@@ -51,12 +52,15 @@ int64_t MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
 int64_t MemInfo::_s_virtual_memory_used = 0;
 int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
+std::atomic<int64_t> MemInfo::refresh_interval_memory_growth = 0;
 
 static std::unordered_map<std::string, int64_t> _mem_info_bytes;
 int64_t MemInfo::_s_sys_mem_available = 0;
 std::string MemInfo::_s_sys_mem_available_str = "";
 int64_t MemInfo::_s_sys_mem_available_low_water_mark = 0;
 int64_t MemInfo::_s_sys_mem_available_warning_water_mark = 0;
+int64_t MemInfo::_s_process_minor_gc_size = -1;
+int64_t MemInfo::_s_process_full_gc_size = -1;
 
 void MemInfo::refresh_allocator_mem() {
 #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
@@ -85,6 +89,42 @@ void MemInfo::refresh_allocator_mem() {
 #endif
 }
 
+void MemInfo::process_minor_gc() {
+    // TODO: free more caches, and free a fixed percentage of capacity rather than everything.
+    int64_t freed_mem = 0;
+    Defer defer {[&]() {
+        LOG(INFO) << fmt::format("Process Minor GC Free Memory {} Bytes", freed_mem);
+    }};
+
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    if (freed_mem > _s_process_minor_gc_size) {
+        return;
+    }
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+}
+
+void MemInfo::process_full_gc() {
+    int64_t freed_mem = 0;
+    Defer defer {
+            [&]() { LOG(INFO) << fmt::format("Process Full GC Free Memory {} Bytes", freed_mem); }};
+
+    freed_mem +=
+            StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
+    StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    if (freed_mem > _s_process_full_gc_size) {
+        return;
+    }
+    freed_mem += ChunkAllocator::instance()->mem_consumption();
+    ChunkAllocator::instance()->clear();
+    if (freed_mem > _s_process_full_gc_size) {
+        return;
+    }
+    freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
+}
+
 #ifndef __APPLE__
 void MemInfo::refresh_proc_meminfo() {
     std::ifstream meminfo("/proc/meminfo", std::ios::in);
@@ -143,6 +183,11 @@ void MemInfo::init() {
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
     _s_soft_mem_limit = _s_mem_limit * config::soft_mem_limit_frac;
 
+    _s_process_minor_gc_size =
+            ParseUtil::parse_mem_spec(config::process_minor_gc_size, -1, _s_mem_limit, &is_percent);
+    _s_process_full_gc_size =
+            ParseUtil::parse_mem_spec(config::process_full_gc_size, -1, _s_mem_limit, &is_percent);
+
     std::string line;
     int64_t _s_vm_min_free_kbytes = 0;
     std::ifstream vminfo("/proc/sys/vm/min_free_kbytes", std::ios::in);
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 52281f508e..bd76c6124c 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -53,7 +53,9 @@ public:
 
     static void refresh_proc_meminfo();
 
-    static inline int64_t sys_mem_available() { return _s_sys_mem_available; }
+    static inline int64_t sys_mem_available() {
+        return _s_sys_mem_available - refresh_interval_memory_growth;
+    }
     static inline std::string sys_mem_available_str() { return _s_sys_mem_available_str; }
     static inline int64_t sys_mem_available_low_water_mark() {
         return _s_sys_mem_available_low_water_mark;
@@ -83,7 +85,9 @@ public:
     static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
     static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
     static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
-    static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; }
+    static inline int64_t proc_mem_no_allocator_cache() {
+        return _s_proc_mem_no_allocator_cache + refresh_interval_memory_growth;
+    }
 
     // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
     // obtained by the process malloc, not the physical memory actually used by the process in the OS.
@@ -92,6 +96,7 @@ public:
     static inline void refresh_proc_mem_no_allocator_cache() {
         _s_proc_mem_no_allocator_cache =
                 PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem);
+        refresh_interval_memory_growth = 0;
     }
 
     static inline int64_t mem_limit() {
@@ -109,6 +114,13 @@ public:
 
     static std::string debug_string();
 
+    static void process_minor_gc();
+    static void process_full_gc();
+
+    // Only used after the memory limit is exceeded. When multiple threads are waiting for process memory
+    // to become available, this avoids them all resuming at the same time and causing OOM.
+    static std::atomic<int64_t> refresh_interval_memory_growth;
+
 private:
     static bool _s_initialized;
     static int64_t _s_physical_mem;
@@ -125,6 +137,8 @@ private:
     static std::string _s_sys_mem_available_str;
     static int64_t _s_sys_mem_available_low_water_mark;
     static int64_t _s_sys_mem_available_warning_water_mark;
+    static int64_t _s_process_minor_gc_size;
+    static int64_t _s_process_full_gc_size;
 };
 
 } // namespace doris


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org