You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/12/26 05:24:09 UTC
[doris] 03/08: [enhancement](memory) Support query memroy overcommit #14948
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.2-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
commit a117c4d05097f90df2022cd5ae9d33e58aed3a97
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Fri Dec 9 14:09:05 2022 +0800
[enhancement](memory) Support query memroy overcommit #14948
Add conf enable_query_memroy_overcommit
If true, when the process does not exceed the soft mem limit, the query memory will not be limited; when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently used memory and the exec_mem_limit will be canceled.
If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
---
be/src/common/config.h | 6 ++
be/src/runtime/memory/mem_tracker_limiter.cpp | 80 ++++++++++++++++++++++-----
be/src/runtime/memory/mem_tracker_limiter.h | 21 +++++--
be/src/util/mem_info.cpp | 6 +-
4 files changed, 93 insertions(+), 20 deletions(-)
diff --git a/be/src/common/config.h b/be/src/common/config.h
index 86b278b76b..3bc843e422 100644
--- a/be/src/common/config.h
+++ b/be/src/common/config.h
@@ -72,6 +72,12 @@ CONF_Int64(max_sys_mem_available_low_water_mark_bytes, "1717986918");
CONF_mString(process_minor_gc_size, "10%");
CONF_mString(process_full_gc_size, "20%");
+// If true, when the process does not exceed the soft mem limit, the query memory will not be limited;
+// when the process memory exceeds the soft mem limit, the query with the largest ratio between the currently
+// used memory and the exec_mem_limit will be canceled.
+// If false, cancel query when the memory used exceeds exec_mem_limit, same as before.
+CONF_mBool(enable_query_memroy_overcommit, "true");
+
// The maximum time a thread waits for a full GC. Currently only query will wait for full gc.
CONF_mInt32(thread_wait_gc_max_milliseconds, "1000");
diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index a228dcf5a7..a37a0be0f9 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -28,7 +28,6 @@
#include "runtime/thread_context.h"
#include "util/pretty_printer.h"
#include "util/stack_util.h"
-#include "util/string_util.h"
namespace doris {
@@ -240,7 +239,7 @@ Status MemTrackerLimiter::fragment_mem_limit_exceeded(RuntimeState* state, const
return Status::MemoryLimitExceeded(failed_msg);
}
-int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
+int64_t MemTrackerLimiter::free_top_memory_query(int64_t min_free_mem) {
std::priority_queue<std::pair<int64_t, std::string>,
std::vector<std::pair<int64_t, std::string>>,
std::greater<std::pair<int64_t, std::string>>>
@@ -248,16 +247,8 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
// After greater than min_free_mem, will not be modified.
int64_t prepare_free_mem = 0;
- auto label_to_queryid = [&](const std::string& label) -> TUniqueId {
- auto queryid = split(label, "#Id=")[1];
- TUniqueId querytid;
- parse_id(queryid, &querytid);
- return querytid;
- };
-
- auto cancel_top_query = [&](auto min_pq, auto label_to_queryid) -> int64_t {
+ auto cancel_top_query = [&](auto min_pq) -> int64_t {
std::vector<std::string> usage_strings;
- bool had_cancel = false;
int64_t freed_mem = 0;
while (!min_pq.empty()) {
TUniqueId cancelled_queryid = label_to_queryid(min_pq.top().second);
@@ -276,10 +267,9 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
freed_mem += min_pq.top().first;
usage_strings.push_back(fmt::format("{} memory usage {} Bytes", min_pq.top().second,
min_pq.top().first));
- had_cancel = true;
min_pq.pop();
}
- if (had_cancel) {
+ if (!usage_strings.empty()) {
LOG(INFO) << "Process GC Free Top Memory Usage Query: " << join(usage_strings, ",");
}
return freed_mem;
@@ -297,7 +287,7 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
std::swap(min_pq, min_pq_null);
min_pq.push(
pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
- return cancel_top_query(min_pq, label_to_queryid);
+ return cancel_top_query(min_pq);
} else if (tracker->consumption() + prepare_free_mem < min_free_mem) {
min_pq.push(
pair<int64_t, std::string>(tracker->consumption(), tracker->label()));
@@ -311,7 +301,67 @@ int64_t MemTrackerLimiter::free_top_query(int64_t min_free_mem) {
}
}
}
- return cancel_top_query(min_pq, label_to_queryid);
+ return cancel_top_query(min_pq);
+}
+
+int64_t MemTrackerLimiter::free_top_overcommit_query(int64_t min_free_mem) {
+ std::priority_queue<std::pair<int64_t, std::string>,
+ std::vector<std::pair<int64_t, std::string>>,
+ std::greater<std::pair<int64_t, std::string>>>
+ min_pq;
+ std::unordered_map<std::string, int64_t> query_consumption;
+
+ for (unsigned i = 1; i < mem_tracker_limiter_pool.size(); ++i) {
+ std::lock_guard<std::mutex> l(mem_tracker_limiter_pool[i].group_lock);
+ for (auto tracker : mem_tracker_limiter_pool[i].trackers) {
+ if (tracker->type() == Type::QUERY) {
+ int64_t overcommit_ratio =
+ (static_cast<double>(tracker->consumption()) / tracker->limit()) * 10000;
+ if (overcommit_ratio == 0) { // Small query does not cancel
+ continue;
+ }
+ min_pq.push(pair<int64_t, std::string>(overcommit_ratio, tracker->label()));
+ query_consumption[tracker->label()] = tracker->consumption();
+ }
+ }
+ }
+
+ std::priority_queue<std::pair<int64_t, std::string>> max_pq;
+ // Min-heap to Max-heap.
+ while (!min_pq.empty()) {
+ max_pq.push(min_pq.top());
+ min_pq.pop();
+ }
+
+ std::vector<std::string> usage_strings;
+ int64_t freed_mem = 0;
+ while (!max_pq.empty()) {
+ TUniqueId cancelled_queryid = label_to_queryid(max_pq.top().second);
+ int64_t query_mem = query_consumption[max_pq.top().second];
+ ExecEnv::GetInstance()->fragment_mgr()->cancel_query(
+ cancelled_queryid, PPlanFragmentCancelReason::MEMORY_LIMIT_EXCEED,
+ fmt::format("Process has no memory available, cancel top memory usage query: "
+ "query memory tracker <{}> consumption {}, backend {} "
+ "process memory used {} exceed limit {} or sys mem available {} "
+ "less than low water mark {}. Execute again after enough memory, "
+ "details see be.INFO.",
+ max_pq.top().second, print_bytes(query_mem),
+ BackendOptions::get_localhost(), PerfCounters::get_vm_rss_str(),
+ MemInfo::mem_limit_str(), MemInfo::sys_mem_available_str(),
+ print_bytes(MemInfo::sys_mem_available_low_water_mark())));
+
+ usage_strings.push_back(fmt::format("{} memory usage {} Bytes, overcommit ratio: {}",
+ max_pq.top().second, query_mem, max_pq.top().first));
+ freed_mem += query_mem;
+ if (freed_mem > min_free_mem) {
+ break;
+ }
+ max_pq.pop();
+ }
+ if (!usage_strings.empty()) {
+ LOG(INFO) << "Process GC Free Top Memory Overcommit Query: " << join(usage_strings, ",");
+ }
+ return freed_mem;
}
} // namespace doris
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index 6415510315..617a7ffdee 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -25,6 +25,7 @@
#include "service/backend_options.h"
#include "util/mem_info.h"
#include "util/perf_counters.h"
+#include "util/string_util.h"
namespace doris {
@@ -144,8 +145,18 @@ public:
Status fragment_mem_limit_exceeded(RuntimeState* state, const std::string& msg,
int64_t failed_allocation_size = 0);
- // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is released.
- static int64_t free_top_query(int64_t min_free_mem);
+ // Start canceling from the query with the largest memory usage until the memory of min_free_mem size is freed.
+ static int64_t free_top_memory_query(int64_t min_free_mem);
+ // Start canceling from the query with the largest memory overcommit ratio until the memory
+ // of min_free_mem size is freed.
+ static int64_t free_top_overcommit_query(int64_t min_free_mem);
+ // only for Type::QUERY or Type::LOAD.
+ static TUniqueId label_to_queryid(const std::string& label) {
+ auto queryid = split(label, "#Id=")[1];
+ TUniqueId querytid;
+ parse_id(queryid, &querytid);
+ return querytid;
+ };
static std::string process_mem_log_str() {
return fmt::format(
@@ -254,7 +265,7 @@ inline bool MemTrackerLimiter::try_consume(int64_t bytes, std::string& failed_ms
return false;
}
- if (_limit < 0) {
+ if (_limit < 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
_consumption->add(bytes); // No limit at this tracker.
} else {
if (!_consumption->try_add(bytes, _limit)) {
@@ -271,7 +282,9 @@ inline Status MemTrackerLimiter::check_limit(int64_t bytes) {
if (sys_mem_exceed_limit_check(bytes)) {
return Status::MemoryLimitExceeded(process_limit_exceeded_errmsg_str(bytes));
}
- if (bytes <= 0) return Status::OK();
+ if (bytes <= 0 || (_type == Type::QUERY && config::enable_query_memroy_overcommit)) {
+ return Status::OK();
+ }
if (_limit > 0 && _consumption->current_value() + bytes > _limit) {
return Status::MemoryLimitExceeded(tracker_limit_exceeded_errmsg_str(bytes, this));
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 453c1cd2bd..55500feea7 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -104,6 +104,10 @@ void MemInfo::process_minor_gc() {
freed_mem +=
StoragePageCache::instance()->get_page_cache_mem_consumption(segment_v2::DATA_PAGE);
StoragePageCache::instance()->prune(segment_v2::DATA_PAGE);
+ if (config::enable_query_memroy_overcommit) {
+ freed_mem +=
+ MemTrackerLimiter::free_top_overcommit_query(_s_process_full_gc_size - freed_mem);
+ }
}
void MemInfo::process_full_gc() {
@@ -122,7 +126,7 @@ void MemInfo::process_full_gc() {
if (freed_mem > _s_process_full_gc_size) {
return;
}
- freed_mem += MemTrackerLimiter::free_top_query(_s_process_full_gc_size - freed_mem);
+ freed_mem += MemTrackerLimiter::free_top_memory_query(_s_process_full_gc_size - freed_mem);
}
#ifndef __APPLE__
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org