You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@doris.apache.org by yi...@apache.org on 2023/06/21 13:14:15 UTC

[doris] branch master updated: [fix](memory) no switch bthread context in UBSAN compile (#21064)

This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 661e1ae7c5 [fix](memory) no switch bthread context in UBSAN compile (#21064)
661e1ae7c5 is described below

commit 661e1ae7c5d1b14ce5f0dba84dd28c7f647dae96
Author: Xinyi Zou <zo...@gmail.com>
AuthorDate: Wed Jun 21 21:14:07 2023 +0800

    [fix](memory) no switch bthread context in UBSAN compile (#21064)
    
    When compiled with UBSAN, all memory is tracked to the orphan (unknown) mem tracker, and the bthread context and mem tracker are no longer switched.
    
    This is a supplementary fix for #20999.
---
 be/src/io/fs/stream_load_pipe.cpp             |  2 +-
 be/src/pipeline/exec/exchange_sink_buffer.cpp |  4 ++--
 be/src/runtime/thread_context.h               | 30 ++++++++++++++-------------
 be/src/vec/sink/vdata_stream_sender.cpp       |  2 +-
 be/src/vec/sink/vtablet_sink.cpp              |  4 ++--
 5 files changed, 22 insertions(+), 20 deletions(-)

diff --git a/be/src/io/fs/stream_load_pipe.cpp b/be/src/io/fs/stream_load_pipe.cpp
index b7a1713386..d79cfd028d 100644
--- a/be/src/io/fs/stream_load_pipe.cpp
+++ b/be/src/io/fs/stream_load_pipe.cpp
@@ -51,7 +51,7 @@ StreamLoadPipe::~StreamLoadPipe() {
 
 Status StreamLoadPipe::read_at_impl(size_t /*offset*/, Slice result, size_t* bytes_read,
                                     const IOContext* /*io_ctx*/) {
-    SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+    SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
     *bytes_read = 0;
     size_t bytes_req = result.size;
     char* to = result.data;
diff --git a/be/src/pipeline/exec/exchange_sink_buffer.cpp b/be/src/pipeline/exec/exchange_sink_buffer.cpp
index b16beca11d..c0f0e921e9 100644
--- a/be/src/pipeline/exec/exchange_sink_buffer.cpp
+++ b/be/src/pipeline/exec/exchange_sink_buffer.cpp
@@ -204,7 +204,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         });
         {
-            SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             if (enable_http_send_block(*brpc_request)) {
                 RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
                                                     *brpc_request,
@@ -251,7 +251,7 @@ Status ExchangeSinkBuffer::_send_rpc(InstanceLoId id) {
             }
         });
         {
-            SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             if (enable_http_send_block(*brpc_request)) {
                 RETURN_IF_ERROR(transmit_block_http(_context->get_runtime_state(), closure,
                                                     *brpc_request,
diff --git a/be/src/runtime/thread_context.h b/be/src/runtime/thread_context.h
index c97dfab6f8..0cd42648ae 100644
--- a/be/src/runtime/thread_context.h
+++ b/be/src/runtime/thread_context.h
@@ -37,7 +37,7 @@
 #include "util/defer_op.h" // IWYU pragma: keep
 
 // Used to observe the memory usage of the specified code segment
-#if defined(USE_MEM_TRACKER)
+#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
 // Count a code segment memory (memory malloc - memory free) to int64_t
 // Usage example: int64_t scope_mem = 0; { SCOPED_MEM_COUNT(&scope_mem); xxx; xxx; }
 #define SCOPED_MEM_COUNT(scope_mem) \
@@ -56,7 +56,7 @@
 #endif
 
 // Used to observe query/load/compaction/e.g. execution thread memory usage and respond when memory exceeds the limit.
-#if defined(USE_MEM_TRACKER)
+#if defined(USE_MEM_TRACKER) && !defined(UNDEFINED_BEHAVIOR_SANITIZER)
 // Attach to query/load/compaction/e.g. when thread starts.
 // This will save some info about a working thread in the thread context.
 // And count the memory during thread execution (is actually also the code segment that executes the function)
@@ -207,13 +207,6 @@ private:
     TUniqueId _fragment_instance_id;
 };
 
-#if defined(UNDEFINED_BEHAVIOR_SANITIZER)
-class SwitchBthreadLocal {
-public:
-    static void switch_to_bthread_local() {}
-    static void switch_back_pthread_local() {}
-};
-#else
 // Switch thread context from pthread local to bthread local context.
 // Cache the pointer of bthread local in pthead local,
 // Avoid calling bthread_getspecific frequently to get bthread local, which has performance problems.
@@ -233,7 +226,6 @@ public:
                 // So tracker call reset 0 like reuses btls.
                 // during this period, stop the use of thread_context.
                 thread_context_ptr.init = false;
-                bthread_id = bthread_self();
                 bthread_context = new ThreadContext;
                 // The brpc server should respond as quickly as possible.
                 bthread_context->thread_mem_tracker_mgr->disable_wait_gc();
@@ -241,6 +233,7 @@ public:
                 CHECK_EQ(0, bthread_setspecific(btls_key, bthread_context));
                 thread_context_ptr.init = true;
             }
+            bthread_id = bthread_self();
             bthread_context->switch_bthread_local_count++;
         }
     }
@@ -249,7 +242,7 @@ public:
     // `switch_to_bthread_local` should only be called if `switch_to_bthread_local` returns true
     static void switch_back_pthread_local() {
         if (bthread_self() != 0) {
-            if (bthread_self() != bthread_id) {
+            if (!bthread_equal(bthread_self(), bthread_id)) {
                 bthread_id = bthread_self();
                 bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
                 DCHECK(bthread_context != nullptr);
@@ -261,16 +254,17 @@ public:
         }
     }
 };
-#endif
 
+// Note: All use of thread_context() in bthread requires the use of SwitchBthreadLocal.
 static ThreadContext* thread_context() {
     if (bthread_self() != 0) {
         // in bthread
-        if (bthread_self() != bthread_id) {
+        if (!bthread_equal(bthread_self(), bthread_id)) {
             // bthread switching pthread may be very frequent, remember not to use lock or other time-consuming operations.
             bthread_id = bthread_self();
             bthread_context = static_cast<ThreadContext*>(bthread_getspecific(btls_key));
-            // if nullptr, a new bthread task start or bthread switch pthread but not call switch_to_bthread_local, use pthread local context
+            // if nullptr, a new bthread task start and no reusable bthread local,
+            // or bthread switch pthread but not call switch_to_bthread_local, use pthread local context
             // else, bthread switch pthread and called switch_to_bthread_local, use bthread local context.
             if (bthread_context == nullptr) {
                 bthread_context = thread_context_ptr._ptr;
@@ -329,17 +323,25 @@ private:
 class TrackMemoryToUnknown {
 public:
     explicit TrackMemoryToUnknown() {
+        if (bthread_self() != 0) {
+            _tid = std::this_thread::get_id(); // save pthread id
+        }
         _old_mem_tracker = thread_context()->thread_mem_tracker_mgr->limiter_mem_tracker();
         thread_context()->thread_mem_tracker_mgr->attach_limiter_tracker(
                 ExecEnv::GetInstance()->orphan_mem_tracker(), TUniqueId());
     }
 
     ~TrackMemoryToUnknown() {
+        if (bthread_self() != 0) {
+            // make sure pthread is not switch, if switch, mem tracker will be wrong, but not crash in release
+            DCHECK(_tid == std::this_thread::get_id());
+        }
         thread_context()->thread_mem_tracker_mgr->detach_limiter_tracker(_old_mem_tracker);
     }
 
 private:
     std::shared_ptr<MemTrackerLimiter> _old_mem_tracker;
+    std::thread::id _tid;
 };
 
 class AddThreadMemTrackerConsumer {
diff --git a/be/src/vec/sink/vdata_stream_sender.cpp b/be/src/vec/sink/vdata_stream_sender.cpp
index eb1c21b834..df0bb396a3 100644
--- a/be/src/vec/sink/vdata_stream_sender.cpp
+++ b/be/src/vec/sink/vdata_stream_sender.cpp
@@ -179,7 +179,7 @@ Status Channel::send_block(PBlock* block, bool eos) {
     _closure->cntl.set_timeout_ms(_brpc_timeout_ms);
 
     {
-        SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+        SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
         if (enable_http_send_block(_brpc_request, _parent->_transfer_large_data_by_brpc)) {
             RETURN_IF_ERROR(transmit_block_http(_state, _closure, _brpc_request, _brpc_dest_addr));
         } else {
diff --git a/be/src/vec/sink/vtablet_sink.cpp b/be/src/vec/sink/vtablet_sink.cpp
index 5c9375ea77..39f3475563 100644
--- a/be/src/vec/sink/vtablet_sink.cpp
+++ b/be/src/vec/sink/vtablet_sink.cpp
@@ -839,7 +839,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
         _add_block_closure->cntl.http_request().set_content_type("application/json");
 
         {
-            SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             _brpc_http_stub->tablet_writer_add_block_by_http(&_add_block_closure->cntl, nullptr,
                                                              &_add_block_closure->result,
                                                              _add_block_closure);
@@ -847,7 +847,7 @@ void VNodeChannel::try_send_block(RuntimeState* state) {
     } else {
         _add_block_closure->cntl.http_request().Clear();
         {
-            SCOPED_TRACK_MEMORY_TO_UNKNOWN();
+            SCOPED_SWITCH_THREAD_MEM_TRACKER_LIMITER(ExecEnv::GetInstance()->orphan_mem_tracker());
             _stub->tablet_writer_add_block(&_add_block_closure->cntl, &request,
                                            &_add_block_closure->result, _add_block_closure);
         }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@doris.apache.org
For additional commands, e-mail: commits-help@doris.apache.org