You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by mo...@apache.org on 2022/10/27 01:03:17 UTC
[doris] branch branch-1.1-lts updated: [branch-1.1-lts](cherry-pick) Fix brpc causing query mem tracker to be inaccurate and deadlock (#13697)
This is an automated email from the ASF dual-hosted git repository.
morningman pushed a commit to branch branch-1.1-lts
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-1.1-lts by this push:
new 7b74101b5f [branch-1.1-lts](cherry-pick) Fix brpc causing query mem tracker to be inaccurate and deadlock (#13697)
7b74101b5f is described below
commit 7b74101b5f9d79cae5d9ebf4139141110c43e4f3
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Oct 27 09:03:11 2022 +0800
[branch-1.1-lts](cherry-pick) Fix brpc causing query mem tracker to be inaccurate and deadlock (#13697)
cherry-pick #13401, #13528, and fix bthread mem tracker deadlock
---
be/src/runtime/memory/mem_tracker.cpp | 8 +++-----
be/src/runtime/memory/mem_tracker.h | 6 ++++++
be/src/runtime/memory/mem_tracker_limiter.h | 8 +-------
be/src/runtime/memory/mem_tracker_task_pool.cpp | 7 ++++---
be/src/runtime/memory/mem_tracker_task_pool.h | 2 ++
be/src/runtime/thread_context.h | 11 -----------
be/src/service/doris_main.cpp | 3 +--
be/src/service/internal_service.cpp | 22 ++++++++++------------
be/src/util/mem_info.cpp | 3 ++-
be/src/util/mem_info.h | 5 +++++
be/src/vec/runtime/vdata_stream_recvr.cpp | 5 ++++-
11 files changed, 38 insertions(+), 42 deletions(-)
diff --git a/be/src/runtime/memory/mem_tracker.cpp b/be/src/runtime/memory/mem_tracker.cpp
index 31d40563d4..02145922b4 100644
--- a/be/src/runtime/memory/mem_tracker.cpp
+++ b/be/src/runtime/memory/mem_tracker.cpp
@@ -23,7 +23,6 @@
#include <fmt/format.h>
#include "runtime/thread_context.h"
-#include "util/pretty_printer.h"
#include "util/string_util.h"
#include "util/time.h"
@@ -104,10 +103,9 @@ void NewMemTracker::make_group_snapshot(std::vector<NewMemTracker::Snapshot>* sn
}
std::string NewMemTracker::log_usage(NewMemTracker::Snapshot snapshot) {
- return fmt::format(
- "MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)", snapshot.label,
- snapshot.parent, PrettyPrinter::print(snapshot.cur_consumption, TUnit::BYTES),
- snapshot.cur_consumption, PrettyPrinter::print(snapshot.peak_consumption, TUnit::BYTES),
+ return fmt::format("MemTracker Label={}, Parent Label={}, Used={}({} B), Peak={}({} B)",
+ snapshot.label, snapshot.parent, print_bytes(snapshot.cur_consumption),
+ snapshot.cur_consumption, print_bytes(snapshot.peak_consumption),
snapshot.peak_consumption);
}
diff --git a/be/src/runtime/memory/mem_tracker.h b/be/src/runtime/memory/mem_tracker.h
index 258f244aae..aec490c808 100644
--- a/be/src/runtime/memory/mem_tracker.h
+++ b/be/src/runtime/memory/mem_tracker.h
@@ -19,6 +19,7 @@
// and modified by Doris
#pragma once
+#include "util/pretty_printer.h"
#include "util/runtime_profile.h"
namespace doris {
@@ -50,6 +51,11 @@ public:
~NewMemTracker();
+ static std::string print_bytes(int64_t bytes) {
+ return bytes >= 0 ? PrettyPrinter::print(bytes, TUnit::BYTES)
+ : "-" + PrettyPrinter::print(std::abs(bytes), TUnit::BYTES);
+ }
+
public:
const std::string& label() const { return _label; }
// Returns the memory consumed in bytes.
diff --git a/be/src/runtime/memory/mem_tracker_limiter.h b/be/src/runtime/memory/mem_tracker_limiter.h
index e6126c15a6..73aaa8e500 100644
--- a/be/src/runtime/memory/mem_tracker_limiter.h
+++ b/be/src/runtime/memory/mem_tracker_limiter.h
@@ -76,9 +76,7 @@ public:
// but it may not actually alloc physical memory, which is not expected in mem hook fail.
//
// TODO: In order to ensure no OOM, currently reserve 200M, and then use the free mem in /proc/meminfo to ensure no OOM.
- if (PerfCounters::get_vm_rss() - static_cast<int64_t>(MemInfo::allocator_cache_mem()) +
- bytes >=
- MemInfo::mem_limit() ||
+ if (MemInfo::proc_mem_no_allocator_cache() + bytes >= MemInfo::mem_limit() ||
PerfCounters::get_vm_rss() + bytes >= MemInfo::hard_mem_limit()) {
if (config::enable_proc_meminfo_cancel_query) {
return true;
@@ -172,10 +170,6 @@ public:
return msg.str();
}
- static std::string print_bytes(int64_t bytes) {
- return PrettyPrinter::print(bytes, TUnit::BYTES);
- }
-
private:
// The following func, for automatic memory tracking and limiting based on system memory allocation.
friend class ThreadMemTrackerMgr;
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.cpp b/be/src/runtime/memory/mem_tracker_task_pool.cpp
index c4f9ba2dd7..2b294f0223 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.cpp
+++ b/be/src/runtime/memory/mem_tracker_task_pool.cpp
@@ -20,6 +20,7 @@
#include "common/config.h"
#include "runtime/exec_env.h"
#include "util/pretty_printer.h"
+#include "runtime/memory/mem_tracker.h"
namespace doris {
@@ -100,9 +101,9 @@ void MemTrackerTaskPool::logout_task_mem_tracker() {
LOG(INFO) << fmt::format(
"Deregister query/load memory tracker, queryId={}, Limit={}, CurrUsed={}, "
"PeakUsed={}",
- it->first, PrettyPrinter::print(it->second->limit(), TUnit::BYTES),
- PrettyPrinter::print(it->second->consumption(), TUnit::BYTES),
- PrettyPrinter::print(it->second->peak_consumption(), TUnit::BYTES));
+ it->first, NewMemTracker::print_bytes(it->second->limit()),
+ NewMemTracker::print_bytes(it->second->consumption()),
+ NewMemTracker::print_bytes(it->second->peak_consumption()));
expired_task_ids.emplace_back(it->first);
}
}
diff --git a/be/src/runtime/memory/mem_tracker_task_pool.h b/be/src/runtime/memory/mem_tracker_task_pool.h
index f8c5039eab..4c21c4c40e 100644
--- a/be/src/runtime/memory/mem_tracker_task_pool.h
+++ b/be/src/runtime/memory/mem_tracker_task_pool.h
@@ -23,6 +23,8 @@
namespace doris {
+// TODO: phmap `parallel_flat_hash_map` is not thread-safe. If that is not fixed upstream,
+// consider switching to a different map implementation.
using TaskTrackersMap = phmap::parallel_flat_hash_map<
std::string, std::shared_ptr<MemTrackerLimiter>,
phmap::priv::hash_default_hash<std::string>, phmap::priv::hash_default_eq<std::string>,
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index b6143264a8..164dee1d3e 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -188,19 +188,8 @@ static void attach_bthread() {
#endif
// Create thread-local data on demand.
bthread_context = new ThreadContext;
- std::shared_ptr<MemTrackerLimiter> btls_tracker =
- std::make_shared<MemTrackerLimiter>(-1, "Bthread:id=" + std::to_string(bthread_id),
- ExecEnv::GetInstance()->bthread_mem_tracker());
- bthread_context->attach_task(ThreadContext::TaskType::BRPC, "", TUniqueId(), btls_tracker);
// set the data so that next time bthread_getspecific in the thread returns the data.
CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
- } else {
- // two scenarios:
- // 1. A new bthread starts, but get a reuses btls.
- // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
- // So tracker call reset 0 like reuses btls.
- DCHECK(bthread_context->_thread_mem_tracker_mgr->get_attach_layers() == 2);
- bthread_context->_thread_mem_tracker_mgr->limiter_mem_tracker_raw()->reset_zero();
}
}
diff --git a/be/src/service/doris_main.cpp b/be/src/service/doris_main.cpp
index a752088c85..bd5fe3aa93 100644
--- a/be/src/service/doris_main.cpp
+++ b/be/src/service/doris_main.cpp
@@ -456,11 +456,10 @@ int main(int argc, char** argv) {
#if defined(LEAK_SANITIZER)
__lsan_do_leak_check();
#endif
-
+ doris::PerfCounters::refresh_proc_status();
#if !defined(ADDRESS_SANITIZER) && !defined(LEAK_SANITIZER) && !defined(THREAD_SANITIZER)
doris::MemInfo::refresh_allocator_mem();
#endif
- doris::PerfCounters::refresh_proc_status();
int64_t allocator_cache_mem_diff =
doris::MemInfo::allocator_cache_mem() -
doris::ExecEnv::GetInstance()->allocator_cache_mem_tracker()->consumption();
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index b7bc9a179b..38f12b5180 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -120,16 +120,14 @@ void PInternalServiceImpl<T>::_transmit_data(google::protobuf::RpcController* cn
const Status& extract_st) {
std::string query_id;
TUniqueId finst_id;
- std::shared_ptr<MemTrackerLimiter> transmit_tracker;
+ std::shared_ptr<MemTrackerLimiter> transmit_tracker = nullptr;
if (request->has_query_id()) {
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
- // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
- transmit_tracker = std::make_shared<MemTrackerLimiter>(
- -1, fmt::format("QueryTransmit#queryId={}", query_id),
- _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
- } else {
+ transmit_tracker = _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
+ }
+ if (!transmit_tracker) {
query_id = "unkown_transmit_data";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_data");
}
@@ -595,16 +593,16 @@ void PInternalServiceImpl<T>::_transmit_block(google::protobuf::RpcController* c
const Status& extract_st) {
std::string query_id;
TUniqueId finst_id;
- std::shared_ptr<MemTrackerLimiter> transmit_tracker;
+ std::shared_ptr<MemTrackerLimiter> transmit_tracker = nullptr;
if (request->has_query_id()) {
query_id = print_id(request->query_id());
finst_id.__set_hi(request->finst_id().hi());
finst_id.__set_lo(request->finst_id().lo());
- // In some cases, query mem tracker does not exist in BE when transmit block, will get null pointer.
- transmit_tracker = std::make_shared<MemTrackerLimiter>(
- -1, fmt::format("QueryTransmit#queryId={}", query_id),
- _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id));
- } else {
+ // phmap `parallel_flat_hash_map` is not thread-safe, so looking up the query mem tracker may return a null pointer.
+ transmit_tracker =
+ _exec_env->task_pool_mem_tracker_registry()->get_task_mem_tracker(query_id);
+ }
+ if (!transmit_tracker) {
query_id = "unkown_transmit_block";
transmit_tracker = std::make_shared<MemTrackerLimiter>(-1, "unkown_transmit_block");
}
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 674c053136..438fab038c 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -48,6 +48,7 @@ size_t MemInfo::_s_tcmalloc_thread_bytes = 0;
size_t MemInfo::_s_allocator_cache_mem = 0;
std::string MemInfo::_s_allocator_cache_mem_str = "";
size_t MemInfo::_s_virtual_memory_used = 0;
+int64_t MemInfo::_s_proc_mem_no_allocator_cache = -1;
void MemInfo::init() {
// Read from /proc/meminfo
@@ -96,7 +97,7 @@ void MemInfo::init() {
bool is_percent = true;
_s_mem_limit = ParseUtil::parse_mem_spec(config::mem_limit, -1, _s_physical_mem, &is_percent);
_s_mem_limit_str = PrettyPrinter::print(_s_mem_limit, TUnit::BYTES);
- _s_hard_mem_limit = _s_physical_mem - std::min(209715200L, _s_physical_mem / 10); // 200M
+ _s_hard_mem_limit = _s_physical_mem - std::max(209715200L, _s_physical_mem / 10); // 200M
LOG(INFO) << "Physical Memory: " << PrettyPrinter::print(_s_physical_mem, TUnit::BYTES) << ", /proc/meminfo/MemTotal: " << line;
_s_initialized = true;
diff --git a/be/src/util/mem_info.h b/be/src/util/mem_info.h
index 637448c7c0..e73d07ce1f 100644
--- a/be/src/util/mem_info.h
+++ b/be/src/util/mem_info.h
@@ -23,6 +23,7 @@
#include <string>
#include "common/logging.h"
+#include "util/perf_counters.h"
#include "util/pretty_printer.h"
namespace doris {
@@ -45,6 +46,7 @@ public:
static inline size_t allocator_virtual_mem() { return _s_virtual_memory_used; }
static inline size_t allocator_cache_mem() { return _s_allocator_cache_mem; }
static inline std::string allocator_cache_mem_str() { return _s_allocator_cache_mem_str; }
+ static inline int64_t proc_mem_no_allocator_cache() { return _s_proc_mem_no_allocator_cache; }
// Tcmalloc property `generic.total_physical_bytes` records the total length of the virtual memory
// obtained by the process malloc, not the physical memory actually used by the process in the OS.
@@ -65,6 +67,8 @@ public:
_s_tcmalloc_transfer_bytes + _s_tcmalloc_thread_bytes;
_s_allocator_cache_mem_str = PrettyPrinter::print(_s_allocator_cache_mem, TUnit::BYTES);
_s_virtual_memory_used = _s_allocator_physical_mem + _s_pageheap_unmapped_bytes;
+ _s_proc_mem_no_allocator_cache =
+ PerfCounters::get_vm_rss() - static_cast<int64_t>(_s_allocator_cache_mem);
}
static inline int64_t mem_limit() {
@@ -100,6 +104,7 @@ private:
static size_t _s_allocator_cache_mem;
static std::string _s_allocator_cache_mem_str;
static size_t _s_virtual_memory_used;
+ static int64_t _s_proc_mem_no_allocator_cache;
};
} // namespace doris
diff --git a/be/src/vec/runtime/vdata_stream_recvr.cpp b/be/src/vec/runtime/vdata_stream_recvr.cpp
index 14630c2533..eaa99ffd14 100644
--- a/be/src/vec/runtime/vdata_stream_recvr.cpp
+++ b/be/src/vec/runtime/vdata_stream_recvr.cpp
@@ -53,7 +53,10 @@ Status VDataStreamRecvr::SenderQueue::get_batch(Block** next_block) {
}
// _cur_batch must be replaced with the returned batch.
- _current_block.reset();
+ {
+ SCOPED_ATTACH_TASK(ExecEnv::GetInstance()->orphan_mem_tracker());
+ _current_block.reset();
+ }
*next_block = nullptr;
if (_is_cancelled) {
return Status::Cancelled("Cancelled");
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org