You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2022/07/18 02:13:38 UTC

[incubator-brpc] branch master updated: Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX (#1838)

This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 583fe463 Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX (#1838)
583fe463 is described below

commit 583fe463790575c1397cbcd208137d25bfb048b3
Author: KaneVV1 <ya...@baidu.com>
AuthorDate: Mon Jul 18 10:13:32 2022 +0800

    Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX (#1838)
    
    * Fix apache#502 DoublyBufferedData limits by _SC_THREAD_KEYS_MAX
    
    * Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX
---
 src/butil/containers/doubly_buffered_data.h | 172 ++++++++++++++++++++++------
 test/brpc_load_balancer_unittest.cpp        |  14 +--
 2 files changed, 143 insertions(+), 43 deletions(-)

diff --git a/src/butil/containers/doubly_buffered_data.h b/src/butil/containers/doubly_buffered_data.h
index 93ba3421..c928037b 100644
--- a/src/butil/containers/doubly_buffered_data.h
+++ b/src/butil/containers/doubly_buffered_data.h
@@ -20,6 +20,7 @@
 #ifndef BUTIL_DOUBLY_BUFFERED_DATA_H
 #define BUTIL_DOUBLY_BUFFERED_DATA_H
 
+#include <deque>
 #include <vector>                                       // std::vector
 #include <pthread.h>
 #include "butil/scoped_lock.h"
@@ -54,6 +55,8 @@ class Void { };
 template <typename T, typename TLS = Void>
 class DoublyBufferedData {
     class Wrapper;
+    class WrapperTLSGroup;
+    typedef int WrapperTLSId;
 public:
     class ScopedPtr {
     friend class DoublyBufferedData;
@@ -163,7 +166,7 @@ private:
 
     const T* UnsafeRead() const
     { return _data + _index.load(butil::memory_order_acquire); }
-    Wrapper* AddWrapper();
+    Wrapper* AddWrapper(Wrapper*);
     void RemoveWrapper(Wrapper*);
 
     // Foreground and background void.
@@ -173,8 +176,7 @@ private:
     butil::atomic<int> _index;
 
     // Key to access thread-local wrappers.
-    bool _created_key;
-    pthread_key_t _wrapper_key;
+    WrapperTLSId _wrapper_key;
 
     // All thread-local instances.
     std::vector<Wrapper*> _wrappers;
@@ -200,13 +202,127 @@ template <typename T>
 class DoublyBufferedDataWrapperBase<T, Void> {
 };
 
+// Storing data via pthread_key is limited by _SC_THREAD_KEYS_MAX, so
+// WrapperTLSGroup stores Wrappers in thread-local storage instead.
+// WrapperTLSGroup destroys Wrapper data only when the thread exits;
+// at other times it merely resets the Wrapper's inner structure.
+template <typename T, typename TLS>
+class DoublyBufferedData<T, TLS>::WrapperTLSGroup {
+public:
+    const static size_t RAW_BLOCK_SIZE = 4096;
+    const static size_t ELEMENTS_PER_BLOCK = (RAW_BLOCK_SIZE + sizeof(T) - 1) / sizeof(T);
+
+    struct BAIDU_CACHELINE_ALIGNMENT ThreadBlock {
+        inline DoublyBufferedData::Wrapper* at(size_t offset) {
+            return _data + offset;
+        };
+
+    private:
+        DoublyBufferedData::Wrapper _data[ELEMENTS_PER_BLOCK];
+    };
+
+    inline static WrapperTLSId key_create() {
+        BAIDU_SCOPED_LOCK(_s_mutex);
+        WrapperTLSId id = 0;
+        if (!_get_free_ids().empty()) {
+            id = _get_free_ids().back();
+            _get_free_ids().pop_back();
+        } else {
+            id = _s_id++;
+        }
+        return id;
+    }
+
+    inline static int key_delete(WrapperTLSId id) {
+        BAIDU_SCOPED_LOCK(_s_mutex);
+        if (id < 0 || id >= _s_id) {
+            errno = EINVAL;
+            return -1;
+        }
+        _get_free_ids().push_back(id);
+        return 0;
+    }
+
+    inline static DoublyBufferedData::Wrapper* get_or_create_tls_data(WrapperTLSId id) {
+        if (BAIDU_UNLIKELY(id < 0)) {
+            CHECK(false) << "Invalid id=" << id;
+            return NULL;
+        }
+        if (_s_tls_blocks == NULL) {
+            _s_tls_blocks = new (std::nothrow) std::vector<ThreadBlock*>;
+            if (BAIDU_UNLIKELY(_s_tls_blocks == NULL)) {
+                LOG(FATAL) << "Fail to create vector, " << berror();
+                return NULL;
+            }
+            butil::thread_atexit(_destroy_tls_blocks);
+        }
+        const size_t block_id = (size_t)id / ELEMENTS_PER_BLOCK;
+        if (block_id >= _s_tls_blocks->size()) {
+            // The 32ul avoids pointless small resizes.
+            _s_tls_blocks->resize(std::max(block_id + 1, 32ul));
+        }
+        ThreadBlock* tb = (*_s_tls_blocks)[block_id];
+        if (tb == NULL) {
+            ThreadBlock* new_block = new (std::nothrow) ThreadBlock;
+            if (BAIDU_UNLIKELY(new_block == NULL)) {
+                return NULL;
+            }
+            tb = new_block;
+            (*_s_tls_blocks)[block_id] = new_block;
+        }
+        return tb->at(id - block_id * ELEMENTS_PER_BLOCK);
+    }
+
+private:
+    static void _destroy_tls_blocks() {
+        if (!_s_tls_blocks) {
+            return;
+        }
+        for (size_t i = 0; i < _s_tls_blocks->size(); ++i) {
+            delete (*_s_tls_blocks)[i];
+        }
+        delete _s_tls_blocks;
+        _s_tls_blocks = NULL;
+    }
+
+    inline static std::deque<WrapperTLSId>& _get_free_ids() {
+        if (BAIDU_UNLIKELY(!_s_free_ids)) {
+            _s_free_ids = new (std::nothrow) std::deque<WrapperTLSId>();
+            if (!_s_free_ids) {
+                abort();
+            }
+        }
+        return *_s_free_ids;
+    }
+
+private:
+    static pthread_mutex_t _s_mutex;
+    static WrapperTLSId _s_id;
+    static std::deque<WrapperTLSId>* _s_free_ids;
+    static __thread std::vector<ThreadBlock*>* _s_tls_blocks;
+};
+
+template <typename T, typename TLS>
+pthread_mutex_t DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+template <typename T, typename TLS>
+std::deque<typename DoublyBufferedData<T, TLS>::WrapperTLSId>*
+        DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_free_ids = NULL;
+
+template <typename T, typename TLS>
+typename DoublyBufferedData<T, TLS>::WrapperTLSId
+        DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_id = 0;
+
+template <typename T, typename TLS>
+__thread std::vector<typename DoublyBufferedData<T, TLS>::WrapperTLSGroup::ThreadBlock*>*
+        DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_tls_blocks = NULL;
 
 template <typename T, typename TLS>
 class DoublyBufferedData<T, TLS>::Wrapper
     : public DoublyBufferedDataWrapperBase<T, TLS> {
 friend class DoublyBufferedData;
 public:
-    explicit Wrapper(DoublyBufferedData* c) : _control(c) {
+    explicit Wrapper() : _control(NULL) {
         pthread_mutex_init(&_mutex, NULL);
     }
     
@@ -239,19 +355,26 @@ private:
 
 // Called when thread initializes thread-local wrapper.
 template <typename T, typename TLS>
-typename DoublyBufferedData<T, TLS>::Wrapper*
-DoublyBufferedData<T, TLS>::AddWrapper() {
-    std::unique_ptr<Wrapper> w(new (std::nothrow) Wrapper(this));
+typename DoublyBufferedData<T, TLS>::Wrapper* DoublyBufferedData<T, TLS>::AddWrapper(
+        typename DoublyBufferedData<T, TLS>::Wrapper* w) {
     if (NULL == w) {
         return NULL;
     }
+    if (w->_control == this) {
+        return w;
+    }
+    if (w->_control != NULL) {
+        LOG(FATAL) << "Get wrapper from tls but control != this";
+        return NULL;
+    }
     try {
+        w->_control = this;
         BAIDU_SCOPED_LOCK(_wrappers_mutex);
-        _wrappers.push_back(w.get());
+        _wrappers.push_back(w);
     } catch (std::exception& e) {
         return NULL;
     }
-    return w.release();
+    return w;
 }
 
 // Called when thread quits.
@@ -274,18 +397,11 @@ void DoublyBufferedData<T, TLS>::RemoveWrapper(
 template <typename T, typename TLS>
 DoublyBufferedData<T, TLS>::DoublyBufferedData()
     : _index(0)
-    , _created_key(false)
     , _wrapper_key(0) {
     _wrappers.reserve(64);
     pthread_mutex_init(&_modify_mutex, NULL);
     pthread_mutex_init(&_wrappers_mutex, NULL);
-    const int rc = pthread_key_create(&_wrapper_key,
-                                      butil::delete_object<Wrapper>);
-    if (rc != 0) {
-        LOG(FATAL) << "Fail to pthread_key_create: " << berror(rc);
-    } else {
-        _created_key = true;
-    }
+    _wrapper_key = WrapperTLSGroup::key_create();
     // Initialize _data for some POD types. This is essential for pointer
     // types because they should be Read() as NULL before any Modify().
     if (is_integral<T>::value || is_floating_point<T>::value ||
@@ -299,18 +415,16 @@ template <typename T, typename TLS>
 DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
     // User is responsible for synchronizations between Read()/Modify() and
     // this function.
-    if (_created_key) {
-        pthread_key_delete(_wrapper_key);
-    }
     
     {
         BAIDU_SCOPED_LOCK(_wrappers_mutex);
         for (size_t i = 0; i < _wrappers.size(); ++i) {
             _wrappers[i]->_control = NULL;  // hack: disable removal.
-            delete _wrappers[i];
         }
         _wrappers.clear();
     }
+    WrapperTLSGroup::key_delete(_wrapper_key);
+    _wrapper_key = -1;
     pthread_mutex_destroy(&_modify_mutex);
     pthread_mutex_destroy(&_wrappers_mutex);
 }
@@ -318,26 +432,14 @@ DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
 template <typename T, typename TLS>
 int DoublyBufferedData<T, TLS>::Read(
     typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
-    if (BAIDU_UNLIKELY(!_created_key)) {
-        return -1;
-    }
-    Wrapper* w = static_cast<Wrapper*>(pthread_getspecific(_wrapper_key));
+    Wrapper* p = WrapperTLSGroup::get_or_create_tls_data(_wrapper_key);
+    Wrapper* w = AddWrapper(p);
     if (BAIDU_LIKELY(w != NULL)) {
         w->BeginRead();
         ptr->_data = UnsafeRead();
         ptr->_w = w;
         return 0;
     }
-    w = AddWrapper();
-    if (BAIDU_LIKELY(w != NULL)) {
-        const int rc = pthread_setspecific(_wrapper_key, w);
-        if (rc == 0) {
-            w->BeginRead();
-            ptr->_data = UnsafeRead();
-            ptr->_w = w;
-            return 0;
-        }
-    }
     return -1;
 }
 
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index 66426870..888045d4 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -97,16 +97,14 @@ bool AddN(Foo& f, int n) {
 }
 
 TEST_F(LoadBalancerTest, doubly_buffered_data) {
-    const size_t old_TLS_ctor = TLS_ctor;
-    const size_t old_TLS_dtor = TLS_dtor;
+    // test doubly_buffered_data TLS limits
     {
-        butil::DoublyBufferedData<Foo, TLS> d2;
-        butil::DoublyBufferedData<Foo, TLS>::ScopedPtr ptr;
-        d2.Read(&ptr);
-        ASSERT_EQ(old_TLS_ctor + 1, TLS_ctor);
+        std::cout << "current PTHREAD_KEYS_MAX: " << PTHREAD_KEYS_MAX << std::endl;
+        butil::DoublyBufferedData<Foo> data[PTHREAD_KEYS_MAX + 1];
+        butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+        ASSERT_EQ(0, data[PTHREAD_KEYS_MAX].Read(&ptr));
+        ASSERT_EQ(0, ptr->x);
     }
-    ASSERT_EQ(old_TLS_ctor + 1, TLS_ctor);
-    ASSERT_EQ(old_TLS_dtor + 1, TLS_dtor);
 
     butil::DoublyBufferedData<Foo> d;
     {


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org