You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by kx...@apache.org on 2023/06/09 04:10:52 UTC

[doris] 16/29: [fix](memory) Fix crash at `bthread_setspecific` in `brpc::Socket::CheckHealth()` (#20450)

This is an automated email from the ASF dual-hosted git repository.

kxiao pushed a commit to branch branch-2.0-beta
in repository https://gitbox.apache.org/repos/asf/doris.git

commit 82cb2e06f6aaedfcdbc5cd6a51d4b6e9f1f55ce4
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Thu Jun 8 19:48:19 2023 +0800

    [fix](memory) Fix crash at `bthread_setspecific` in `brpc::Socket::CheckHealth()` (#20450)
    
    Only switch to bthread local when modifying the mem tracker in the thread context. No longer switches to bthread local by default when bthread starts
    mem tracker increases brpc IOBufBlockMemory memory
    remove thread mem tracker metrics
---
 be/src/common/daemon.cpp            |  4 ++
 be/src/runtime/exec_env.h           |  2 +
 be/src/runtime/exec_env_init.cpp    |  2 +
 be/src/runtime/load_channel_mgr.cpp |  3 +-
 be/src/runtime/thread_context.cpp   | 23 +++++-----
 be/src/runtime/thread_context.h     | 89 +++++++++++++++++++++++++------------
 be/src/service/internal_service.cpp |  1 -
 be/src/util/doris_metrics.cpp       | 10 -----
 be/src/util/doris_metrics.h         |  6 ---
 be/src/util/mem_info.cpp            |  1 -
 10 files changed, 82 insertions(+), 59 deletions(-)

diff --git a/be/src/common/daemon.cpp b/be/src/common/daemon.cpp
index af5628f4b9..29eabf0470 100644
--- a/be/src/common/daemon.cpp
+++ b/be/src/common/daemon.cpp
@@ -22,6 +22,7 @@
 #include <gflags/gflags.h>
 #include <gperftools/malloc_extension.h> // IWYU pragma: keep
 // IWYU pragma: no_include <bits/std_abs.h>
+#include <butil/iobuf.h>
 #include <math.h>
 #include <signal.h>
 #include <stdint.h>
@@ -210,6 +211,9 @@ void Daemon::memory_maintenance_thread() {
                 DorisMetrics::instance()->system_metrics()->update_allocator_metrics();
             }
 #endif
+
+            ExecEnv::GetInstance()->brpc_iobuf_block_memory_tracker()->set_consumption(
+                    butil::IOBuf::block_memory());
             LOG(INFO) << MemTrackerLimiter::
                             process_mem_log_str(); // print mem log when memory state by 256M
         }
diff --git a/be/src/runtime/exec_env.h b/be/src/runtime/exec_env.h
index aab93e1d99..0bd463ffb3 100644
--- a/be/src/runtime/exec_env.h
+++ b/be/src/runtime/exec_env.h
@@ -118,6 +118,7 @@ public:
     MemTrackerLimiter* orphan_mem_tracker_raw() { return _orphan_mem_tracker_raw; }
     MemTrackerLimiter* experimental_mem_tracker() { return _experimental_mem_tracker.get(); }
     MemTracker* page_no_cache_mem_tracker() { return _page_no_cache_mem_tracker.get(); }
+    MemTracker* brpc_iobuf_block_memory_tracker() { return _brpc_iobuf_block_memory_tracker.get(); }
 
     ThreadPool* send_batch_thread_pool() { return _send_batch_thread_pool.get(); }
     ThreadPool* download_cache_thread_pool() { return _download_cache_thread_pool.get(); }
@@ -211,6 +212,7 @@ private:
     std::shared_ptr<MemTrackerLimiter> _experimental_mem_tracker;
     // page size not in cache, data page/index page/etc.
     std::shared_ptr<MemTracker> _page_no_cache_mem_tracker;
+    std::shared_ptr<MemTracker> _brpc_iobuf_block_memory_tracker;
 
     std::unique_ptr<ThreadPool> _send_batch_thread_pool;
 
diff --git a/be/src/runtime/exec_env_init.cpp b/be/src/runtime/exec_env_init.cpp
index 944dad9cf5..fb14db11f3 100644
--- a/be/src/runtime/exec_env_init.cpp
+++ b/be/src/runtime/exec_env_init.cpp
@@ -338,6 +338,8 @@ void ExecEnv::init_mem_tracker() {
             MemTrackerLimiter::Type::EXPERIMENTAL, "ExperimentalSet");
     _page_no_cache_mem_tracker =
             std::make_shared<MemTracker>("PageNoCache", _orphan_mem_tracker_raw);
+    _brpc_iobuf_block_memory_tracker =
+            std::make_shared<MemTracker>("IOBufBlockMemory", _orphan_mem_tracker_raw);
 }
 
 void ExecEnv::init_download_cache_buf() {
diff --git a/be/src/runtime/load_channel_mgr.cpp b/be/src/runtime/load_channel_mgr.cpp
index 3fe4c7986d..f99d7413ad 100644
--- a/be/src/runtime/load_channel_mgr.cpp
+++ b/be/src/runtime/load_channel_mgr.cpp
@@ -433,8 +433,7 @@ void LoadChannelMgr::_handle_mem_exceed_limit() {
                 << PrettyPrinter::print_bytes(process_soft_mem_limit)
                 << ", total load mem consumption: "
                 << PrettyPrinter::print_bytes(_mem_tracker->consumption())
-                << ", vm_rss: " << PerfCounters::get_vm_rss_str()
-                << ", tc/jemalloc allocator cache: " << MemInfo::allocator_cache_mem_str();
+                << ", vm_rss: " << PerfCounters::get_vm_rss_str();
         }
         LOG(INFO) << oss.str();
     }
diff --git a/be/src/runtime/thread_context.cpp b/be/src/runtime/thread_context.cpp
index 15efa37b88..c2dd13770c 100644
--- a/be/src/runtime/thread_context.cpp
+++ b/be/src/runtime/thread_context.cpp
@@ -19,7 +19,6 @@
 
 #include "common/signal_handler.h"
 #include "runtime/runtime_state.h"
-#include "util/doris_metrics.h" // IWYU pragma: keep
 
 namespace doris {
 class MemTracker;
@@ -33,10 +32,12 @@ ThreadContextPtr::ThreadContextPtr() {
 
 AttachTask::AttachTask(const std::shared_ptr<MemTrackerLimiter>& mem_tracker,
                        const TUniqueId& task_id, const TUniqueId& fragment_instance_id) {
+    SwitchBthreadLocal::switch_to_bthread_local();
     thread_context()->attach_task(task_id, fragment_instance_id, mem_tracker);
 }
 
 AttachTask::AttachTask(RuntimeState* runtime_state) {
+    SwitchBthreadLocal::switch_to_bthread_local();
     doris::signal::query_id_hi = runtime_state->query_id().hi;
     doris::signal::query_id_lo = runtime_state->query_id().lo;
     thread_context()->attach_task(runtime_state->query_id(), runtime_state->fragment_instance_id(),
@@ -45,29 +46,31 @@ AttachTask::AttachTask(RuntimeState* runtime_state) {
 
 AttachTask::~AttachTask() {
     thread_context()->detach_task();
-#ifndef NDEBUG
-    DorisMetrics::instance()->attach_task_thread_count->increment(1);
-#endif // NDEBUG
+    SwitchBthreadLocal::switch_back_pthread_local();
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(MemTracker* mem_tracker) {
-    if (mem_tracker)
+    SwitchBthreadLocal::switch_to_bthread_local();
+    if (mem_tracker) {
         _need_pop = thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(mem_tracker);
+    }
 }
 
 AddThreadMemTrackerConsumer::AddThreadMemTrackerConsumer(
         const std::shared_ptr<MemTracker>& mem_tracker)
         : _mem_tracker(mem_tracker) {
-    if (_mem_tracker)
+    SwitchBthreadLocal::switch_to_bthread_local();
+    if (_mem_tracker) {
         _need_pop =
                 thread_context()->thread_mem_tracker_mgr->push_consumer_tracker(_mem_tracker.get());
+    }
 }
 
 AddThreadMemTrackerConsumer::~AddThreadMemTrackerConsumer() {
-#ifndef NDEBUG
-    DorisMetrics::instance()->add_thread_mem_tracker_consumer_count->increment(1);
-#endif // NDEBUG
-    if (_need_pop) thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+    if (_need_pop) {
+        thread_context()->thread_mem_tracker_mgr->pop_consumer_tracker();
+    }
+    SwitchBthreadLocal::switch_back_pthread_local();
 }
 
 } // namespace doris
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index 4fbfd9155d..cf0b8a7b38 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -37,7 +37,7 @@
 #include "util/defer_op.h" // IWYU pragma: keep
 
 // Used to observe the memory usage of the specified code segment
-#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
+#if defined(USE_MEM_TRACKER)
 // Count a code segment memory (memory malloc - memory free) to int64_t
 // Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; }
 #define SCOPED_MEM_COUNT(scope_mem) \
@@ -56,7 +56,7 @@
 #endif
 
 // Used to observe query/load/compaction/e.g. execution thread memory usage and respond when memory exceeds the limit.
-#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
+#if defined(USE_MEM_TRACKER)
 // Attach to query/load/compaction/e.g. when thread starts.
 // This will save some info about a working thread in the thread context.
 // And count the memory during thread execution (is actually also the code segment that executes the function)
@@ -189,51 +189,80 @@ public:
         return thread_mem_tracker_mgr->limiter_mem_tracker_raw();
     }
 
+    int switch_bthread_local_count = 0;
+
 private:
     TUniqueId _task_id;
     TUniqueId _fragment_instance_id;
 };
 
-#if defined(UNDEFINED_BEHAVIOR_SANITIZER)
-static ThreadContext* thread_context() {
-    return thread_context_ptr._ptr;
-}
-#else
+// Switch thread context from pthread local to bthread local context.
 // Cache the pointer of bthread local in pthead local,
 // Avoid calling bthread_getspecific frequently to get bthread local, which has performance problems.
-static void pthread_attach_bthread() {
-    bthread_id = bthread_self();
-    bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-    if (bthread_context == nullptr) {
-        // A new bthread starts, two scenarios:
-        // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
-        // 2. There are not enough reusable btls in btls pool.
-        // else, two scenarios:
-        // 1. A new bthread starts, but get a reuses btls.
-        // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
-        // So tracker call reset 0 like reuses btls.
-        bthread_context = new ThreadContext;
-        // The brpc server should respond as quickly as possible.
-        bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
-        // set the data so that next time bthread_getspecific in the thread returns the data.
-        CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+class SwitchBthreadLocal {
+public:
+    static void switch_to_bthread_local() {
+        if (bthread_self() != 0) {
+            // Very frequent bthread_getspecific will slow, but switch_to_bthread_local is not expected to be much.
+            bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+            if (bthread_context == nullptr) {
+                // A new bthread starts, two scenarios:
+                // 1. First call to bthread_getspecific (and before any bthread_setspecific) returns NULL
+                // 2. There are not enough reusable btls in btls pool.
+                // else, two scenarios:
+                // 1. A new bthread starts, but get a reuses btls.
+                // 2. A pthread switch occurs. Because the pthread switch cannot be accurately identified at the moment.
+                // So tracker call reset 0 like reuses btls.
+                // during this period, stop the use of thread_context.
+                thread_context_ptr.init = false;
+                bthread_id = bthread_self();
+                bthread_context = new ThreadContext;
+                // The brpc server should respond as quickly as possible.
+                bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
+                // set the data so that next time bthread_getspecific in the thread returns the data.
+                CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
+                thread_context_ptr.init = true;
+            }
+            bthread_context->switch_bthread_local_count++;
+        }
     }
-}
+
+    // `switch_to_bthread_local` and `switch_back_pthread_local` should be used in pairs,
+    // `switch_to_bthread_local` should only be called if `switch_to_bthread_local` returns true
+    static void switch_back_pthread_local() {
+        if (bthread_self() != 0) {
+            if (bthread_self() != bthread_id) {
+                bthread_id = bthread_self();
+                bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+                DCHECK(bthread_context != nullptr);
+            }
+            bthread_context->switch_bthread_local_count--;
+            if (bthread_context->switch_bthread_local_count == 0) {
+                bthread_context = thread_context_ptr._ptr;
+            }
+        }
+    }
+};
 
 static ThreadContext* thread_context() {
     if (bthread_self() != 0) {
+        // in bthread
         if (bthread_self() != bthread_id) {
-            // A new bthread starts or pthread switch occurs, during this period, stop the use of thread_context.
-            thread_context_ptr.init = false;
-            pthread_attach_bthread();
-            thread_context_ptr.init = true;
+            // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations.
+            bthread_id = bthread_self();
+            bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
+            // if nullptr, a new bthread task start or bthread switch pthread but not call switch_to_bthread_local, use pthread local context
+            // else, bthread switch pthread and called switch_to_bthread_local, use bthread local context.
+            if (bthread_context == nullptr) {
+                bthread_context = thread_context_ptr._ptr;
+            }
         }
         return bthread_context;
     } else {
+        // in pthread
         return thread_context_ptr._ptr;
     }
 }
-#endif
 
 class ScopeMemCount {
 public:
@@ -264,12 +293,14 @@ public:
 class SwitchThreadMemTrackerLimiter {
 public:
     explicit SwitchThreadMemTrackerLimiter(const std::shared_ptr<MemTrackerLimiter>& mem_tracker) {
+        SwitchBthreadLocal::switch_to_bthread_local();
         _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
         thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(mem_tracker, TUniqueId());
     }
 
     ~SwitchThreadMemTrackerLimiter() {
         thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
+        SwitchBthreadLocal::switch_back_pthread_local();
     }
 
 private:
diff --git a/be/src/service/internal_service.cpp b/be/src/service/internal_service.cpp
index 357e93eef3..ed9a62a29b 100644
--- a/be/src/service/internal_service.cpp
+++ b/be/src/service/internal_service.cpp
@@ -1618,7 +1618,6 @@ void PInternalServiceImpl::get_tablet_rowset_versions(google::protobuf::RpcContr
                                                       const PGetTabletVersionsRequest* request,
                                                       PGetTabletVersionsResponse* response,
                                                       google::protobuf::Closure* done) {
-    //SCOPED_SWITCH_BTHREAD_TLS();
     brpc::ClosureGuard closure_guard(done);
     VLOG_DEBUG << "receive get tablet versions request: " << request->DebugString();
     ExecEnv::GetInstance()->storage_engine()->get_tablet_rowset_versions(request, response);
diff --git a/be/src/util/doris_metrics.cpp b/be/src/util/doris_metrics.cpp
index 97ca3de938..6b46c0cab6 100644
--- a/be/src/util/doris_metrics.cpp
+++ b/be/src/util/doris_metrics.cpp
@@ -135,11 +135,6 @@ DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(load_bytes, MetricUnit::BYTES);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_total, MetricUnit::OPERATIONS);
 DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(memtable_flush_duration_us, MetricUnit::MICROSECONDS);
 
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(attach_task_thread_count, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(add_thread_mem_tracker_consumer_count, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(thread_mem_tracker_exceed_call_back_count, MetricUnit::NOUNIT);
-DEFINE_COUNTER_METRIC_PROTOTYPE_2ARG(switch_bthread_count, MetricUnit::NOUNIT);
-
 DEFINE_GAUGE_METRIC_PROTOTYPE_2ARG(memory_pool_bytes_total, MetricUnit::BYTES);
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_thread_num, MetricUnit::NOUNIT);
 DEFINE_GAUGE_CORE_METRIC_PROTOTYPE_2ARG(process_fd_num_used, MetricUnit::NOUNIT);
@@ -298,11 +293,6 @@ DorisMetrics::DorisMetrics() : _metric_registry(_s_registry_name) {
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_rows);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, load_bytes);
 
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, attach_task_thread_count);
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, add_thread_mem_tracker_consumer_count);
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, thread_mem_tracker_exceed_call_back_count);
-    INT_COUNTER_METRIC_REGISTER(_server_metric_entity, switch_bthread_count);
-
     INT_UGAUGE_METRIC_REGISTER(_server_metric_entity, upload_total_byte);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, upload_rowset_count);
     INT_COUNTER_METRIC_REGISTER(_server_metric_entity, upload_fail_count);
diff --git a/be/src/util/doris_metrics.h b/be/src/util/doris_metrics.h
index 86b52e32be..646a4449c0 100644
--- a/be/src/util/doris_metrics.h
+++ b/be/src/util/doris_metrics.h
@@ -121,12 +121,6 @@ public:
     IntCounter* memtable_flush_total;
     IntCounter* memtable_flush_duration_us;
 
-    IntCounter* attach_task_thread_count;
-    IntCounter* add_thread_mem_tracker_consumer_count;
-    IntCounter* thread_mem_tracker_exceed_call_back_count;
-    // brpc server response count
-    IntCounter* switch_bthread_count;
-
     IntGauge* memory_pool_bytes_total;
     IntGauge* process_thread_num;
     IntGauge* process_fd_num_used;
diff --git a/be/src/util/mem_info.cpp b/be/src/util/mem_info.cpp
index 12038f7c16..8d81089323 100644
--- a/be/src/util/mem_info.cpp
+++ b/be/src/util/mem_info.cpp
@@ -77,7 +77,6 @@ int64_t MemInfo::_s_process_full_gc_size = -1;
 
 void MemInfo::refresh_allocator_mem() {
 #if defined(ADDRESS_SANITIZER) || defined(LEAK_SANITIZER) || defined(THREAD_SANITIZER)
-    LOG(INFO) << "Memory tracking is not available with address sanitizer builds.";
 #elif defined(USE_JEMALLOC)
     uint64_t epoch = 0;
     size_t sz = sizeof(epoch);


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org