You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/27 01:03:17 UTC

[doris] branch branch-1.1-lts updated: [branch-1.1-lts](cherry-pick) Fix brpc causing query mem tracker to be inaccurate and deadlock (#13697)

This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
     new 7b74101b5f [branch-1.1-lts](cherry-pick) Fix brpc causing query mem tracker to be inaccurate and deadlock (#13697)
7b74101b5f is described below

commit 7b74101b5f9d79cae5d9ebf4139141110c43e4f3
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Oct 27 09:03:11 2022 +0800

    [branch-1.1-lts](cherry-pick) Fix brpc causing query mem tracker to be inaccurate and deadlock (#13697)
    
    cherry-pick #13401, #13528, and fix bthread mem tracker deadlock
---
 be/src/runtime/memory/mem_tracker.cpp           |  8 +++-----
 be/src/runtime/memory/mem_tracker.h             |  6 ++++++
 be/src/runtime/memory/mem_tracker_limiter.h     |  8 +-------
 be/src/runtime/memory/mem_tracker_task_pool.cpp |  7 ++++---
 be/src/runtime/memory/mem_tracker_task_pool.h   |  2 ++
 be/src/runtime/thread_context.h                 | 11 -----------
 be/src/service/doris_main.cpp                   |  3 +--
 be/src/service/internal_service.cpp             | 22 ++++++++++------------
 be/src/util/mem_info.cpp                        |  3 ++-
 be/src/util/mem_info.h                          |  5 +++++
 be/src/vec/runtime/vdata_stream_recvr.cpp       |  5 ++++-
 11 files changed, 38 insertions(+), 42 deletions(-)

diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 31d40563d4..02145922b4 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -23,7 +23,6 @@
 #include <fmt/format.h>
 
 #include "runtime/thread_context.h"
-#include "util/pretty_printer.h"
 #include "util/string_util.h"
 #include "util/time.h"
 
@@ -104,10 +103,9 @@ void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* sn
 }
 
 std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) {
-    return fmt::format(
-            "MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
-            snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
-            snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
+    return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
+            snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption),
+            snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
             snapshot.peak_consumption);
 }
 
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 258f244aae..aec490c808 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -19,6 +19,7 @@
 // and modified by Doris
 #pragma once
 
+#include "util/pretty_printer.h"
 #include "util/runtime_profile.h"
 
 namespace doris {
@@ -50,6 +51,11 @@ public:
 
     ~NewMemTracker();
 
+    static std::string print_bytes(int64_t bytes) {
+        return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
+                          : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
+    }
+
 public:
     const std::string& label() const { return _label; }
     // Returns the memory consumed in bytes.
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index e6126c15a6..73aaa8e500 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -76,9 +76,7 @@ public:
         // but it may not actually alloc physical memory, which is not expected in mem hook fail.
         //
         // TODO: In order to ensure no OOM, currently reserve 200M, and then use the free mem in /proc/meminfo to ensure no OOM.
-        if (PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem()) +
-                            bytes >=
-                    MemInfo::mem_limit() ||
+        if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
             PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
             if (config::enable_proc_meminfo_cancel_query) {
                 return true;
@@ -172,10 +170,6 @@ public:
         return msg.str();
     }
 
-    static std::string print_bytes(int64_t bytes) {
-        return PrettyPrinter::print(bytes, TUnit::BYTES);
-    }
-
 private:
     // The following func, for automatic memory tracking and limiting based on system memory allocation.
     friend class ThreadMemTrackerMgr;
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index c4f9ba2dd7..2b294f0223 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -20,6 +20,7 @@
 #include "common/config.h"
 #include "runtime/exec_env.h"
 #include "util/pretty_printer.h"
+#include "runtime/memory/mem_tracker.h"
 
 namespace doris {
 
@@ -100,9 +101,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
             LOG(INFO) << fmt::format(
                     "Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
                     "PeakUsed={}",
-                    it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
-                    PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
-                    PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
+                    it->first, NewMemTracker::print_bytes(it->second->limit()),
+                    NewMemTracker::print_bytes(it->second->consumption()),
+                    NewMemTracker::print_bytes(it->second->peak_consumption()));
             expired_task_ids.emplace_back(it->first);
         }
     }
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
index f8c5039eab..4c21c4c40e 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -23,6 +23,8 @@
 
 namespace doris {
 
+// TODO: phmap `parallel_flat_hash_map` is not thread-safe. If it is not fixed in the future,
+//       can consider using other maps instead.
 using TaskTrackersMap = phmap::parallel_flat_hash_map<
         std::string, std::shared_ptr<MemTrackerLimiter>,
         phmap::priv::hash_default_hash<std::string>, phmap::priv::hash_default_eq<std::string>,
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index b6143264a8..164dee1d3e 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -188,19 +188,8 @@ static void attach_bthread() {
 #endif
         // Create thread-local data on demand.
         bthread_context = new ThreadContext;
-        std::shared_ptr<MemTrackerLimiter> btls_tracker =
-                std::make_shared<MemTrackerLimiter>(-1, "Bthread:id=" + std::to_string(bthread_id),
-                                                    ExecEnv::GetInstance()->bthread_mem_tracker());
-        bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
         // set the data so that next time bthread_getspecific in the thread returns the data.
         CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
-    } else {
-        // two scenarios:
-        // 1. A new bthread starts, but get a reuses btls.
-        // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
-        // So tracker call reset 0 like reuses btls.
-        DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
-        bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
     }
 }
 
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a752088c85..bd5fe3aa93 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -456,11 +456,10 @@ int main(int argc, char** argv) {
 #if defined(LEAK_SANITIZER)
         __lsan_do_leak_check();
 #endif
-
+        doris::PerfCounters::refresh_proc_status();
 #if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
         doris::MemInfo::refresh_allocator_mem();
 #endif
-        doris::PerfCounters::refresh_proc_status();
         int64_t allocator_cache_mem_diff =
                 doris::MemInfo::allocator_cache_mem() -
                 doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consumption();
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index b7bc9a179b..38f12b5180 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -120,16 +120,14 @@ void PInternalServiceImpl<T>::_transmit_data(google::protobuf::RpcController* cn
                                           const Status& extract_st) {
     std::string query_id;
     TUniqueId finst_id;
-    std::shared_ptr<MemTrackerLimiter> transmit_tracker;
+    std::shared_ptr<MemTrackerLimiter> transmit_tracker = nullptr;
     if (request->has_query_id()) {
         query_id = print_id(request->query_id());
         finst_id.__set_hi(request->finst_id().hi());
         finst_id.__set_lo(request->finst_id().lo());
-        // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
-        transmit_tracker = std::make_shared<MemTrackerLimiter>(
-                -1, fmt::format("QueryTransmit#queryId={}", query_id),
-                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
-    } else {
+        transmit_tracker = _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
+    }
+    if (!transmit_tracker) {
         query_id = "unkown_transmit_data";
         transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
     }
@@ -595,16 +593,16 @@ void PInternalServiceImpl<T>::_transmit_block(google::protobuf::RpcController* c
                                            const Status& extract_st) {
     std::string query_id;
     TUniqueId finst_id;
-    std::shared_ptr<MemTrackerLimiter> transmit_tracker;
+    std::shared_ptr<MemTrackerLimiter> transmit_tracker = nullptr;
     if (request->has_query_id()) {
         query_id = print_id(request->query_id());
         finst_id.__set_hi(request->finst_id().hi());
         finst_id.__set_lo(request->finst_id().lo());
-        // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
-        transmit_tracker = std::make_shared<MemTrackerLimiter>(
-                -1, fmt::format("QueryTransmit#queryId={}", query_id),
-                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
-    } else {
+        // phmap `parallel_flat_hash_map` is not thread safe, so get query mem tracker may be null pointer.
+        transmit_tracker =
+                _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
+    }
+    if (!transmit_tracker) {
         query_id = "unkown_transmit_block";
         transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block");
     }
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 674c053136..438fab038c 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -48,6 +48,7 @@ size_t MemInfo::_s_tcmalloc_thread_bytes = 0;
 size_t MemInfo::_s_allocator_cache_mem = 0;
 std::string MemInfo::_s_allocator_cache_mem_str = "";
 size_t MemInfo::_s_virtual_memory_used = 0;
+int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
 
 void MemInfo::init() {
     // Read from /proc/meminfo
@@ -96,7 +97,7 @@ void MemInfo::init() {
     bool is_percent = true;
     _s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
     _s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
-    _s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M
+    _s_hard_mem_limit = _s_physical_mem - std::max(209715200L, _s_physical_mem / 10); // 200M
 
     LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", /proc/meminfo/MemTotal: " << line;
     _s_initialized = true;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 637448c7c0..e73d07ce1f 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -23,6 +23,7 @@
 #include <string>
 
 #include "common/logging.h"
+#include "util/perf_counters.h"
 #include "util/pretty_printer.h"
 
 namespace doris {
@@ -45,6 +46,7 @@ public:
     static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
     static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
     static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
+    static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; }
 
     // Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
     // obtained by the process malloc, not the physical memory actually used by the process in the OS.
@@ -65,6 +67,8 @@ public:
                                  _s_tcmalloc_transfer_bytes + _s_tcmalloc_thread_bytes;
         _s_allocator_cache_mem_str = PrettyPrinter::print(_s_allocator_cache_mem, TUnit::BYTES);
         _s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes;
+        _s_proc_mem_no_allocator_cache =
+                PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem);
     }
 
     static inline int64_t mem_limit() {
@@ -100,6 +104,7 @@ private:
     static size_t _s_allocator_cache_mem;
     static std::string _s_allocator_cache_mem_str;
     static size_t _s_virtual_memory_used;
+    static int64_t _s_proc_mem_no_allocator_cache;
 };
 
 } // namespace doris
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 14630c2533..eaa99ffd14 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
     }
 
     // _cur_batch must be replaced with the returned batch.
-    _current_block.reset();
+    {
+        SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+        _current_block.reset();
+    }
     *next_block = nullptr;
     if (_is_cancelled) {
         return Status::Cancelled("Cancelled");


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org