You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by da...@apache.org on 2022/06/25 06:13:08 UTC

[doris] branch master updated: [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409)

This is an automated email from the ASF dual-hosted git repository.

dataroaring pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new eb25df5a2c [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409)
eb25df5a2c is described below

commit eb25df5a2cb7de6e541fd4b0ed8c43ca78cd97a4
Author: Kidd <10...@users.noreply.github.com>
AuthorDate: Sat Jun 25 14:13:02 2022 +0800

    [fix] (mem tracker) Fix inaccurate mem tracker leads to load OOM (#10409)
    
    * fix load tracker
    
    * fix comment
---
 be/src/runtime/load_channel.cpp         | 10 ++-----
 be/src/runtime/load_channel.h           |  6 ++---
 be/src/runtime/load_channel_mgr.cpp     | 46 +++++++++++++++++----------------
 be/src/runtime/load_channel_mgr.h       |  8 ++----
 be/src/runtime/tablets_channel.cpp      |  2 --
 be/src/runtime/tcmalloc_hook.h          |  5 ++++
 be/src/runtime/thread_context.h         |  4 +++
 be/src/runtime/thread_mem_tracker_mgr.h | 17 +++++++-----
 be/src/service/internal_service.cpp     |  8 +++---
 9 files changed, 53 insertions(+), 53 deletions(-)

diff --git a/be/src/runtime/load_channel.cpp b/be/src/runtime/load_channel.cpp
index 42fbe9dac5..d2223554eb 100644
--- a/be/src/runtime/load_channel.cpp
+++ b/be/src/runtime/load_channel.cpp
@@ -25,19 +25,15 @@
 
 namespace doris {
 
-LoadChannel::LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit,
+LoadChannel::LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker,
                          int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
                          bool is_vec)
         : _load_id(load_id),
+          _mem_tracker(mem_tracker),
           _timeout_s(timeout_s),
           _is_high_priority(is_high_priority),
           _sender_ip(sender_ip),
           _is_vec(is_vec) {
-    _mem_tracker = MemTracker::create_tracker(
-            channel_mem_limit, "LoadChannel#senderIp=" + sender_ip,
-            ExecEnv::GetInstance()->task_pool_mem_tracker_registry()->register_load_mem_tracker(
-                    _load_id.to_string(), load_mem_limit),
-            MemTrackerLevel::TASK);
     // _last_updated_time should be set before being inserted to
     // _load_channels in load_channel_mgr, or it may be erased
     // immediately by gc thread.
@@ -52,7 +48,6 @@ LoadChannel::~LoadChannel() {
 }
 
 Status LoadChannel::open(const PTabletWriterOpenRequest& params) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t index_id = params.index_id();
     std::shared_ptr<TabletsChannel> channel;
     {
@@ -138,7 +133,6 @@ bool LoadChannel::is_finished() {
 }
 
 Status LoadChannel::cancel() {
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     for (auto& it : _tablets_channels) {
         it.second->cancel();
diff --git a/be/src/runtime/load_channel.h b/be/src/runtime/load_channel.h
index 38cc2ac89f..20ef476dd9 100644
--- a/be/src/runtime/load_channel.h
+++ b/be/src/runtime/load_channel.h
@@ -39,9 +39,9 @@ class Cache;
 // corresponding to a certain load job
 class LoadChannel {
 public:
-    LoadChannel(const UniqueId& load_id, int64_t load_mem_limit, int64_t channel_mem_limit,
+    LoadChannel(const UniqueId& load_id, std::shared_ptr<MemTracker>& mem_tracker,
                 int64_t timeout_s, bool is_high_priority, const std::string& sender_ip,
                 bool is_vec);
     ~LoadChannel();
 
     // open a new load channel if not exist
@@ -129,7 +129,6 @@ private:
 template <typename TabletWriterAddRequest, typename TabletWriterAddResult>
 Status LoadChannel::add_batch(const TabletWriterAddRequest& request,
                               TabletWriterAddResult* response) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     int64_t index_id = request.index_id();
     // 1. get tablets channel
     std::shared_ptr<TabletsChannel> channel;
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index aea5479aa6..c105bd96b3 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -84,10 +84,9 @@ LoadChannelMgr::~LoadChannelMgr() {
 
 Status LoadChannelMgr::init(int64_t process_mem_limit) {
     int64_t load_mgr_mem_limit = calc_process_max_load_memory(process_mem_limit);
-    _mem_tracker = MemTracker::create_tracker(load_mgr_mem_limit, "LoadChannelMgr",
-                                              MemTracker::get_process_tracker(),
-                                              MemTrackerLevel::OVERVIEW);
-    SCOPED_SWITCH_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
+    _mem_tracker = MemTracker::create_virtual_tracker(load_mgr_mem_limit, "LoadChannelMgr",
+                                                      MemTracker::get_process_tracker(),
+                                                      MemTrackerLevel::OVERVIEW);
     REGISTER_HOOK_METRIC(load_channel_mem_consumption,
                          [this]() { return _mem_tracker->consumption(); });
     _last_success_channel = new_lru_cache("LastestSuccessChannelCache", 1024);
@@ -95,16 +94,7 @@ Status LoadChannelMgr::init(int64_t process_mem_limit) {
     return Status::OK();
 }
 
-LoadChannel* LoadChannelMgr::_create_load_channel(const UniqueId& load_id, int64_t load_mem_limit,
-                                                  int64_t channel_mem_limit, int64_t timeout_s,
-                                                  bool is_high_priority,
-                                                  const std::string& sender_ip, bool is_vec) {
-    return new LoadChannel(load_id, load_mem_limit, channel_mem_limit, timeout_s, is_high_priority,
-                           sender_ip, is_vec);
-}
-
 Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> channel;
     {
@@ -114,18 +104,31 @@ Status LoadChannelMgr::open(const PTabletWriterOpenRequest& params) {
             channel = it->second;
         } else {
             // create a new load channel
-            int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
-            int64_t channel_mem_limit =
-                    calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
-
             int64_t timeout_in_req_s =
                     params.has_load_channel_timeout_s() ? params.load_channel_timeout_s() : -1;
             int64_t channel_timeout_s = calc_channel_timeout_s(timeout_in_req_s);
-
             bool is_high_priority = (params.has_is_high_priority() && params.is_high_priority());
-            channel.reset(_create_load_channel(load_id, load_mem_limit, channel_mem_limit,
-                                               channel_timeout_s, is_high_priority,
-                                               params.sender_ip(), params.is_vectorized()));
+
+            int64_t load_mem_limit = params.has_load_mem_limit() ? params.load_mem_limit() : -1;
+            int64_t channel_mem_limit =
+                    calc_channel_max_load_memory(load_mem_limit, _mem_tracker->limit());
+            auto channel_mem_tracker =
+                    MemTracker::create_tracker(channel_mem_limit,
+                                               fmt::format("LoadChannel#senderIp={}#loadID={}",
+                                                           params.sender_ip(), load_id.to_string()),
+                                               _mem_tracker);
+            // TODO: parent the channel tracker to the per-load task tracker instead, e.g.:
+            // auto channel_mem_tracker_job = std::make_shared<MemTracker>(
+            //         -1,
+            //         fmt::format("LoadChannel#senderIp={}#loadID={}", params.sender_ip(),
+            //                     load_id.to_string()),
+            //         ExecEnv::GetInstance()
+            //                 ->task_pool_mem_tracker_registry()
+            //                 ->register_load_mem_tracker(load_id.to_string(), load_mem_limit),
+            //         MemTrackerLevel::TASK);
+            channel.reset(new LoadChannel(load_id, channel_mem_tracker, channel_timeout_s,
+                                          is_high_priority, params.sender_ip(),
+                                          params.is_vectorized()));
             _load_channels.insert({load_id, channel});
         }
     }
@@ -181,7 +184,6 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
 }
 
 Status LoadChannelMgr::cancel(const PTabletWriterCancelRequest& params) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     UniqueId load_id(params.id());
     std::shared_ptr<LoadChannel> cancelled_channel;
     {
diff --git a/be/src/runtime/load_channel_mgr.h b/be/src/runtime/load_channel_mgr.h
index 65d72534f4..39d7ed5b2b 100644
--- a/be/src/runtime/load_channel_mgr.h
+++ b/be/src/runtime/load_channel_mgr.h
@@ -61,11 +61,6 @@ public:
     std::shared_ptr<MemTracker> mem_tracker() { return _mem_tracker; }
 
 private:
-    static LoadChannel* _create_load_channel(const UniqueId& load_id, int64_t load_mem_limit,
-                                             int64_t channel_mem_limit, int64_t timeout_s,
-                                             bool is_high_priority, const std::string& sender_ip,
-                                             bool is_vec);
-
     template <typename Request>
     Status _get_load_channel(std::shared_ptr<LoadChannel>& channel, bool& is_eof,
                              const UniqueId& load_id, const Request& request);
@@ -84,7 +79,8 @@ protected:
     std::unordered_map<UniqueId, std::shared_ptr<LoadChannel>> _load_channels;
     Cache* _last_success_channel = nullptr;
 
-    // check the total load mem consumption of this Backend
+    // Virtual tracker for the total load channel mem consumption of this Backend; it is the
+    // parent of each LoadChannel's tracker created in open(). TODO: refactor soon.
     std::shared_ptr<MemTracker> _mem_tracker;
 
     CountDownLatch _stop_background_threads_latch;
diff --git a/be/src/runtime/tablets_channel.cpp b/be/src/runtime/tablets_channel.cpp
index 8010956cc4..566d90fb7f 100644
--- a/be/src/runtime/tablets_channel.cpp
+++ b/be/src/runtime/tablets_channel.cpp
@@ -53,7 +53,6 @@ TabletsChannel::~TabletsChannel() {
 }
 
 Status TabletsChannel::open(const PTabletWriterOpenRequest& request) {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kOpened) {
         // Normal case, already open by other sender
@@ -253,7 +252,6 @@ Status TabletsChannel::_open_all_writers(const PTabletWriterOpenRequest& request
 }
 
 Status TabletsChannel::cancel() {
-    SCOPED_SWITCH_TASK_THREAD_LOCAL_MEM_TRACKER(_mem_tracker);
     std::lock_guard<std::mutex> l(_lock);
     if (_state == kFinished) {
         return _close_status;
diff --git a/be/src/runtime/tcmalloc_hook.h b/be/src/runtime/tcmalloc_hook.h
index 59208b334a..af60cdc178 100644
--- a/be/src/runtime/tcmalloc_hook.h
+++ b/be/src/runtime/tcmalloc_hook.h
@@ -21,6 +21,7 @@
 #include <gperftools/nallocx.h>
 #include <gperftools/tcmalloc.h>
 
+#include "runtime/mem_tracker.h"
 #include "runtime/thread_context.h"
 
 // Notice: modify the command in New/Delete Hook should be careful enough!,
@@ -38,12 +39,16 @@
 void new_hook(const void* ptr, size_t size) {
     if (doris::tls_ctx()) {
         doris::tls_ctx()->consume_mem(tc_nallocx(size, 0));
+    } else if (doris::ExecEnv::GetInstance()->initialized()) {
+        doris::MemTracker::get_process_tracker()->consume(tc_nallocx(size, 0));
     }
 }
 
 void delete_hook(const void* ptr) {
     if (doris::tls_ctx()) {
         doris::tls_ctx()->release_mem(tc_malloc_size(const_cast<void*>(ptr)));
+    } else if (doris::ExecEnv::GetInstance()->initialized()) {
+        doris::MemTracker::get_process_tracker()->release(tc_malloc_size(const_cast<void*>(ptr)));
     }
 }
 
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 7b58315b09..33478b19c5 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -157,12 +157,16 @@ public:
     void consume_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(size);
+        } else {
+            MemTracker::get_process_tracker()->consume(size);
         }
     }
 
     void release_mem(int64_t size) {
         if (start_thread_mem_tracker) {
             _thread_mem_tracker_mgr->cache_consume(-size);
+        } else {
+            MemTracker::get_process_tracker()->release(size);
         }
     }
 
diff --git a/be/src/runtime/thread_mem_tracker_mgr.h b/be/src/runtime/thread_mem_tracker_mgr.h
index d582c3a46a..d8042ac0fa 100644
--- a/be/src/runtime/thread_mem_tracker_mgr.h
+++ b/be/src/runtime/thread_mem_tracker_mgr.h
@@ -177,6 +177,7 @@ private:
     phmap::flat_hash_map<int64_t, std::string> _mem_tracker_labels;
     // If true, call memtracker try_consume, otherwise call consume.
     bool _check_limit;
+    bool _stop_consume = false;
 
     int64_t _tracker_id;
     // Avoid memory allocation in functions.
@@ -256,25 +257,27 @@ inline void ThreadMemTrackerMgr::cache_consume(int64_t size) {
     // When some threads `0 < _untracked_mem < config::mem_tracker_consume_min_size_bytes`
     // and some threads `_untracked_mem <= -config::mem_tracker_consume_min_size_bytes` trigger consumption(),
     // it will cause tracker->consumption to be temporarily less than 0.
-    if (_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
-        _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) {
+    //
+    // Consuming into the mem tracker may itself allocate temporary memory (e.g. while handling
+    // an exceeded limit), which re-enters the TCMalloc hook and this function. Suspend
+    // consumption while it is in progress to avoid infinite recursion.
+    if ((_untracked_mem >= config::mem_tracker_consume_min_size_bytes ||
+         _untracked_mem <= -config::mem_tracker_consume_min_size_bytes) &&
+        !_stop_consume) {
+        _stop_consume = true;
         DCHECK(_untracked_mems.find(_tracker_id) != _untracked_mems.end()) << print_debug_string();
         // When switching to the current tracker last time, the remaining untracked memory.
         if (_untracked_mems[_tracker_id] != 0) {
             _untracked_mem += _untracked_mems[_tracker_id];
             _untracked_mems[_tracker_id] = 0;
         }
-        // Allocating memory in the Hook command causes the TCMalloc Hook to be entered again,
-        // will enter infinite recursion. So the temporary memory allocated in mem_tracker.try_consume
-        // and mem_limit_exceeded will directly call consume.
         if (_check_limit) {
-            _check_limit = false;
             noncache_try_consume(_untracked_mem);
-            _check_limit = true;
         } else {
             mem_tracker()->consume(_untracked_mem);
         }
         _untracked_mem = 0;
+        _stop_consume = false;
     }
 }
 
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index e85bdfebfc..47e156a0bf 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -206,6 +206,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
                                                    const PTabletWriterAddBlockRequest* request,
                                                    PTabletWriterAddBlockResult* response,
                                                    google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     // TODO(zxy) delete in 1.2 version
     brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
     attachment_transfer_request_block<PTabletWriterAddBlockRequest>(request, cntl);
@@ -216,6 +217,7 @@ void PInternalServiceImpl::tablet_writer_add_block(google::protobuf::RpcControll
 void PInternalServiceImpl::tablet_writer_add_block_by_http(
         google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBlockResult* response, google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     PTabletWriterAddBlockRequest* request_raw = new PTabletWriterAddBlockRequest();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTabletWriterAddBlockRequest>(request_raw, done);
@@ -243,8 +245,6 @@ void PInternalServiceImpl::_tablet_writer_add_block(google::protobuf::RpcControl
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-            SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
-                                      _exec_env->load_channel_mgr()->mem_tracker());
 
             auto st = _exec_env->load_channel_mgr()->add_batch(*request, response);
             if (!st.ok()) {
@@ -264,12 +264,14 @@ void PInternalServiceImpl::tablet_writer_add_batch(google::protobuf::RpcControll
                                                    const PTabletWriterAddBatchRequest* request,
                                                    PTabletWriterAddBatchResult* response,
                                                    google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     _tablet_writer_add_batch(cntl_base, request, response, done);
 }
 
 void PInternalServiceImpl::tablet_writer_add_batch_by_http(
         google::protobuf::RpcController* cntl_base, const ::doris::PEmptyRequest* request,
         PTabletWriterAddBatchResult* response, google::protobuf::Closure* done) {
+    SCOPED_SWITCH_BTHREAD();
     PTabletWriterAddBatchRequest* request_raw = new PTabletWriterAddBatchRequest();
     google::protobuf::Closure* done_raw =
             new NewHttpClosure<PTabletWriterAddBatchRequest>(request_raw, done);
@@ -300,8 +302,6 @@ void PInternalServiceImpl::_tablet_writer_add_batch(google::protobuf::RpcControl
         int64_t execution_time_ns = 0;
         {
             SCOPED_RAW_TIMER(&execution_time_ns);
-            SCOPED_ATTACH_TASK_THREAD(ThreadContext::TaskType::LOAD,
-                                      _exec_env->load_channel_mgr()->mem_tracker());
             // TODO(zxy) delete in 1.2 version
             brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
             attachment_transfer_request_row_batch<PTabletWriterAddBatchRequest>(request, cntl);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org