You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/26 05:24:09 UTC

[doris] 03/08: [enhancement](memory) Support query memory overcommit #14948

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git

commit a117c4d05097f90df2022cd5ae9d33e58aed3a97
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Dec 9 14:09:05 2022 +0800

    [enhancement](memory) Support query memory overcommit #14948
    
    Add conf enable_query_memroy_overcommit
    
    If true, when the process does not exceed the soft mem limit, the query memory will not be limited; when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently used memory and the exec_mem_limit will be canceled.
    
    If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
---
 be/src/common/config.h                        |  6 ++
 be/src/runtime/memory/mem_tracker_limiter.cpp | 80 ++++++++++++++++++++++-----
 be/src/runtime/memory/mem_tracker_limiter.h   | 21 +++++--
 be/src/util/mem_info.cpp                      |  6 +-
 4 files changed, 93 insertions(+), 20 deletions(-)

diff --git a/be/src/common/config.h b/be/src/common/config.h
index 86b278b76b..3bc843e422 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -72,6 +72,12 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
 CONF_mString(process_minor_gc_size, "10%");
 CONF_mString(process_full_gc_size, "20%");
 
+// If true, when the process does not exceed the soft mem limit, the query memory will not be limited;
+// when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently
+// used memory and the exec_mem_limit will be canceled.
+// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
+CONF_mBool(enable_query_memroy_overcommit, "true");
+
 // The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
 CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
 
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index a228dcf5a7..a37a0be0f9 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -28,7 +28,6 @@
 #include "runtime/thread_context.h"
 #include "util/pretty_printer.h"
 #include "util/stack_util.h"
-#include "util/string_util.h"
 
 namespace doris {
 
@@ -240,7 +239,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
     return Status::MemoryLimitExceeded(failed_msg);
 }
 
-int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
+int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) {
     std::priority_queue<std::pair<int64_t, std::string>,
                         std::vector<std::pair<int64_t, std::string>>,
                         std::greater<std::pair<int64_t, std::string>>>
@@ -248,16 +247,8 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
     // After greater than min_free_mem, will not be modified.
     int64_t prepare_free_mem = 0;
 
-    auto label_to_queryid = [&](const std::string& label) -> TUniqueId {
-        auto queryid = split(label, "#Id=")[1];
-        TUniqueId querytid;
-        parse_id(queryid, &querytid);
-        return querytid;
-    };
-
-    auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t {
+    auto cancel_top_query = [&](auto min_pq) -> int64_t {
         std::vector<std::string> usage_strings;
-        bool had_cancel = false;
         int64_t freed_mem = 0;
         while (!min_pq.empty()) {
             TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
@@ -276,10 +267,9 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
             freed_mem += min_pq.top().first;
             usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
                                                 min_pq.top().first));
-            had_cancel = true;
             min_pq.pop();
         }
-        if (had_cancel) {
+        if (!usage_strings.empty()) {
             LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
         }
         return freed_mem;
@@ -297,7 +287,7 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
                     std::swap(min_pq, min_pq_null);
                     min_pq.push(
                             pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
-                    return cancel_top_query(min_pq, label_to_queryid);
+                    return cancel_top_query(min_pq);
                 } else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
                     min_pq.push(
                             pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
@@ -311,7 +301,67 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
             }
         }
     }
-    return cancel_top_query(min_pq, label_to_queryid);
+    return cancel_top_query(min_pq);
+}
+
+// Cancels queries in descending order of overcommit ratio (consumption / limit, scaled by
+// 10000) until more than min_free_mem bytes are expected to be freed. Returns that estimate.
+int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
+    if (min_free_mem <= 0) { // Enough memory was already freed, do not cancel any query.
+        return 0;
+    }
+    // Max-heap of <overcommit ratio, tracker label>, the top is the most overcommitted query.
+    std::priority_queue<std::pair<int64_t, std::string>> max_pq;
+    std::unordered_map<std::string, int64_t> query_consumption;
+
+    for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
+        std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
+        for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+            // A tracker without a positive limit has no meaningful overcommit ratio.
+            if (tracker->type() != Type::QUERY || tracker->limit() <= 0) {
+                continue;
+            }
+            // Read consumption once so that the ratio and the recorded bytes are consistent.
+            const int64_t consumption = tracker->consumption();
+            const int64_t overcommit_ratio =
+                    (static_cast<double>(consumption) / tracker->limit()) * 10000;
+            if (overcommit_ratio == 0) { // Small query does not cancel
+                continue;
+            }
+            max_pq.emplace(overcommit_ratio, tracker->label());
+            query_consumption[tracker->label()] = consumption;
+        }
+    }
+
+    std::vector<std::string> usage_strings;
+    int64_t freed_mem = 0;
+    while (!max_pq.empty()) {
+        const std::string& label = max_pq.top().second;
+        const int64_t query_mem = query_consumption[label];
+        ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+                label_to_queryid(label), PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+                fmt::format("Process has no memory available, cancel top memory usage query: "
+                            "query memory tracker <{}> consumption {}, backend {} "
+                            "process memory used {} exceed limit {} or sys mem available {} "
+                            "less than low water mark {}. Execute again after enough memory, "
+                            "details see be.INFO.",
+                            label, print_bytes(query_mem),
+                            BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
+                            MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
+                            print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+
+        usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
+                                            label, query_mem, max_pq.top().first));
+        freed_mem += query_mem;
+        if (freed_mem > min_free_mem) {
+            break;
+        }
+        max_pq.pop();
+    }
+    if (!usage_strings.empty()) {
+        LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ",");
+    }
+    return freed_mem;
 }
 
 } // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 6415510315..617a7ffdee 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -25,6 +25,7 @@
 #include "service/backend_options.h"
 #include "util/mem_info.h"
 #include "util/perf_counters.h"
+#include "util/string_util.h"
 
 namespace doris {
 
@@ -144,8 +145,18 @@ public:
     Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
                                        int64_t failed_allocation_size = 0);
 
-    // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released.
-    static int64_t free_top_query(int64_t min_free_mem);
+    // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
+    static int64_t free_top_memory_query(int64_t min_free_mem);
+    // Start canceling from the query with the largest memory overcommit ratio until the memory
+    // of min_free_mem size is freed.
+    static int64_t free_top_overcommit_query(int64_t min_free_mem);
+    // Extracts the query id that follows "#Id=" in a label; only for Type::QUERY or Type::LOAD.
+    static TUniqueId label_to_queryid(const std::string& label) {
+        auto queryid = split(label, "#Id=")[1]; // NOTE(review): presumes label contains "#Id="
+        TUniqueId querytid;
+        parse_id(queryid, &querytid);
+        return querytid;
+    };
 
     static std::string process_mem_log_str() {
         return fmt::format(
@@ -254,7 +265,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
         return false;
     }
 
-    if (_limit < 0) {
+    if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
         _consumption->add(bytes); // No limit at this tracker.
     } else {
         if (!_consumption->try_add(bytes, _limit)) {
@@ -271,7 +282,9 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
     if (sys_mem_exceed_limit_check(bytes)) {
         return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
     }
-    if (bytes <= 0) return Status::OK();
+    if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
+        return Status::OK();
+    }
     if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
         return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 453c1cd2bd..55500feea7 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -104,6 +104,10 @@ void MemInfo::process_minor_gc() {
     freed_mem +=
             StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
     StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+    if (config::enable_query_memroy_overcommit) {
+        freed_mem +=
+                MemTrackerLimiter::free_top_overcommit_query(_s_process_full_gc_size - freed_mem);
+    }
 }
 
 void MemInfo::process_full_gc() {
@@ -122,7 +126,7 @@ void MemInfo::process_full_gc() {
     if (freed_mem > _s_process_full_gc_size) {
         return;
     }
-    freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
+    freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
 }
 
 #ifndef __APPLE__


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org