You are viewing a plain-text version of this content; the link to the canonical version was not preserved in this plain-text rendering.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/21 09:35:27 UTC

[doris] branch master updated: [fix](memory) Fix memory exceed limit and query has been canceled, Allocator will block 100ms (#20959)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 84b97860a1 [fix](memory) Fix memory exceed  limit and query has been canceled, Allocator will block 100ms (#20959)
84b97860a1 is described below

commit 84b97860a103f684cf892c7151fa3879ef9a7fbb
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Jun 21 17:35:19 2023 +0800

    [fix](memory) Fix memory exceed  limit and query has been canceled, Allocator will block 100ms (#20959)
---
 be/src/runtime/memory/mem_tracker_limiter.cpp |  4 ++-
 be/src/vec/common/allocator.cpp               | 39 ++++++++++++++++++---------
 2 files changed, 30 insertions(+), 13 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker_limiter.cpp b/be/src/runtime/memory/mem_tracker_limiter.cpp
index 24a79bc1b6..ded5b0c0b8 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.cpp
+++ b/be/src/runtime/memory/mem_tracker_limiter.cpp
@@ -437,7 +437,9 @@ int64_t MemTrackerLimiter::free_top_overcommit_query(
         std::lock_guard<std::mutex> l(tracker_groups[i].group_lock);
         for (auto tracker : tracker_groups[i].trackers) {
             if (tracker->type() == type) {
-                if (tracker->consumption() <= 33554432) { // 32M small query does not cancel
+                // 32M small query does not cancel
+                if (tracker->consumption() <= 33554432 ||
+                    tracker->consumption() < tracker->limit()) {
                     continue;
                 }
                 if (tracker->is_query_cancelled()) {
diff --git a/be/src/vec/common/allocator.cpp b/be/src/vec/common/allocator.cpp
index b74ed398d4..366fc48ce2 100644
--- a/be/src/vec/common/allocator.cpp
+++ b/be/src/vec/common/allocator.cpp
@@ -31,6 +31,7 @@
 #include "runtime/memory/mem_tracker_limiter.h"
 #include "runtime/memory/thread_mem_tracker_mgr.h"
 #include "runtime/thread_context.h"
+#include "util/defer_op.h"
 #include "util/mem_info.h"
 #include "util/uid_util.h"
 
@@ -47,12 +48,22 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
                 size, doris::thread_context()->thread_mem_tracker()->label(),
                 doris::thread_context()->thread_mem_tracker_mgr->last_consumer_tracker(),
                 doris::MemTrackerLimiter::process_limit_exceeded_errmsg_str());
+
+        // TODO, Save the query context in the thread context, instead of finding whether the query id is canceled in fragment_mgr.
+        if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
+                    doris::thread_context()->task_id())) {
+            if (doris::enable_thread_catch_bad_alloc) {
+                throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
+            }
+            return;
+        }
         if (doris::thread_context()->thread_mem_tracker_mgr->is_attach_query() &&
             doris::thread_context()->thread_mem_tracker_mgr->wait_gc()) {
-            int64_t wait_milliseconds = doris::config::thread_wait_gc_max_milliseconds;
-            LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum 5s, {}.",
-                                     print_id(doris::thread_context()->task_id()), err_msg);
-            while (wait_milliseconds > 0) {
+            int64_t wait_milliseconds = 0;
+            LOG(INFO) << fmt::format("Query:{} waiting for enough memory, maximum {}ms, {}.",
+                                     print_id(doris::thread_context()->task_id()),
+                                     doris::config::thread_wait_gc_max_milliseconds, err_msg);
+            while (wait_milliseconds < doris::config::thread_wait_gc_max_milliseconds) {
                 std::this_thread::sleep_for(std::chrono::milliseconds(100));
                 if (!doris::MemTrackerLimiter::sys_mem_exceed_limit_check(size)) {
                     doris::MemInfo::refresh_interval_memory_growth += size;
@@ -60,25 +71,29 @@ void Allocator<clear_memory_, mmap_populate, use_mmap>::sys_memory_check(size_t
                 }
                 if (doris::ExecEnv::GetInstance()->fragment_mgr()->query_is_canceled(
                             doris::thread_context()->task_id())) {
-                    wait_milliseconds = 0;
-                    break;
+                    if (doris::enable_thread_catch_bad_alloc) {
+                        throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
+                    }
+                    return;
                 }
-                wait_milliseconds -= 100;
+                wait_milliseconds += 100;
             }
-            if (wait_milliseconds <= 0) {
+            if (wait_milliseconds >= doris::config::thread_wait_gc_max_milliseconds) {
                 // Make sure to completely wait thread_wait_gc_max_milliseconds only once.
                 doris::thread_context()->thread_mem_tracker_mgr->disable_wait_gc();
                 doris::MemTrackerLimiter::print_log_process_usage(err_msg);
                 // If the external catch, throw bad::alloc first, let the query actively cancel. Otherwise asynchronous cancel.
                 if (!doris::enable_thread_catch_bad_alloc) {
                     LOG(INFO) << fmt::format(
-                            "Query:{} canceled asyn, after waiting for memory 5s, {}.",
-                            print_id(doris::thread_context()->task_id()), err_msg);
+                            "Query:{} canceled asyn, after waiting for memory {}ms, {}.",
+                            print_id(doris::thread_context()->task_id()), wait_milliseconds,
+                            err_msg);
                     doris::thread_context()->thread_mem_tracker_mgr->cancel_fragment(err_msg);
                 } else {
                     LOG(INFO) << fmt::format(
-                            "Query:{} throw exception, after waiting for memory 5s, {}.",
-                            print_id(doris::thread_context()->task_id()), err_msg);
+                            "Query:{} throw exception, after waiting for memory {}ms, {}.",
+                            print_id(doris::thread_context()->task_id()), wait_milliseconds,
+                            err_msg);
                     throw doris::Exception(doris::ErrorCode::MEM_ALLOC_FAILED, err_msg);
                 }
             }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org