You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@brpc.apache.org by ww...@apache.org on 2022/07/18 02:13:38 UTC
[incubator-brpc] branch master updated: Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX (#1838)
This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 583fe463 Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX (#1838)
583fe463 is described below
commit 583fe463790575c1397cbcd208137d25bfb048b3
Author: KaneVV1 <ya...@baidu.com>
AuthorDate: Mon Jul 18 10:13:32 2022 +0800
Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX (#1838)
* Fix apache#502 DoublyBufferedData limits by _SC_THREAD_KEYS_MAX
* Fix DoublyBufferedData limits by _SC_THREAD_KEYS_MAX
---
src/butil/containers/doubly_buffered_data.h | 172 ++++++++++++++++++++++------
test/brpc_load_balancer_unittest.cpp | 14 +--
2 files changed, 143 insertions(+), 43 deletions(-)
diff --git a/src/butil/containers/doubly_buffered_data.h b/src/butil/containers/doubly_buffered_data.h
index 93ba3421..c928037b 100644
--- a/src/butil/containers/doubly_buffered_data.h
+++ b/src/butil/containers/doubly_buffered_data.h
@@ -20,6 +20,7 @@
#ifndef BUTIL_DOUBLY_BUFFERED_DATA_H
#define BUTIL_DOUBLY_BUFFERED_DATA_H
+#include <deque>
#include <vector> // std::vector
#include <pthread.h>
#include "butil/scoped_lock.h"
@@ -54,6 +55,8 @@ class Void { };
template <typename T, typename TLS = Void>
class DoublyBufferedData {
class Wrapper;
+ class WrapperTLSGroup;
+ typedef int WrapperTLSId;
public:
class ScopedPtr {
friend class DoublyBufferedData;
@@ -163,7 +166,7 @@ private:
const T* UnsafeRead() const
{ return _data + _index.load(butil::memory_order_acquire); }
- Wrapper* AddWrapper();
+ Wrapper* AddWrapper(Wrapper*);
void RemoveWrapper(Wrapper*);
// Foreground and background void.
@@ -173,8 +176,7 @@ private:
butil::atomic<int> _index;
// Key to access thread-local wrappers.
- bool _created_key;
- pthread_key_t _wrapper_key;
+ WrapperTLSId _wrapper_key;
// All thread-local instances.
std::vector<Wrapper*> _wrappers;
@@ -200,13 +202,127 @@ template <typename T>
class DoublyBufferedDataWrapperBase<T, Void> {
};
+// Use pthread_key store data limits by _SC_THREAD_KEYS_MAX.
+// WrapperTLSGroup can store Wrapper in thread local storage.
+// WrapperTLSGroup will destruct Wrapper data when thread exits,
+// other times only reset Wrapper inner structure.
+template <typename T, typename TLS>
+class DoublyBufferedData<T, TLS>::WrapperTLSGroup {
+public:
+ const static size_t RAW_BLOCK_SIZE = 4096;
+ const static size_t ELEMENTS_PER_BLOCK = (RAW_BLOCK_SIZE + sizeof(T) - 1) / sizeof(T);
+
+ struct BAIDU_CACHELINE_ALIGNMENT ThreadBlock {
+ inline DoublyBufferedData::Wrapper* at(size_t offset) {
+ return _data + offset;
+ };
+
+ private:
+ DoublyBufferedData::Wrapper _data[ELEMENTS_PER_BLOCK];
+ };
+
+ inline static WrapperTLSId key_create() {
+ BAIDU_SCOPED_LOCK(_s_mutex);
+ WrapperTLSId id = 0;
+ if (!_get_free_ids().empty()) {
+ id = _get_free_ids().back();
+ _get_free_ids().pop_back();
+ } else {
+ id = _s_id++;
+ }
+ return id;
+ }
+
+ inline static int key_delete(WrapperTLSId id) {
+ BAIDU_SCOPED_LOCK(_s_mutex);
+ if (id < 0 || id >= _s_id) {
+ errno = EINVAL;
+ return -1;
+ }
+ _get_free_ids().push_back(id);
+ return 0;
+ }
+
+ inline static DoublyBufferedData::Wrapper* get_or_create_tls_data(WrapperTLSId id) {
+ if (BAIDU_UNLIKELY(id < 0)) {
+ CHECK(false) << "Invalid id=" << id;
+ return NULL;
+ }
+ if (_s_tls_blocks == NULL) {
+ _s_tls_blocks = new (std::nothrow) std::vector<ThreadBlock*>;
+ if (BAIDU_UNLIKELY(_s_tls_blocks == NULL)) {
+ LOG(FATAL) << "Fail to create vector, " << berror();
+ return NULL;
+ }
+ butil::thread_atexit(_destroy_tls_blocks);
+ }
+ const size_t block_id = (size_t)id / ELEMENTS_PER_BLOCK;
+ if (block_id >= _s_tls_blocks->size()) {
+ // The 32ul avoid pointless small resizes.
+ _s_tls_blocks->resize(std::max(block_id + 1, 32ul));
+ }
+ ThreadBlock* tb = (*_s_tls_blocks)[block_id];
+ if (tb == NULL) {
+ ThreadBlock* new_block = new (std::nothrow) ThreadBlock;
+ if (BAIDU_UNLIKELY(new_block == NULL)) {
+ return NULL;
+ }
+ tb = new_block;
+ (*_s_tls_blocks)[block_id] = new_block;
+ }
+ return tb->at(id - block_id * ELEMENTS_PER_BLOCK);
+ }
+
+private:
+ static void _destroy_tls_blocks() {
+ if (!_s_tls_blocks) {
+ return;
+ }
+ for (size_t i = 0; i < _s_tls_blocks->size(); ++i) {
+ delete (*_s_tls_blocks)[i];
+ }
+ delete _s_tls_blocks;
+ _s_tls_blocks = NULL;
+ }
+
+ inline static std::deque<WrapperTLSId>& _get_free_ids() {
+ if (BAIDU_UNLIKELY(!_s_free_ids)) {
+ _s_free_ids = new (std::nothrow) std::deque<WrapperTLSId>();
+ if (!_s_free_ids) {
+ abort();
+ }
+ }
+ return *_s_free_ids;
+ }
+
+private:
+ static pthread_mutex_t _s_mutex;
+ static WrapperTLSId _s_id;
+ static std::deque<WrapperTLSId>* _s_free_ids;
+ static __thread std::vector<ThreadBlock*>* _s_tls_blocks;
+};
+
+template <typename T, typename TLS>
+pthread_mutex_t DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_mutex = PTHREAD_MUTEX_INITIALIZER;
+
+template <typename T, typename TLS>
+std::deque<typename DoublyBufferedData<T, TLS>::WrapperTLSId>*
+ DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_free_ids = NULL;
+
+template <typename T, typename TLS>
+typename DoublyBufferedData<T, TLS>::WrapperTLSId
+ DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_id = 0;
+
+template <typename T, typename TLS>
+__thread std::vector<typename DoublyBufferedData<T, TLS>::WrapperTLSGroup::ThreadBlock*>*
+ DoublyBufferedData<T, TLS>::WrapperTLSGroup::_s_tls_blocks = NULL;
template <typename T, typename TLS>
class DoublyBufferedData<T, TLS>::Wrapper
: public DoublyBufferedDataWrapperBase<T, TLS> {
friend class DoublyBufferedData;
public:
- explicit Wrapper(DoublyBufferedData* c) : _control(c) {
+ explicit Wrapper() : _control(NULL) {
pthread_mutex_init(&_mutex, NULL);
}
@@ -239,19 +355,26 @@ private:
// Called when thread initializes thread-local wrapper.
template <typename T, typename TLS>
-typename DoublyBufferedData<T, TLS>::Wrapper*
-DoublyBufferedData<T, TLS>::AddWrapper() {
- std::unique_ptr<Wrapper> w(new (std::nothrow) Wrapper(this));
+typename DoublyBufferedData<T, TLS>::Wrapper* DoublyBufferedData<T, TLS>::AddWrapper(
+ typename DoublyBufferedData<T, TLS>::Wrapper* w) {
if (NULL == w) {
return NULL;
}
+ if (w->_control == this) {
+ return w;
+ }
+ if (w->_control != NULL) {
+ LOG(FATAL) << "Get wrapper from tls but control != this";
+ return NULL;
+ }
try {
+ w->_control = this;
BAIDU_SCOPED_LOCK(_wrappers_mutex);
- _wrappers.push_back(w.get());
+ _wrappers.push_back(w);
} catch (std::exception& e) {
return NULL;
}
- return w.release();
+ return w;
}
// Called when thread quits.
@@ -274,18 +397,11 @@ void DoublyBufferedData<T, TLS>::RemoveWrapper(
template <typename T, typename TLS>
DoublyBufferedData<T, TLS>::DoublyBufferedData()
: _index(0)
- , _created_key(false)
, _wrapper_key(0) {
_wrappers.reserve(64);
pthread_mutex_init(&_modify_mutex, NULL);
pthread_mutex_init(&_wrappers_mutex, NULL);
- const int rc = pthread_key_create(&_wrapper_key,
- butil::delete_object<Wrapper>);
- if (rc != 0) {
- LOG(FATAL) << "Fail to pthread_key_create: " << berror(rc);
- } else {
- _created_key = true;
- }
+ _wrapper_key = WrapperTLSGroup::key_create();
// Initialize _data for some POD types. This is essential for pointer
// types because they should be Read() as NULL before any Modify().
if (is_integral<T>::value || is_floating_point<T>::value ||
@@ -299,18 +415,16 @@ template <typename T, typename TLS>
DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
// User is responsible for synchronizations between Read()/Modify() and
// this function.
- if (_created_key) {
- pthread_key_delete(_wrapper_key);
- }
{
BAIDU_SCOPED_LOCK(_wrappers_mutex);
for (size_t i = 0; i < _wrappers.size(); ++i) {
_wrappers[i]->_control = NULL; // hack: disable removal.
- delete _wrappers[i];
}
_wrappers.clear();
}
+ WrapperTLSGroup::key_delete(_wrapper_key);
+ _wrapper_key = -1;
pthread_mutex_destroy(&_modify_mutex);
pthread_mutex_destroy(&_wrappers_mutex);
}
@@ -318,26 +432,14 @@ DoublyBufferedData<T, TLS>::~DoublyBufferedData() {
template <typename T, typename TLS>
int DoublyBufferedData<T, TLS>::Read(
typename DoublyBufferedData<T, TLS>::ScopedPtr* ptr) {
- if (BAIDU_UNLIKELY(!_created_key)) {
- return -1;
- }
- Wrapper* w = static_cast<Wrapper*>(pthread_getspecific(_wrapper_key));
+ Wrapper* p = WrapperTLSGroup::get_or_create_tls_data(_wrapper_key);
+ Wrapper* w = AddWrapper(p);
if (BAIDU_LIKELY(w != NULL)) {
w->BeginRead();
ptr->_data = UnsafeRead();
ptr->_w = w;
return 0;
}
- w = AddWrapper();
- if (BAIDU_LIKELY(w != NULL)) {
- const int rc = pthread_setspecific(_wrapper_key, w);
- if (rc == 0) {
- w->BeginRead();
- ptr->_data = UnsafeRead();
- ptr->_w = w;
- return 0;
- }
- }
return -1;
}
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index 66426870..888045d4 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -97,16 +97,14 @@ bool AddN(Foo& f, int n) {
}
TEST_F(LoadBalancerTest, doubly_buffered_data) {
- const size_t old_TLS_ctor = TLS_ctor;
- const size_t old_TLS_dtor = TLS_dtor;
+ // test doubly_buffered_data TLS limits
{
- butil::DoublyBufferedData<Foo, TLS> d2;
- butil::DoublyBufferedData<Foo, TLS>::ScopedPtr ptr;
- d2.Read(&ptr);
- ASSERT_EQ(old_TLS_ctor + 1, TLS_ctor);
+ std::cout << "current PTHREAD_KEYS_MAX: " << PTHREAD_KEYS_MAX << std::endl;
+ butil::DoublyBufferedData<Foo> data[PTHREAD_KEYS_MAX + 1];
+ butil::DoublyBufferedData<Foo>::ScopedPtr ptr;
+ ASSERT_EQ(0, data[PTHREAD_KEYS_MAX].Read(&ptr));
+ ASSERT_EQ(0, ptr->x);
}
- ASSERT_EQ(old_TLS_ctor + 1, TLS_ctor);
- ASSERT_EQ(old_TLS_dtor + 1, TLS_dtor);
butil::DoublyBufferedData<Foo> d;
{
---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org