Posted to dev@brpc.apache.org by "yanglimingcn (via GitHub)" <gi...@apache.org> on 2023/04/20 05:13:26 UTC

[PR] add usercode thread pool for method (brpc)

yanglimingcn opened a new pull request, #2215:
URL: https://github.com/apache/brpc/pull/2215

   ### What problem does this PR solve?
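   1. Different interfaces in an application have different processing logic, QPS, and latency. We often want to reserve some capacity for particular interfaces, such as a heartbeat interface or other control-plane interfaces.
   2. Some interfaces have high latency. If they receive many requests (more than the number of workers), the latency of other requests increases, which hurts latency-sensitive interfaces. These high-latency interfaces therefore need to be isolated.
   3. Differences in processing logic also call for separate thread pools. For example, I/O requests must be submitted to threads bound to a disk device driver, and the driver submits and reaps I/O on those specific threads.
   4. Calls that block a worker thread make that worker unavailable and disturb bthread scheduling. For example, if worker1 has many requests queued and an earlier request blocks the worker, the later requests are rescheduled onto other workers, but moving tasks between workers still carries a noticeable cost.
   5. Application logic varies widely, so it is hard to partition tasks finely into different thread pools by service or method at the application layer; the implementation becomes very complex. Doing this at the framework level is simpler.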
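
The isolation idea in the list above can be sketched as a lookup from method name to pool name. This is a minimal illustration under stated assumptions, not the PR's implementation: `MethodPoolRouter`, `Assign`, `PoolFor`, and the pool and method names used with it are all hypothetical.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical illustration, not brpc API: maps a full method name to the
// name of the thread pool that should run it.
class MethodPoolRouter {
public:
    explicit MethodPoolRouter(std::string default_pool)
        : _default_pool(std::move(default_pool)) {
        assert(!_default_pool.empty());
    }

    // Reserve a dedicated pool for one method, e.g. a heartbeat handler.
    void Assign(const std::string& method, const std::string& pool) {
        _method_to_pool[method] = pool;
    }

    // Methods without an explicit assignment share the default pool.
    const std::string& PoolFor(const std::string& method) const {
        auto it = _method_to_pool.find(method);
        return it == _method_to_pool.end() ? _default_pool : it->second;
    }

private:
    std::string _default_pool;
    std::unordered_map<std::string, std::string> _method_to_pool;
};
```

With such a table, a heartbeat method could be assigned to a small "control" pool and a slow storage method to a "disk_io" pool, while every other method keeps using the default workers.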
   Issue Number:
   
   Problem Summary:
   
   ### What is changed and the side effects?
   
   Changed:
   
   Side effects:
   - Performance effects:
   
   - Breaking backward compatibility:
   
   ---
   ### Check List:
   - Please make sure your changes are compilable.
   - When providing us with a new feature, it is best to add related tests.
   - Please follow [Contributor Covenant Code of Conduct](../../master/CODE_OF_CONDUCT.md).
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191867173


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;

Review Comment:
   OK





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1550911590

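   > Is only the baidu protocol supported? Other protocols should be supported too, right?
   
   Once the main body of the code has passed review, I will add support for the other protocols.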




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191908472


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   Could this thread pool be managed outside the server? Consider this scenario: two servers need to share one thread pool.
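
One way to picture the sharing scenario is to give each server a `std::shared_ptr` to a pool that is created outside of any server. This is only a sketch under that assumption: `ToyPool` and `ToyServer` are hypothetical stand-ins, and the pool runs tasks inline so that the ownership pattern stays in focus.

```cpp
#include <cassert>
#include <functional>
#include <memory>
#include <string>
#include <utility>

// Hypothetical stand-in for a user-code pool; it runs tasks inline instead of
// on worker threads.
class ToyPool {
public:
    explicit ToyPool(std::string name) : _name(std::move(name)) {}

    void Submit(const std::function<void()>& task) {
        assert(task);
        task();
        ++_executed;
    }

    int executed() const { return _executed; }
    const std::string& name() const { return _name; }

private:
    std::string _name;
    int _executed = 0;
};

// Hypothetical server that shares ownership of a pool created elsewhere.
class ToyServer {
public:
    explicit ToyServer(std::shared_ptr<ToyPool> pool) : _pool(std::move(pool)) {}

    void HandleRequest(const std::function<void()>& handler) {
        _pool->Submit(handler);
    }

private:
    std::shared_ptr<ToyPool> _pool;
};
```

With shared ownership the pool outlives whichever server is destroyed first, so neither server has to decide when it is safe to stop the threads.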





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1573313057

   It would be best to implement a lock-free queue here later; performance should be better.
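
The comment does not say which lock-free design is intended. One common pattern for a queue with many producers and one consumer is sketched here as an assumption: producers push nodes with a compare-and-swap loop, and the consumer detaches the whole list with a single atomic exchange, much like the worker loop swaps out `_queue` under the mutex. `MpscQueue`, `Push`, and `PopAll` are hypothetical names.

```cpp
#include <algorithm>
#include <atomic>
#include <cassert>
#include <utility>
#include <vector>

// Hypothetical multi-producer, single-consumer queue. Producers push with a
// CAS loop; the consumer detaches every queued node in one atomic exchange.
template <typename T>
class MpscQueue {
public:
    MpscQueue() : _head(nullptr) {}
    ~MpscQueue() { PopAll(); }  // frees any nodes that are still queued

    MpscQueue(const MpscQueue&) = delete;
    MpscQueue& operator=(const MpscQueue&) = delete;

    // Safe to call from any number of threads.
    void Push(T value) {
        Node* node = new Node{std::move(value), nullptr};
        node->next = _head.load(std::memory_order_relaxed);
        // On failure, compare_exchange_weak reloads node->next with the
        // current head, so the loop simply retries.
        while (!_head.compare_exchange_weak(node->next, node,
                                            std::memory_order_release,
                                            std::memory_order_relaxed)) {
        }
    }

    // Intended for a single consumer thread. Returns items in FIFO order.
    std::vector<T> PopAll() {
        Node* node = _head.exchange(nullptr, std::memory_order_acquire);
        std::vector<T> items;
        while (node != nullptr) {
            items.push_back(std::move(node->value));
            Node* next = node->next;
            delete node;
            node = next;
        }
        // The list is linked newest-first, so reverse it to restore FIFO.
        std::reverse(items.begin(), items.end());
        return items;
    }

private:
    struct Node {
        T value;
        Node* next;
    };
    std::atomic<Node*> _head;
};
```

Because the consumer never pops a single node, this design avoids the ABA problem of a general lock-free stack. It does not by itself replace the condition variable: an idle worker would still need some way to sleep and be woken.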




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525526114

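   > > > Do we need to consider how the thread pool exits?
   > > 
   > > 
   > > This follows the implementation of the usercode backup pool, which carries a comment about this, so it seems I have to handle it the same way here:
   > > 
   > >     int UserCodeBackupPool::Init() {
   > >         // Like bthread workers, these threads never quit (to avoid potential hang
   > >         // during termination of program).
   > >         for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {
   > >             pthread_t th;
   > >             if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {
   > >                 LOG(ERROR) << "Fail to create UserCodeRunner";
   > >                 return -1;
   > >             }
   > >         }
   > >         return 0;
   > >     }
   > 
   > UserCodeBackupPool is used inside the framework and is never destructed while the process is running.
   > 
   > ```c++
   > static UserCodeBackupPool* s_usercode_pool = NULL;
   > ```
   > 
   > The UserCodeThreadPool in this PR is meant for users. It may be destructed while the threads in the pool have not exited, which could cause a crash.
   > 
   > Wouldn't it be more reasonable to do a stop and join in the UserCodeThreadPool destructor?
   
   Looking at the code, Server::Join() should be called whenever a Server exits. Once that function returns, there should be no new requests. Then, when the Server destructor destroys _thread_pool_map, it calls stop and join on each UserCodeThreadPool. I think that is reasonable.
   If UserCodeBackupPool were destructed inside Server, it would probably cause a deadlock; UserCodeThreadPool should not have this problem.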
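
The stop-and-join behavior discussed here can be sketched with a single worker thread. This is a simplified illustration, not the PR's `UserCodeThreadWorker`: `JoiningWorker` is a hypothetical class, and draining the queue before exit is a design choice made for this sketch.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-thread pool. The destructor stops and joins the worker
// so the thread never outlives the object whose members it uses.
class JoiningWorker {
public:
    JoiningWorker() : _running(true), _thread([this] { Loop(); }) {}
    ~JoiningWorker() { StopAndJoin(); }

    void Run(std::function<void()> task) {
        assert(task);
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _queue.push_back(std::move(task));
        }
        _cond.notify_one();
    }

    // Idempotent: safe to call explicitly and again from the destructor.
    void StopAndJoin() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _running = false;
        }
        _cond.notify_one();
        if (_thread.joinable()) {
            _thread.join();
        }
    }

private:
    void Loop() {
        std::unique_lock<std::mutex> lk(_mutex);
        while (true) {
            _cond.wait(lk, [this] { return !_queue.empty() || !_running; });
            if (_queue.empty()) {
                return;  // only reachable once _running is false
            }
            std::function<void()> task = std::move(_queue.front());
            _queue.pop_front();
            lk.unlock();
            task();  // run user code without holding the lock
            lk.lock();
        }
    }

    bool _running;  // guarded by _mutex
    std::mutex _mutex;
    std::condition_variable _cond;
    std::deque<std::function<void()>> _queue;
    std::thread _thread;  // declared last so it starts after the other members
};
```

Tasks already queued when the object is destroyed still run before the thread exits. Tasks submitted after `StopAndJoin()` are never executed, so callers would need to stop accepting requests first, as the reply above describes for `Server::Join()`.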
   
   




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525565132

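   > > > One more question: this ThreadPool is not copyable, so for now only pointers can be stored in the FlatMap. Any good suggestions for making this more elegant?
   > > 
   > > 
   > > @chenBright Any suggestions on this? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel elegant.
   > 
   > If you don't want to delete them yourself, use smart pointers. There is no particularly good approach; let's see whether anyone else has suggestions.
   
   Putting unique_ptr in FlatMap does not seem to work either. Or should we drop FlatMap and use std::unordered_map?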
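
The `std::unordered_map` alternative raised here might look like the following sketch. `NamedPool` and `PoolRegistry` are hypothetical names, and the code assumes that switching containers is acceptable for this map. It uses C++11 only, so it avoids `std::make_unique`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <mutex>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical non-copyable pool: the mutex member deletes copy and move.
class NamedPool {
public:
    explicit NamedPool(std::string name) : _name(std::move(name)) {}
    const std::string& name() const { return _name; }

private:
    std::string _name;
    std::mutex _mutex;
};

// Hypothetical registry. unique_ptr values are movable, so the map can own
// non-copyable pools and destroy them without a manual delete loop.
class PoolRegistry {
public:
    // Returns the existing pool for `name`, creating it on first use.
    NamedPool* GetOrCreate(const std::string& name) {
        auto it = _pools.find(name);
        if (it == _pools.end()) {
            std::unique_ptr<NamedPool> pool(new NamedPool(name));
            it = _pools.emplace(name, std::move(pool)).first;
        }
        assert(it->second != nullptr);
        return it->second.get();
    }

    // Returns nullptr when no pool with that name exists.
    NamedPool* Find(const std::string& name) const {
        auto it = _pools.find(name);
        return it == _pools.end() ? nullptr : it->second.get();
    }

    std::size_t size() const { return _pools.size(); }

private:
    std::unordered_map<std::string, std::unique_ptr<NamedPool>> _pools;
};
```

When the registry is destroyed, each `unique_ptr` deletes its pool, so the manual traversal in the destructor is no longer needed.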




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191120117


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   How about adding a method to the server for this? SetNumThreads(pool_name, number)
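
The thread does not define the proposed method beyond its name and arguments. A minimal sketch of what such a setter could do, assuming it fails for unknown pools and for non-positive sizes, follows. `PoolSizeTable`, `AddPool`, and `NumThreads` are hypothetical, and the sketch only records the requested size without starting any threads.

```cpp
#include <cassert>
#include <string>
#include <unordered_map>

// Hypothetical server-side table of pool sizes; not brpc's Server class.
class PoolSizeTable {
public:
    void AddPool(const std::string& pool_name, int num_threads) {
        assert(num_threads > 0);
        _sizes[pool_name] = num_threads;
    }

    // Follows the suggested SetNumThreads(pool_name, number) shape. It fails
    // for an unknown pool or a non-positive size rather than creating a pool
    // implicitly.
    bool SetNumThreads(const std::string& pool_name, int number) {
        auto it = _sizes.find(pool_name);
        if (it == _sizes.end() || number <= 0) {
            return false;
        }
        it->second = number;
        return true;
    }

    // Returns 0 for a pool that was never added.
    int NumThreads(const std::string& pool_name) const {
        auto it = _sizes.find(pool_name);
        return it == _sizes.end() ? 0 : it->second;
    }

private:
    std::unordered_map<std::string, int> _sizes;
};
```

An explicit setter like this would let `Init` take an integer directly instead of resolving a flag name through gflags.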





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191065633


##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;

Review Comment:
   This is defined inside the UserCodeTask struct.





Re: [PR] add usercode thread pool for method (brpc)

Posted by "serverglen (via GitHub)" <gi...@apache.org>.
serverglen commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1202566928


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,230 @@
+
+#include "brpc/usercode_thread_pool.h"

Review Comment:
   Please move the brpc/usercode_thread_pool.h include to the bottom of the include list.





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191936718


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
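   Yes, that works. Let's define a global one then; that way it can also be found when gflags are modified dynamically. If you think this is fine, I will change it this way. It seems quite reasonable to me.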





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191071292


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);
+    if (ret) {
+        if (info.type != "int32") {
+            LOG(ERROR) << "Failed gflags type need int32";
+            return false;
+        }
+        _gflag_num_threads = static_cast<const int32_t*>(info.flag_ptr);
+        num = *_gflag_num_threads;
+    } else {
+        num = std::atoi(num_threads.c_str());
+    }
+    if (num <= 0) {
+        LOG(ERROR) << "Failed parameter for usercode pool init";
+        return false;
+    }
+    SetNumThreads(num);
+    return true;
+}
+
+void UserCodeThreadPool::RunUserCode(void (*fn)(void*), void* arg) {
+    if (_gflag_num_threads != nullptr) {
+        size_t num_threads = *_gflag_num_threads;
+        if (num_threads != _workers.size()) {
+            SetNumThreads(num_threads);
+        }
+    }
+    auto range = GetNumThreads();
+    auto index = _assign_policy->Index(arg, range);
+    auto& worker = _workers[index];

Review Comment:
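   This logic is handled inside the Policy. Passing arg and range to the Policy lets users customize their own thread-selection strategy based on the request and the number of threads in the pool. For example, if requests carry a tag, requests with different tags can be dispatched to different threads.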
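
A minimal sketch of such a tag-based policy, for illustration only. The base class mirrors the `Index(void* arg, size_t range)` interface quoted in this PR; `TaggedRequest` and `TagAssignPolicy` are hypothetical names that do not appear in the PR.

```cpp
#include <cassert>
#include <cstddef>
#include <functional>
#include <string>

// Mirrors the interface quoted in this PR: the pool hands the policy the
// request argument and the current number of workers.
class UserCodeThreadAssignPolicy {
public:
    virtual ~UserCodeThreadAssignPolicy() {}
    virtual size_t Index(void* arg, size_t range) = 0;
};

// Hypothetical request type; the real `arg` is whatever the caller passes
// to RunUserCode().
struct TaggedRequest {
    std::string tag;
};

// Hypothetical policy: requests with the same tag always map to the same
// worker index, so per-tag work stays on one thread.
class TagAssignPolicy : public UserCodeThreadAssignPolicy {
public:
    size_t Index(void* arg, size_t range) override {
        if (arg == nullptr || range == 0) {
            return 0;  // nothing to hash, or no workers to choose from
        }
        const TaggedRequest* req = static_cast<const TaggedRequest*>(arg);
        return std::hash<std::string>()(req->tag) % range;
    }
};
```

Because the index depends only on the tag and the range, growing the pool changes the mapping; a policy that needs stable placement would have to account for that.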





Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191854729


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   How about exposing a method for changing the number of threads? Users can then define their own gflag and call this method from its validator.
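
A rough sketch of that idea, under stated assumptions. `ResizablePool`, `GlobalPool` and `ValidatePoolThreads` are hypothetical stand-ins, and the flag name in the comment is made up. Only the validator signature, `bool (*)(const char*, int32_t)`, is what gflags expects for an int32 flag.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>

// Stand-in for the pool; only the resize entry point matters here.
class ResizablePool {
public:
    // Grow-only, like the SetNumThreads() quoted in this PR.
    void SetNumThreads(size_t num_threads) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (num_threads > _num_threads) {
            _num_threads = num_threads;  // real code would start workers here
        }
    }
    size_t GetNumThreads() {
        std::lock_guard<std::mutex> lk(_mutex);
        return _num_threads;
    }

private:
    std::mutex _mutex;
    size_t _num_threads = 0;
};

// Hypothetical global pool owned by the application.
inline ResizablePool& GlobalPool() {
    static ResizablePool pool;
    return pool;
}

// Validator with the signature gflags uses for int32 flags. In a real
// program it would be attached to a user-defined flag, for example via
// gflags' RegisterFlagValidator(&FLAGS_my_pool_threads, &ValidatePoolThreads),
// where my_pool_threads is an invented flag name.
inline bool ValidatePoolThreads(const char* /*flagname*/, int32_t value) {
    if (value <= 0) {
        return false;  // reject the new value
    }
    GlobalPool().SetNumThreads(static_cast<size_t>(value));
    return true;
}
```

With this shape the pool no longer needs to look up a flag by name or poll it on every request; the resize happens once, when the flag is changed.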



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;

Review Comment:
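   LocalStorage has three members:
       KeyTable* keytable;
       void* assigned_data;
       void* rpcz_parent_span;
   Their lifetimes all differ.
   1. keytable: if a keytable_pool is used, its lifetime is managed by the keytable_pool; without a pool, its lifetime follows the bthread.
   2. assigned_data is usually a server-level option and does not change.
   3. rpcz_parent_span: its lifetime follows the bthread.
   
   To be safe, I suggest passing only assigned_data to the usercode worker and not passing the others.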
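
A simplified sketch of what passing only assigned_data could look like. The `LocalStorage` struct here is a stand-in with `void*` members rather than the real `bthread::LocalStorage`. `MakeTask`, `RunOnWorker`, `Observed` and `RecordTls` are hypothetical helper names used only for this illustration.

```cpp
#include <cassert>

// Simplified stand-in for bthread::LocalStorage with the three members
// listed above (keytable is a void* to keep the sketch standalone).
struct LocalStorage {
    void* keytable;
    void* assigned_data;
    void* rpcz_parent_span;
};

thread_local LocalStorage tls_bls = {nullptr, nullptr, nullptr};

struct UserCodeTask {
    void (*fn)(void*);
    void* arg;
    void* assigned_data;  // the only field copied from the submitter
};

// Runs on the submitting thread: capture assigned_data only, because the
// other two members may not outlive the submitting bthread.
inline UserCodeTask MakeTask(void (*fn)(void*), void* arg) {
    return UserCodeTask{fn, arg, tls_bls.assigned_data};
}

// Runs on the worker thread: restore assigned_data, leave the rest alone.
inline void RunOnWorker(const UserCodeTask& task) {
    tls_bls.assigned_data = task.assigned_data;
    task.fn(task.arg);
}

// Records what a user callback observes in thread-local storage.
struct Observed {
    void* keytable;
    void* assigned_data;
    void* rpcz_parent_span;
};

inline void RecordTls(void* arg) {
    Observed* out = static_cast<Observed*>(arg);
    out->keytable = tls_bls.keytable;
    out->assigned_data = tls_bls.assigned_data;
    out->rpcz_parent_span = tls_bls.rpcz_parent_span;
}
```

In the pool, `RunOnWorker` would execute on a worker thread whose `tls_bls` starts out empty, so user code there sees the server-level `assigned_data` but never a keytable or span pointer owned by the submitting bthread.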



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);
+    if (ret) {
+        if (info.type != "int32") {
+            LOG(ERROR) << "Failed gflags type need int32";
+            return false;
+        }
+        _gflag_num_threads = static_cast<const int32_t*>(info.flag_ptr);
+        num = *_gflag_num_threads;
+    } else {
+        num = std::atoi(num_threads.c_str());
+    }
+    if (num <= 0) {
+        LOG(ERROR) << "Failed parameter for usercode pool init";
+        return false;
+    }
+    SetNumThreads(num);
+    return true;
+}
+
+void UserCodeThreadPool::RunUserCode(void (*fn)(void*), void* arg) {
+    if (_gflag_num_threads != nullptr) {
+        size_t num_threads = *_gflag_num_threads;
+        if (num_threads != _workers.size()) {
+            SetNumThreads(num_threads);
+        }
+    }
+    auto range = GetNumThreads();
+    auto index = _assign_policy->Index(arg, range);
+    auto& worker = _workers[index];

Review Comment:
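   Since users are allowed to define custom Policies, we cannot assume that the policy implementation is bug-free. It would be better to check the index here.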
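
One possible shape for such a check, as a sketch only. `CheckedIndex` and `IndexOrFallback` are hypothetical helpers, and falling back to worker 0 is an assumption of this example, not something the PR specifies.

```cpp
#include <cassert>
#include <cstddef>

// Returns true and stores the index in *out when the policy's answer lies
// in [0, range); returns false otherwise, so the caller can log the problem
// and choose a fallback instead of indexing out of bounds.
inline bool CheckedIndex(size_t policy_index, size_t range, size_t* out) {
    if (range == 0 || policy_index >= range) {
        return false;
    }
    *out = policy_index;
    return true;
}

// Fallback chosen for this sketch: route the task to worker 0 when the
// policy returns an out-of-range index.
inline size_t IndexOrFallback(size_t policy_index, size_t range) {
    size_t index = 0;
    return CheckedIndex(policy_index, range, &index) ? index : 0;
}
```

An explicit check makes a buggy policy visible, whereas silently wrapping the index with `index % range` hides it; either way the pool avoids an out-of-bounds access into `_workers`.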



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;

Review Comment:
   > This is defined in the UserCodeTask struct
   
   That only uses the bthread::LocalStorage type; it does not reference the global variable bthread::tls_bls.





Re: [PR] add usercode thread pool for method (brpc)

Posted by "chenBright (via GitHub)" <gi...@apache.org>.
chenBright commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525554441

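   > > Also, I have a question: since this ThreadPool is non-copyable, only pointers can be put into the FlatMap for now. Are there any good suggestions for making this more elegant?
   > 
   > @chenBright Any suggestions on this? The Map stores raw pointers, so on destruction we have to iterate over it manually and call delete, which doesn't feel very elegant.
   
   If you don't want to call delete yourself, then use smart pointers. I don't have a particularly good solution; let's see whether anyone else has suggestions.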
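
A small sketch of the smart-pointer option. It uses `std::map` rather than `butil::FlatMap` so that it stays standalone; whether FlatMap accepts move-only values is not checked here, and `std::shared_ptr` would be the copyable alternative. `Pool`, `PoolMap` and `AddPool` are hypothetical names.

```cpp
#include <cassert>
#include <map>
#include <memory>
#include <string>

// Stand-in for a non-copyable pool; `live` counts objects still alive.
class Pool {
public:
    explicit Pool(int* live) : _live(live) { ++*_live; }
    ~Pool() { --*_live; }
    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;

private:
    int* _live;
};

// The map owns the pools; destroying the map destroys every pool.
typedef std::map<std::string, std::unique_ptr<Pool> > PoolMap;

// Creates the pool on first use and returns a non-owning pointer to it.
inline Pool* AddPool(PoolMap* pools, const std::string& name, int* live) {
    std::unique_ptr<Pool>& slot = (*pools)[name];
    if (!slot) {
        slot.reset(new Pool(live));
    }
    return slot.get();
}
```

With ownership held by the map, no manual loop calling delete is needed in the owner's destructor.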




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1195028211


##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,142 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+
+namespace brpc {
+// Store pending user code.
+struct UserCodeTask {
+    void (*fn)(void*);
+    void* arg;
+    void* assigned_data;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadRandomAssignPolicy() {}
+    virtual ~UserCodeThreadRandomAssignPolicy() {}
+    size_t Index(void* arg, size_t range) override;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy);
+};
+
+class UserCodeThreadPool;
+class UserCodeThreadWorker {
+public:
+    UserCodeThreadWorker(UserCodeThreadPool* pool);
+    void UserCodeRun(UserCodeTask&& usercode);
+    void UserCodeLoop();
+    void Start();
+    void Stop();
+    void Join();
+
+private:
+    UserCodeThreadPool* _pool;
+    std::deque<UserCodeTask> _queue;
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::thread _worker;
+    std::atomic<bool> _running;  // running flag
+};
+// "user code thread pool" configuration

Review Comment:
   Add a blank line here.



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,230 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Bthread local storage
+extern __thread bthread::LocalStorage tls_bls;
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+
+namespace brpc {
+
+DEFINE_int32(usercode_thread_pool_map_nbucket, 64 * 2,
+             "usercode thread pool map bucket size");
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto worker_id = _pool->NextWorkerId();
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%ld", pool_name.c_str(), worker_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCodeTask> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls.assigned_data = usercode.assigned_data;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCodeTask&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _next_worker_id(0) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+bool UserCodeThreadPool::Init(size_t num_threads) {
+    if (num_threads <= 0) {
+        LOG(ERROR) << "Wrong parameter for usercode thread pool init";
+        return false;
+    }
+    SetNumThreads(num_threads);
+    return true;
+}
+
+void UserCodeThreadPool::RunUserCode(void (*fn)(void*), void* arg) {
+    auto range = GetNumThreads();
+    auto index = _assign_policy->Index(arg, range);
+    auto& worker = _workers[index % range];
+    UserCodeTask usercode{fn, arg, bthread::tls_bls.assigned_data};
+    worker->UserCodeRun(std::move(usercode));
+}
+
+void UserCodeThreadPool::StopAndJoin() {
+    std::unique_lock<std::mutex> lk(_mutex);
+    for (auto& worker : _workers) {
+        worker->Stop();
+    }
+    for (auto& worker : _workers) {
+        worker->Join();
+    }
+}
+
+void UserCodeThreadPool::SetNumThreads(size_t num_threads) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    if (num_threads <= _workers.size()) {
+        return;
+    }
+    for (size_t i = _workers.size(); i < num_threads; ++i) {
+        auto worker = new UserCodeThreadWorker(this);
+        worker->Start();
+        _workers.emplace_back(worker);
+    }
+}
+
+size_t UserCodeThreadPool::NextWorkerId() {
+    return _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+}
+
+size_t UserCodeThreadPool::GetNumThreads() {
+    std::unique_lock<std::mutex> lk(_mutex);
+    return _workers.size();
+}
+
+size_t UserCodeThreadRandomAssignPolicy::Index(void*, size_t range) {
+    return butil::fast_rand() % range;
+}
+
+UserCodeThreadPoolMap UserCodeThreadPool::_thread_pool_map;
+std::once_flag UserCodeThreadPool::_thread_pool_map_once;
+
+UserCodeThreadPool* UserCodeThreadPool::GetOrCreatePool(
+    const UserCodeThreadPoolConf& conf) {
+    std::call_once(_thread_pool_map_once, [&]() {
+        if (_thread_pool_map.init(FLAGS_usercode_thread_pool_map_nbucket) !=
+            0) {
+            LOG(ERROR) << "Fail to init usercode thread pool map";
+            exit(1);
+        }
+    });
+
+    auto p = _thread_pool_map.seek(conf.pool_name);
+    if (p == nullptr) {
+        std::unique_ptr<UserCodeThreadPool> pool(new UserCodeThreadPool(
+            conf.pool_name, conf.thread_startfn, conf.assign_policy));
+        if (pool->Init(conf.num_threads) == false) {
+            return nullptr;
+        }
+        _thread_pool_map[conf.pool_name].swap(pool);
+    }
+    return _thread_pool_map.seek(conf.pool_name)->get();
+}
+
+void UserCodeThreadPool::SetPoolThreads(const std::string& pool_name,

Review Comment:
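   Would it be better for this method to return a value? For example, it could return an error when pool_name is not found or when num_threads is out of range.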
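
The suggestion above could be sketched roughly as follows. This is only an illustration under stated assumptions: `PoolRegistry` and its `std::map` are hypothetical stand-ins for the pool map in the patch, and the choice of `ENOENT` / `EINVAL` (and treating a shrink request as an error, since the quoted `SetNumThreads` only grows the pool) is an assumption, not something the PR defines.

```cpp
#include <cassert>
#include <cerrno>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for the pool registry; not brpc's real types.
struct PoolRegistry {
    // pool name -> current number of worker threads
    std::map<std::string, size_t> threads_by_pool;

    // Returns 0 on success, ENOENT if the pool is unknown, and EINVAL if
    // num_threads is zero or smaller than the current thread count.
    int SetPoolThreads(const std::string& pool_name, size_t num_threads) {
        auto it = threads_by_pool.find(pool_name);
        if (it == threads_by_pool.end()) {
            return ENOENT;
        }
        // Invariant assumed here: a registered pool has at least one thread.
        assert(it->second > 0);
        if (num_threads == 0 || num_threads < it->second) {
            return EINVAL;
        }
        it->second = num_threads;
        return 0;
    }
};
```

With a return value like this, a caller can tell a misspelled pool name from a rejected thread count instead of both failing silently.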





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1550910750

   > Also, could you add some unit tests?
   
   done




Re: [PR] add usercode thread pool for method (brpc)

Posted by "serverglen (via GitHub)" <gi...@apache.org>.
serverglen commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1204945239


##########
example/usercode_thread_echo_c++/server.cpp:
##########
@@ -0,0 +1,247 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+// A server to receive EchoRequest and send back EchoResponse.
+
+#include <gflags/gflags.h>
+#include <butil/logging.h>
+#include <brpc/server.h>
+#include "echo.pb.h"
+
+DEFINE_bool(echo_attachment, true, "Echo attachment as well");
+DEFINE_int32(port, 8000, "TCP Port of this server");
+DEFINE_string(listen_addr, "",
+              "Server listen address, may be IPV4/IPV6/UDS."
+              " If this is set, the flag port will be ignored");
+DEFINE_int32(idle_timeout_s, -1,
+             "Connection will be closed if there is no "
+             "read/write operations during the last `idle_timeout_s'");
+DEFINE_int32(logoff_ms, 2000,
+             "Maximum duration of server's LOGOFF state "
+             "(waiting for client to close connection before server stops)");
+DEFINE_int32(num_threads1, 2, "thread number for pool1");
+DEFINE_int32(num_threads2, 2, "thread number for pool2");
+
+// Your implementation of example::EchoService
+// Notice that implementing brpc::Describable grants the ability to put
+// additional information in /status.
+namespace example {
+butil::atomic<int> ntls(0);
+struct MyThreadLocalData {
+    MyThreadLocalData() : y(0) {
+        ntls.fetch_add(1, butil::memory_order_relaxed);
+    }
+    ~MyThreadLocalData() { ntls.fetch_sub(1, butil::memory_order_relaxed); }
+    static void deleter(void* d) { delete static_cast<MyThreadLocalData*>(d); }
+
+    int y;
+};
+
+class MyThreadLocalDataFactory : public brpc::DataFactory {
+public:
+    void* CreateData() const { return new MyThreadLocalData; }
+
+    void DestroyData(void* d) const { MyThreadLocalData::deleter(d); }
+};
+
+class EchoServiceImpl : public EchoService {
+public:
+    EchoServiceImpl(){};
+    virtual ~EchoServiceImpl(){};
+    virtual void Echo(google::protobuf::RpcController* cntl_base,
+                      const EchoRequest* request, EchoResponse* response,
+                      google::protobuf::Closure* done) {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        MyThreadLocalData* tls =
+            static_cast<MyThreadLocalData*>(brpc::thread_local_data());
+        if (tls == NULL) {
+            cntl->SetFailed(
+                "Require ServerOptions.thread_local_data_factory "
+                "to be set with a correctly implemented instance");
+            LOG(ERROR) << cntl->ErrorText();
+            return;
+        }
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from "
+                  << cntl->remote_side() << " to " << cntl->local_side() << ": "
+                  << request->message()
+                  << " (attached=" << cntl->request_attachment() << ")";
+
+        // Fill response.
+        response->set_message(request->message());
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+
+        if (FLAGS_echo_attachment) {
+            // Set attachment which is wired to network directly instead of
+            // being serialized into protobuf messages.
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+    virtual void Echo2(google::protobuf::RpcController* cntl_base,
+                       const EchoRequest* request, EchoResponse* response,
+                       google::protobuf::Closure* done) {
+        // This object helps you to call done->Run() in RAII style. If you need
+        // to process the request asynchronously, pass done_guard.release().
+        brpc::ClosureGuard done_guard(done);
+
+        brpc::Controller* cntl = static_cast<brpc::Controller*>(cntl_base);
+
+        MyThreadLocalData* tls =
+            static_cast<MyThreadLocalData*>(brpc::thread_local_data());
+        if (tls == NULL) {
+            cntl->SetFailed(
+                "Require ServerOptions.thread_local_data_factory "
+                "to be set with a correctly implemented instance");
+            LOG(ERROR) << cntl->ErrorText();
+            return;
+        }
+        // The purpose of following logs is to help you to understand
+        // how clients interact with servers more intuitively. You should
+        // remove these logs in performance-sensitive servers.
+        LOG(INFO) << "Received request[log_id=" << cntl->log_id() << "] from "
+                  << cntl->remote_side() << " to " << cntl->local_side() << ": "
+                  << request->message()
+                  << " (attached=" << cntl->request_attachment() << ")";
+
+        // Fill response.
+        response->set_message(request->message());
+
+        // You can compress the response by setting Controller, but be aware
+        // that compression may be costly, evaluate before turning on.
+        // cntl->set_response_compress_type(brpc::COMPRESS_TYPE_GZIP);
+
+        if (FLAGS_echo_attachment) {
+            // Set attachment which is wired to network directly instead of
+            // being serialized into protobuf messages.
+            cntl->response_attachment().append(cntl->request_attachment());
+        }
+    }
+};
+
+struct MyUserCodeThreadPoolArgs {
+    ::google::protobuf::Service* service;
+    const ::google::protobuf::MethodDescriptor* method;
+    ::google::protobuf::RpcController* controller;
+    const ::google::protobuf::Message* request;
+    ::google::protobuf::Message* response;
+    ::google::protobuf::Closure* done;
+};
+// user defined policy
+class MyUserCodeThreadAssignPolicy : public brpc::UserCodeThreadAssignPolicy {
+public:
+    MyUserCodeThreadAssignPolicy() {}
+    virtual ~MyUserCodeThreadAssignPolicy() {}
+    size_t Index(void* arg, size_t range) {
+        auto myArg = static_cast<MyUserCodeThreadPoolArgs*>(arg);
+        auto request = static_cast<const EchoRequest*>(myArg->request);
+        auto hash = std::hash<std::string>{}(request->message());
+        LOG(INFO) << "MyUserCodeThreadAssignPolicy message="
+                  << request->message() << " hash=" << hash
+                  << " range=" << range;
+        return hash % range;
+    }
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(MyUserCodeThreadAssignPolicy);
+};
+
+}  // namespace example
+
+int main(int argc, char* argv[]) {
+    // Parse gflags. We recommend you to use gflags as well.
+    GFLAGS_NS::ParseCommandLineFlags(&argc, &argv, true);
+
+    // Instance of your service.
+    example::EchoServiceImpl echo_service_impl;
+
+    // Generally you only need one Server.
+    brpc::Server server;
+
+    // Add the service into server. Notice the second parameter, because the
+    // service is put on stack, we don't want server to delete it, otherwise
+    // use brpc::SERVER_OWNS_SERVICE.
+    if (server.AddService(&echo_service_impl,
+                          brpc::SERVER_DOESNT_OWN_SERVICE) != 0) {
+        LOG(ERROR) << "Fail to add service";
+        return -1;
+    }
+
+    // use number string to setup num threads
+    // brpc::UserCodeThreadPoolConf conf("p1", "2");
+
+    // use gflag string to setup num threads
+    brpc::UserCodeThreadRandomAssignPolicy policy;
+    brpc::UserCodeThreadPoolConf conf("p1", FLAGS_num_threads1, nullptr,
+                                      &policy);

Review Comment:
   &policy can be changed to DefaultUserCodeThreadAssignPolicy()
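
A minimal sketch of what such a default-policy accessor might look like, assuming it returns a pointer to a process-wide instance. Every name below (`AssignPolicy`, `RandomAssignPolicy`, `DefaultAssignPolicy`) is hypothetical and stands in for the PR's classes; the real signature of DefaultUserCodeThreadAssignPolicy() is not shown in this thread.

```cpp
#include <cassert>
#include <cstddef>
#include <random>

// Hypothetical interface mirroring the Index(arg, range) shape in the patch.
class AssignPolicy {
public:
    virtual ~AssignPolicy() {}
    virtual size_t Index(void* arg, size_t range) = 0;
};

// Picks a worker uniformly at random in [0, range).
class RandomAssignPolicy : public AssignPolicy {
public:
    size_t Index(void* /*arg*/, size_t range) override {
        assert(range > 0);
        thread_local std::mt19937_64 rng{std::random_device{}()};
        return std::uniform_int_distribution<size_t>(0, range - 1)(rng);
    }
};

// Function-local static: constructed once on first use and alive until
// process exit, so callers never manage the policy's lifetime.
inline AssignPolicy* DefaultAssignPolicy() {
    static RandomAssignPolicy policy;
    return &policy;
}
```

One possible benefit of an accessor like this is that the pool does not hold a pointer to a policy object living on the stack of main().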



##########
example/usercode_thread_echo_c++/server.cpp:
##########
@@ -0,0 +1,247 @@
+    // use number string to setup num threads
+    // brpc::UserCodeThreadPoolConf conf("p1", "2");
+
+    // use gflag string to setup num threads
+    brpc::UserCodeThreadRandomAssignPolicy policy;

Review Comment:
   This should no longer be needed.





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1864160788

   For a better approach, see #2358.




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525527773

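   > One more question: since this ThreadPool is not copyable, the FlatMap can currently only hold pointers to it. Are there any good suggestions for making this more elegant?
   
   @chenBright Any suggestions here? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel very elegant.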
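
One possible direction, sketched under assumptions: let the map own each pool through std::unique_ptr so that no manual delete loop is needed. Here `Pool` and `PoolMap` are hypothetical names, and std::unordered_map stands in for butil::FlatMap; whether FlatMap accepts move-only values would need to be checked against the brpc version in use. The sketch is not thread-safe.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>

// Hypothetical non-copyable pool, standing in for UserCodeThreadPool.
class Pool {
public:
    explicit Pool(size_t num_threads) : _num_threads(num_threads) {}
    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;
    size_t num_threads() const { return _num_threads; }

private:
    size_t _num_threads;
};

// The map owns every pool via unique_ptr, so destroying the map destroys
// the pools; no explicit delete is required anywhere.
class PoolMap {
public:
    // Returns the existing pool for `name`, or creates one on first use.
    Pool* GetOrCreate(const std::string& name, size_t num_threads) {
        assert(num_threads > 0);
        std::unique_ptr<Pool>& slot = _pools[name];
        if (!slot) {
            slot.reset(new Pool(num_threads));
        }
        return slot.get();
    }
    size_t size() const { return _pools.size(); }

private:
    std::unordered_map<std::string, std::unique_ptr<Pool>> _pools;
};
```

Callers still receive a plain Pool*, but ownership stays with the map, which keeps the non-copyable pool out of any copy path.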




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191056242


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;

Review Comment:
   Could you explain the scenario for this?
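
One plausible reading of the line under review, offered only as a guess at the intent: thread-local state set on the submitting thread is invisible on the pool thread unless it travels with the task. The sketch below uses a plain `thread_local` pointer as a hypothetical stand-in for `bthread::tls_bls`; `Task`, `Capture` and `RunOnWorker` are illustrative names, not part of the patch.

```cpp
#include <cassert>
#include <functional>
#include <thread>
#include <utility>

// Hypothetical stand-in for the per-thread slot that holds request-scoped data.
thread_local void* tls_assigned_data = nullptr;

struct Task {
    std::function<void()> fn;
    void* assigned_data;  // snapshot taken on the submitting thread
};

// Called on the submitting thread: remember its thread-local value.
inline Task Capture(std::function<void()> fn) {
    return Task{std::move(fn), tls_assigned_data};
}

// Runs the task on a fresh thread, restoring the snapshot first so that
// code reading the thread-local sees the submitter's value.
inline void RunOnWorker(Task task) {
    std::thread worker([&task]() {
        assert(tls_assigned_data == nullptr);  // a new thread starts empty
        tls_assigned_data = task.assigned_data;
        task.fn();
    });
    worker.join();
}
```

Without the restore step, a handler that reads the thread-local on the pool thread would see nullptr rather than the submitter's data.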





Re: [PR] add usercode thread pool for method (brpc)

Posted by "chenBright (via GitHub)" <gi...@apache.org>.
chenBright commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1549549620

   Is only the baidu protocol supported? Other protocols should be supported as well, right?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1605894348

   This conflicts with the main branch @yanglimingcn 




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn closed pull request #2215: add usercode thread pool for method
URL: https://github.com/apache/brpc/pull/2215




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191057326


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   This is mainly meant to support changing the number of threads dynamically.
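
   A minimal sketch of the dynamic thread-count idea, assuming a reloadable integer setting that the pool re-reads on its submit path. `g_desired_threads`, `ResizablePool`, and `SyncWithFlag` are hypothetical names for illustration and are not part of this PR; a plain `std::atomic<int>` stands in for the gflag.

```cpp
#include <atomic>
#include <cassert>
#include <chrono>
#include <cstddef>
#include <memory>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for a reloadable gflag holding the desired thread count.
static std::atomic<int> g_desired_threads{2};

// Minimal resizable pool: each worker idles until it is asked to stop.
class ResizablePool {
public:
    ~ResizablePool() { Resize(0); }

    // Grow or shrink to `n` workers and return the resulting worker count.
    size_t Resize(size_t n) {
        std::lock_guard<std::mutex> lk(_mutex);
        while (_workers.size() < n) {
            _workers.emplace_back(new Worker);
        }
        while (_workers.size() > n) {
            _workers.pop_back();  // ~Worker stops and joins the thread
        }
        return _workers.size();
    }

    // Intended for the submit path: pick up a changed setting, if any.
    // Non-positive values are ignored so the pool never drops to zero here.
    size_t SyncWithFlag() {
        const int desired = g_desired_threads.load(std::memory_order_relaxed);
        if (desired <= 0) {
            return Size();
        }
        return Resize(static_cast<size_t>(desired));
    }

    size_t Size() {
        std::lock_guard<std::mutex> lk(_mutex);
        return _workers.size();
    }

private:
    struct Worker {
        std::atomic<bool> running{true};  // declared before `thread` on purpose
        std::thread thread;
        Worker() : thread([this] {
            while (running.load(std::memory_order_acquire)) {
                std::this_thread::sleep_for(std::chrono::milliseconds(1));
            }
        }) {}
        ~Worker() {
            running.store(false, std::memory_order_release);
            thread.join();
        }
    };

    std::mutex _mutex;
    std::vector<std::unique_ptr<Worker>> _workers;
};
```

   The sketch takes a mutex around every resize, so concurrent submitters cannot observe a half-resized worker list. Whether the real pool needs the same protection depends on how `RunUserCode` is called, which this sketch does not model.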





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191120948


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   How about adding a method to Server for this, e.g. SetNumThreads(pool_name, number)?
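
   A rough sketch of what a name-keyed `SetNumThreads(pool_name, number)` could look like. `PoolRegistry`, `FakePool`, `AddPool`, and `GetNumThreads` are hypothetical names used only for illustration; the real method would live on the server and operate on the actual pool type, neither of which is modeled here.

```cpp
#include <cassert>
#include <cstddef>
#include <map>
#include <string>

// Hypothetical stand-in for a thread pool; it only records the requested size.
struct FakePool {
    size_t num_threads = 0;
    void SetNumThreads(size_t n) { num_threads = n; }
};

// Hypothetical server-side registry that owns pools keyed by name.
class PoolRegistry {
public:
    // Registers a new pool. Fails if the name is taken or the size is zero.
    bool AddPool(const std::string& pool_name, size_t num_threads) {
        if (num_threads == 0) {
            return false;
        }
        auto result = _pools.emplace(pool_name, FakePool());
        if (!result.second) {
            return false;
        }
        result.first->second.SetNumThreads(num_threads);
        return true;
    }

    // Resizes an existing pool. Fails on an unknown name or a zero size.
    bool SetNumThreads(const std::string& pool_name, size_t number) {
        if (number == 0) {
            return false;
        }
        auto it = _pools.find(pool_name);
        if (it == _pools.end()) {
            return false;
        }
        it->second.SetNumThreads(number);
        return true;
    }

    // Returns 0 for an unknown pool.
    size_t GetNumThreads(const std::string& pool_name) const {
        auto it = _pools.find(pool_name);
        return it == _pools.end() ? 0 : it->second.num_threads;
    }

private:
    std::map<std::string, FakePool> _pools;
};
```

   An explicit setter like this makes the resize an intentional call by the application instead of a value polled on every request, at the cost of one more public method to maintain.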





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191867372


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);
+    if (ret) {
+        if (info.type != "int32") {
+            LOG(ERROR) << "Failed gflags type need int32";
+            return false;
+        }
+        _gflag_num_threads = static_cast<const int32_t*>(info.flag_ptr);
+        num = *_gflag_num_threads;
+    } else {
+        num = std::atoi(num_threads.c_str());
+    }
+    if (num <= 0) {
+        LOG(ERROR) << "Failed parameter for usercode pool init";
+        return false;
+    }
+    SetNumThreads(num);
+    return true;
+}
+
+void UserCodeThreadPool::RunUserCode(void (*fn)(void*), void* arg) {
+    if (_gflag_num_threads != nullptr) {
+        size_t num_threads = *_gflag_num_threads;
+        if (num_threads != _workers.size()) {
+            SetNumThreads(num_threads);
+        }
+    }
+    auto range = GetNumThreads();
+    auto index = _assign_policy->Index(arg, range);
+    auto& worker = _workers[index];

Review Comment:
   OK





Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1201470417


##########
src/brpc/server.h:
##########
@@ -379,6 +380,7 @@ class Server {
         const google::protobuf::MethodDescriptor* method;
         MethodStatus* status;
         AdaptiveMaxConcurrency max_concurrency;
+        UserCodeThreadPool* thread_pool;

Review Comment:
   A member is added here, but it is not initialized in the MethodProperty constructor.
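
   A small sketch of the fix this comment asks for, assuming the new pointer should default to "no dedicated pool". `MethodPropertySketch`, `UserCodeThreadPoolStub`, and `HasDedicatedPool` are hypothetical stand-ins; the real `MethodProperty` has other members that are omitted here.

```cpp
#include <cassert>

// Hypothetical placeholder for the real pool type.
struct UserCodeThreadPoolStub {};

// Simplified stand-in for Server::MethodProperty with only the new member.
struct MethodPropertySketch {
    UserCodeThreadPoolStub* thread_pool;

    // Initialize the pointer explicitly; otherwise a default-initialized
    // object holds an indeterminate value, and reading it is undefined behavior.
    MethodPropertySketch() : thread_pool(nullptr) {}
};

// Returns true if the method has a dedicated pool configured.
inline bool HasDedicatedPool(const MethodPropertySketch& mp) {
    return mp.thread_pool != nullptr;
}
```

   With the pointer set to `nullptr` by default, a null check is enough to keep methods without a configured pool on the existing code path.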





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1554164103

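   > The BuiltinServiceTest.rpcz case keeps failing. Could you check whether it is related to the code changed in this PR?
   
   I cannot reproduce it locally. In theory, these code paths are not executed unless a thread pool is configured.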




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1526872291

   > std::unordered_map also requires the value type to be copyable, doesn't it?
   
   Something like this works:
   std::vector<std::unique_ptr<std::thread>> _threads;  // threads
   _threads.emplace_back(new std::thread(UserCodeRunner, this));
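
   A self-contained sketch of the same pattern: `std::thread` cannot be copied, but a `std::unique_ptr<std::thread>` can be stored in a standard container and moved around freely. `WorkerGroup`, `Run`, and `ran` are hypothetical names chosen for this illustration and do not appear in the PR.

```cpp
#include <atomic>
#include <cassert>
#include <memory>
#include <thread>
#include <vector>

// Owns a fixed number of threads through unique_ptr, as in the two lines above.
class WorkerGroup {
public:
    explicit WorkerGroup(int n) : _ran(0) {
        for (int i = 0; i < n; ++i) {
            // The vector stores only the owning pointer, so the element type
            // needs to be movable, not copyable.
            _threads.emplace_back(new std::thread(&WorkerGroup::Run, this));
        }
    }

    ~WorkerGroup() { Join(); }

    // Waits for every thread that has not been joined yet.
    void Join() {
        for (auto& t : _threads) {
            if (t->joinable()) {
                t->join();
            }
        }
    }

    // Number of threads that have executed Run() so far.
    int ran() const { return _ran.load(); }

private:
    void Run() { _ran.fetch_add(1); }

    std::atomic<int> _ran;  // declared before _threads so it outlives them
    std::vector<std::unique_ptr<std::thread>> _threads;
};
```

   Passing a raw `new` expression to `emplace_back` can leak the thread object if the vector fails to grow; with C++14 available, `std::make_unique<std::thread>(...)` avoids that.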
   




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1522947950

   This conflicts with the master branch.




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1190831296


##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;
+}
+namespace brpc {
+// Store pending user code.
+struct UserCode {

Review Comment:
   UserCodeTask



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;
+}
+namespace brpc {
+// Store pending user code.
+struct UserCode {
+    void (*fn)(void*);
+    void* arg;
+    bthread::LocalStorage tls_bls;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadRandomAssignPolicy() {}
+    virtual ~UserCodeThreadRandomAssignPolicy() {}
+    size_t Index(void* arg, size_t range) override;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy);
+};
+
+class UserCodeThreadPool;
+class UserCodeThreadWorker {
+public:
+    UserCodeThreadWorker(UserCodeThreadPool* pool);
+    void UserCodeRun(UserCode&& usercode);
+    void UserCodeLoop();
+    void Start();
+    void Stop();
+    void Join();
+
+private:
+    UserCodeThreadPool* _pool;
+    std::deque<UserCode> _queue;
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::thread _worker;
+    std::atomic<bool> _running;               // running flag
+    static std::atomic<int> _next_worker_id;  // worker id
+};
+// "user code thread pool" configuration
+struct UserCodeThreadPoolConf {
+    UserCodeThreadPoolConf(const std::string& pool_name,
+                           const std::string& num_threads,
+                           const std::function<void()>& startfn,
+                           UserCodeThreadAssignPolicy* policy)
+        : pool_name(pool_name),
+          num_threads(num_threads),
+          thread_startfn(startfn),
+          assign_policy(policy) {}
+    std::string pool_name;                      // pool name
+    std::string num_threads;                    // thread number
+    std::function<void()> thread_startfn;       // thread start function
+    UserCodeThreadAssignPolicy* assign_policy;  // thread assign policy
+};
+// "user code thread pool" is a set of pthreads to allow run user code in this
+// pool for some methods
+class UserCodeThreadPool {
+    static double GetInPoolElapseInSecond(void*);

Review Comment:
   Shouldn't the private members be placed after the public ones?



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);
+    if (ret) {
+        if (info.type != "int32") {
+            LOG(ERROR) << "Failed gflags type need int32";
+            return false;
+        }
+        _gflag_num_threads = static_cast<const int32_t*>(info.flag_ptr);
+        num = *_gflag_num_threads;
+    } else {
+        num = std::atoi(num_threads.c_str());
+    }
+    if (num <= 0) {
+        LOG(ERROR) << "Failed parameter for usercode pool init";
+        return false;
+    }
+    SetNumThreads(num);
+    return true;
+}
+
+void UserCodeThreadPool::RunUserCode(void (*fn)(void*), void* arg) {
+    if (_gflag_num_threads != nullptr) {
+        size_t num_threads = *_gflag_num_threads;
+        if (num_threads != _workers.size()) {
+            SetNumThreads(num_threads);
+        }
+    }
+    auto range = GetNumThreads();
+    auto index = _assign_policy->Index(arg, range);
+    auto& worker = _workers[index];

Review Comment:
   Shouldn't we check here whether index is out of bounds?
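
A minimal sketch of such a check, assuming the value returned by the assign policy is validated before it subscripts the worker vector. The helper name `CheckedWorkerIndex` is hypothetical and not part of the PR, and folding an out-of-range value back with a modulo is only one option; rejecting the request and logging an error would be another.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical helper, not part of the PR: validates the index returned by
// an assign policy before it is used to subscript the worker vector.
// Returns false when there are no workers; otherwise stores an index in
// [0, range) into *out and returns true.
inline bool CheckedWorkerIndex(std::size_t index, std::size_t range,
                               std::size_t* out) {
    if (range == 0) {
        return false;  // no workers, so no valid index exists
    }
    // Fold an out-of-range value back into [0, range) instead of reading
    // past the end of the vector.
    *out = (index < range) ? index : index % range;
    return true;
}
```

With a guard like this, a buggy custom policy degrades into an uneven distribution rather than an out-of-bounds access.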



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;

Review Comment:
   This could be a pitfall: the objects inside tls_bls may already have been freed in the bthread worker while the usercode worker is still using them.
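
The concern can be illustrated with stand-in types. This is a sketch under the assumption that the copied storage is a plain struct of raw pointers, as the plain assignment in the hunk suggests; `FakeKeyTable`, `FakeLocalStorage` and `OwningTask` are hypothetical names and are not brpc types. Copying such a struct copies only the pointers, so the queued copy does not keep the pointee alive. Shared ownership is shown as one possible mitigation, not as what the PR does.

```cpp
#include <cassert>
#include <memory>

// Stand-ins for illustration only; these are not brpc types.
struct FakeKeyTable {
    int value;
};

struct FakeLocalStorage {
    FakeKeyTable* keytable;  // raw pointer: copying the struct copies this
};

// True when both copies refer to the same underlying object, which is the
// situation that becomes dangerous once one side frees it.
inline bool SharesPointee(const FakeLocalStorage& a,
                          const FakeLocalStorage& b) {
    return a.keytable == b.keytable;
}

// One possible mitigation: the queued task co-owns the data, so it stays
// valid until the task itself is destroyed.
struct OwningTask {
    std::shared_ptr<FakeKeyTable> keytable;
};
```

In the raw-pointer variant, the thread that enqueues the task must guarantee the pointee outlives the task; the owning variant moves that guarantee into the type.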



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;
+}
+namespace brpc {
+// Store pending user code.
+struct UserCode {
+    void (*fn)(void*);
+    void* arg;
+    bthread::LocalStorage tls_bls;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadRandomAssignPolicy() {}
+    virtual ~UserCodeThreadRandomAssignPolicy() {}
+    size_t Index(void* arg, size_t range) override;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadRandomAssignPolicy);
+};
+
+class UserCodeThreadPool;
+class UserCodeThreadWorker {
+public:
+    UserCodeThreadWorker(UserCodeThreadPool* pool);
+    void UserCodeRun(UserCode&& usercode);
+    void UserCodeLoop();
+    void Start();
+    void Stop();
+    void Join();
+
+private:
+    UserCodeThreadPool* _pool;
+    std::deque<UserCode> _queue;
+    std::mutex _mutex;
+    std::condition_variable _cond;
+    std::thread _worker;
+    std::atomic<bool> _running;               // running flag
+    static std::atomic<int> _next_worker_id;  // worker id
+};
+// "user code thread pool" configuration
+struct UserCodeThreadPoolConf {
+    UserCodeThreadPoolConf(const std::string& pool_name,
+                           const std::string& num_threads,
+                           const std::function<void()>& startfn,
+                           UserCodeThreadAssignPolicy* policy)
+        : pool_name(pool_name),
+          num_threads(num_threads),
+          thread_startfn(startfn),
+          assign_policy(policy) {}
+    std::string pool_name;                      // pool name
+    std::string num_threads;                    // thread number
+    std::function<void()> thread_startfn;       // thread start function
+    UserCodeThreadAssignPolicy* assign_policy;  // thread assign policy
+};
+// "user code thread pool" is a set of pthreads to allow run user code in this
+// pool for some methods
+class UserCodeThreadPool {
+    static double GetInPoolElapseInSecond(void*);
+    static size_t GetUserCodeThreadSize(void*);
+    void StopAndJoin();
+    void SetNumThreads(size_t);
+    size_t GetNumThreads();
+
+public:
+    bvar::Adder<size_t> inpool_count;

Review Comment:
   Do these members need to be public?



##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,136 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+namespace bthread {
+extern __thread bthread::LocalStorage tls_bls;

Review Comment:
   Does this need to be exposed in the header? Declaring it in the .cpp file should be enough.



##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   I don't think this should be coupled to gflags here. Wouldn't passing in an int directly be enough?
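
A sketch of what the decoupled interface could look like, assuming the caller resolves the thread count (from a flag, a config file, or a constant) and hands the pool a plain integer. `PoolSketch` is a hypothetical, simplified stand-in and not the PR's `UserCodeThreadPool`.

```cpp
#include <cassert>
#include <cstddef>

// Hypothetical, simplified stand-in for the pool: it only records the
// requested size and does not start any threads.
class PoolSketch {
public:
    // Takes the thread count directly, so the pool itself needs no
    // knowledge of gflags or any other configuration source.
    bool Init(int num_threads) {
        if (num_threads <= 0) {
            return false;  // reject zero or negative sizes
        }
        _num_threads = static_cast<std::size_t>(num_threads);
        return true;
    }

    std::size_t num_threads() const { return _num_threads; }

private:
    std::size_t _num_threads = 0;
};
```

The trade-off is that resizing at runtime then needs an explicit call from the owner of the pool instead of being picked up from the flag on each request.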





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191866701


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
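   In that case this method would have to be global and still be able to reach the thread pool. The thread pool is currently defined in server.cpp, and users usually create the Server as a local variable. Do you have any suggestions for this?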





Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191120117


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   How about adding a method to the server for this? Something like SetNumThreads(pool_name, number).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscribe@brpc.apache.org
For additional commands, e-mail: dev-help@brpc.apache.org


Re: [PR] add usercode thread pool for method (brpc)

Posted by "serverglen (via GitHub)" <gi...@apache.org>.
serverglen commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1202585354


##########
src/brpc/usercode_thread_pool.h:
##########
@@ -0,0 +1,143 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#ifndef BRPC_USERCODE_THREAD_POOL_H
+#define BRPC_USERCODE_THREAD_POOL_H
+
+#include <deque>
+#include <mutex>
+#include <functional>
+#include <thread>
+#include <condition_variable>
+#include <gflags/gflags_declare.h>
+#include "butil/atomicops.h"
+#include "bvar/bvar.h"
+#include "bthread/task_meta.h"
+#include "butil/containers/case_ignored_flat_map.h"  // [CaseIgnored]FlatMap
+
+namespace brpc {
+// Store pending user code.
+struct UserCodeTask {
+    void (*fn)(void*);
+    void* arg;
+    void* assigned_data;
+};
+
+class UserCodeThreadAssignPolicy {
+public:
+    UserCodeThreadAssignPolicy() {}
+    virtual ~UserCodeThreadAssignPolicy() {}
+    virtual size_t Index(void* arg, size_t range) = 0;
+
+private:
+    DISALLOW_COPY_AND_ASSIGN(UserCodeThreadAssignPolicy);
+};
+
+class UserCodeThreadRandomAssignPolicy : public UserCodeThreadAssignPolicy {

Review Comment:
   Would it be slightly better to add a function
   const UserCodeThreadAssignPolicy* DefaultUserCodeThreadAssignPolicy(), similar to DefaultRetryPolicy()?
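
The accessor suggested above could look roughly like the sketch below. It is only an illustration: `AssignPolicy`, `RandomAssignPolicy` and `DefaultAssignPolicy` are hypothetical, simplified stand-ins for the PR's classes, and `Index()` is assumed to be `const` here (it is not in the PR) so that it can be called through a `const` pointer.

```cpp
#include <cassert>
#include <cstddef>
#include <random>

// Hypothetical, simplified stand-in for the PR's UserCodeThreadAssignPolicy.
// Index() is const in this sketch so it can be used through a const pointer.
class AssignPolicy {
public:
    virtual ~AssignPolicy() = default;
    // Returns a worker index in [0, range). `range` must be greater than 0.
    virtual std::size_t Index(void* arg, std::size_t range) const = 0;
};

// Picks a worker uniformly at random and ignores `arg`.
class RandomAssignPolicy : public AssignPolicy {
public:
    std::size_t Index(void* /*arg*/, std::size_t range) const override {
        assert(range > 0);
        // One generator per calling thread, so no locking is needed.
        thread_local std::mt19937_64 rng{std::random_device{}()};
        std::uniform_int_distribution<std::size_t> dist(0, range - 1);
        return dist(rng);
    }
};

// Process-wide default policy. Initialization of a function-local static
// is thread-safe since C++11, and callers never own the returned pointer.
const AssignPolicy* DefaultAssignPolicy() {
    static RandomAssignPolicy policy;
    return &policy;
}
```

With an accessor like this, a pool constructor could take `const AssignPolicy* policy = DefaultAssignPolicy()` as a default argument, and callers would not need to allocate or free a policy object themselves.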
   





Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1562753543

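   With a pthread we can guarantee that it only ever runs UserCode tasks. With an ExecutionQueue, however, the bthread may be switched out partway through and other bthread tasks may be scheduled in between, so doesn't that still leave the isolation problem unsolved?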




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1515725562

   @wwbmmm could you review this when you have time?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1524494146

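   > Do we need to consider how the thread pool exits?
   
   This part follows the implementation of the usercode backup pool, which carries the comment shown below, so it seems I have to handle it the same way here.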
   int UserCodeBackupPool::Init() {
       // Like bthread workers, these threads never quit (to avoid potential hang
       // during termination of program).
       for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {
           pthread_t th;
           if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {
               LOG(ERROR) << "Fail to create UserCodeRunner";
               return -1;
           }
       }
       return 0;
   }




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1524495658

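   I also have a question: since this ThreadPool is not copyable, the FlatMap can currently only hold pointers to it. Are there any good suggestions for making this more elegant?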




Re: [PR] add usercode thread pool for method (brpc)

Posted by "chenBright (via GitHub)" <gi...@apache.org>.
chenBright commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525594120

   > std::unordered_map
   
   std::unordered_map also requires the value to be copyable, doesn't it?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1554079245

   The BuiltinServiceTest.rpcz case keeps failing. Could you check whether it is related to the code changed in this PR?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1562762813

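   Right. For example, say the service has 3 methods and there are 10 worker threads in total. method1 has no thread pool configured, method2 is configured with 2, and method3 with 3. This guarantees that method2 and method3 together use at most 5 workers; if method1 receives too many requests, method2 and method3 will get fewer than 5.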
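
The arithmetic in this example can be written out as a small sketch. It assumes that each ExecutionQueue occupies at most one worker at any moment, so the configured queue counts form a cap and not a reservation. Both function names are hypothetical and are not part of brpc.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <vector>

// Upper bound on the workers that methods with dedicated queues can occupy
// at the same time, assuming one queue runs on at most one worker at once.
std::size_t MaxWorkersForQueuedMethods(
        std::size_t total_workers,
        const std::vector<std::size_t>& queues_per_method) {
    std::size_t sum = 0;
    for (std::size_t n : queues_per_method) {
        sum += n;
    }
    return std::min(sum, total_workers);
}

// Workers still available to unconfigured methods when every queue is busy.
std::size_t WorkersLeftForOthers(
        std::size_t total_workers,
        const std::vector<std::size_t>& queues_per_method) {
    return total_workers -
           MaxWorkersForQueuedMethods(total_workers, queues_per_method);
}
```

For the numbers above, `MaxWorkersForQueuedMethods(10, {2, 3})` yields 5. Nothing in this model stops method1 from occupying more than the remaining 5 workers, which is why method2 and method3 can end up with fewer than 5.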




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1562734266

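   @wwbmmm @serverglen @chenBright I have been thinking about this feature some more and would like to discuss it with you. Currently UserCodeThreadWorker is a pthread, separate from the brpc workers. If UserCodeThreadWorker were an ExecutionQueue instead, it could share the brpc workers. Different methods could then be configured with different numbers of ExecutionQueues, which would in effect allocate brpc workers among methods. The advantage is that user code still runs in a bthread context. The drawback is that blocking system calls would block a worker, but that is inherent to bthread anyway. I would like to hear your opinions.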




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1558722470

   LGTM




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1535868157

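   > > > > I also have a question: since this ThreadPool is not copyable, the FlatMap can currently only hold pointers to it. Are there any good suggestions for making this more elegant?
   > > > 
   > > > 
   > > > @chenBright any suggestions on this? The map stores raw pointers, so the destructor has to iterate over it and call delete manually, which does not feel very elegant.
   > > 
   > > 
   > > If you don't want to call delete yourself, use smart pointers. There is no particularly good solution; let's see whether anyone else has suggestions.
   > 
   > Putting unique_ptr into flatmap doesn't seem to work either. Or drop flatmap and use std::unordered_map?
   
   This problem is solved: flatmap can store unique_ptr. gejun submitted a patch earlier that adds support for it. Only the insert interface is not well supported yet.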
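
A minimal sketch of the ownership pattern discussed here is shown below. It uses std::unordered_map as a stand-in, because the exact FlatMap interface after the patch is not shown in this thread. `Pool` and `PoolRegistry` are hypothetical names. The sketch goes through `operator[]` and never calls `insert`.

```cpp
#include <cassert>
#include <cstddef>
#include <memory>
#include <string>
#include <unordered_map>
#include <utility>

// Hypothetical non-copyable pool type standing in for UserCodeThreadPool.
class Pool {
public:
    explicit Pool(std::string name) : _name(std::move(name)) { ++live_count(); }
    ~Pool() { --live_count(); }
    Pool(const Pool&) = delete;
    Pool& operator=(const Pool&) = delete;

    const std::string& name() const { return _name; }
    // Number of Pool objects currently alive (for demonstration only).
    static int& live_count() {
        static int n = 0;
        return n;
    }

private:
    std::string _name;
};

// Owns pools by name. No manual delete loop is needed in a destructor,
// because each unique_ptr releases its pool when the map is destroyed.
class PoolRegistry {
public:
    // Returns the existing pool for `name`, or creates and stores a new one.
    Pool* GetOrCreate(const std::string& name) {
        std::unique_ptr<Pool>& slot = _pools[name];  // default-constructs null
        if (!slot) {
            slot.reset(new Pool(name));
        }
        return slot.get();
    }
    std::size_t size() const { return _pools.size(); }

private:
    std::unordered_map<std::string, std::unique_ptr<Pool>> _pools;
};
```

Callers receive a non-owning `Pool*`, and the registry remains the single owner, so the raw-pointer cleanup loop mentioned in the quoted comments is not needed.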




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on code in PR #2215:
URL: https://github.com/apache/brpc/pull/2215#discussion_r1191965402


##########
src/brpc/usercode_thread_pool.cpp:
##########
@@ -0,0 +1,210 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "brpc/usercode_thread_pool.h"
+#include <deque>
+#include <vector>
+#include <gflags/gflags.h>
+#include "butil/threading/platform_thread.h"
+
+namespace bthread {
+// Defined in bthread/task_control.cpp
+void run_worker_startfn();
+}  // namespace bthread
+namespace brpc {
+
+std::atomic<int> UserCodeThreadWorker::_next_worker_id(0);
+
+static void* UserCodeRunner(void* args) {
+    static_cast<UserCodeThreadWorker*>(args)->UserCodeLoop();
+    return NULL;
+}
+
+UserCodeThreadWorker::UserCodeThreadWorker(UserCodeThreadPool* pool)
+    : _pool(pool), _running(true) {}
+
+// Entry of backup thread for running user code.
+void UserCodeThreadWorker::UserCodeLoop() {
+    auto pool_name = _pool->pool_name();
+    auto thread_id = _next_worker_id.fetch_add(1, butil::memory_order_relaxed);
+    std::string thread_name =
+        butil::string_printf("usercode_%s:%d", pool_name.c_str(), thread_id);
+    butil::PlatformThread::SetName(thread_name.c_str());
+    auto startfn = _pool->thread_startfn();
+    if (startfn) {
+        startfn();
+    } else {
+        bthread::run_worker_startfn();
+    }
+
+    VLOG(1) << "start thread " << thread_name;
+
+    int64_t last_time = butil::cpuwide_time_us();
+    while (true) {
+        bool blocked = false;
+        std::deque<UserCode> usercodes;
+        {
+            std::unique_lock<std::mutex> lk(_mutex);
+            _cond.wait(lk, [&]() {
+                if (!_queue.empty() ||
+                    !_running.load(std::memory_order_relaxed)) {
+                    return true;
+                } else {
+                    blocked = true;
+                    return false;
+                }
+            });
+            if (!_running.load(std::memory_order_relaxed)) {
+                break;
+            }
+            usercodes = std::move(_queue);
+            _queue = {};
+        }
+        const int64_t begin_time =
+            (blocked ? butil::cpuwide_time_us() : last_time);
+        for (auto& usercode : usercodes) {
+            bthread::tls_bls = usercode.tls_bls;
+            usercode.fn(usercode.arg);
+        }
+        const int64_t end_time = butil::cpuwide_time_us();
+        _pool->inpool_count << usercodes.size();
+        _pool->inpool_elapse_us << (end_time - begin_time);
+        last_time = end_time;
+    }
+
+    VLOG(1) << "exit thread " << thread_name;
+}
+
+void UserCodeThreadWorker::UserCodeRun(UserCode&& usercode) {
+    std::unique_lock<std::mutex> lk(_mutex);
+    _queue.emplace_back(std::move(usercode));
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Start() {
+    _worker = std::thread(UserCodeRunner, this);
+}
+
+void UserCodeThreadWorker::Stop() {
+    _running.store(false, std::memory_order_relaxed);
+    std::unique_lock<std::mutex> lk(_mutex);
+    _cond.notify_one();
+}
+
+void UserCodeThreadWorker::Join() {
+    if (_worker.joinable()) {
+        _worker.join();
+    }
+}
+
+double UserCodeThreadPool::GetInPoolElapseInSecond(void* arg) {
+    return static_cast<bvar::Adder<int64_t>*>(arg)->get_value() / 1000000.0;
+}
+
+size_t UserCodeThreadPool::GetUserCodeThreadSize(void* arg) {
+    auto pool = static_cast<UserCodeThreadPool*>(arg);
+    return pool->_workers.size();
+}
+
+UserCodeThreadPool::UserCodeThreadPool(const std::string& pool_name,
+                                       const std::function<void()>& startfn,
+                                       UserCodeThreadAssignPolicy* policy)
+    : inpool_per_second("rpc_usercode_thread_pool_second", pool_name,
+                        &inpool_count),
+      inpool_elapse_s(GetInPoolElapseInSecond, &inpool_elapse_us),
+      pool_usage("rpc_usercode_thread_pool_usage", pool_name, &inpool_elapse_s,
+                 1),
+      thread_count("rpc_usercode_thread_num_threads", pool_name,
+                   GetUserCodeThreadSize, this),
+      _pool_name(pool_name),
+      _thread_startfn(startfn),
+      _assign_policy(policy),
+      _gflag_num_threads(nullptr) {}
+
+UserCodeThreadPool::~UserCodeThreadPool() { StopAndJoin(); }
+
+// TODO(yangliming): use c++17 std::variant
+bool UserCodeThreadPool::Init(const std::string& num_threads) {
+    // Like bthread workers, these threads never quit (to avoid potential hang
+    // during termination of program).
+    int32_t num = 0;
+    GFLAGS_NS::CommandLineFlagInfo info;
+    auto ret = GFLAGS_NS::GetCommandLineFlagInfo(num_threads.c_str(), &info);

Review Comment:
   ok





Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1543612341

   @yanglimingcn could you add usage documentation for this?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "wwbmmm (via GitHub)" <gi...@apache.org>.
wwbmmm commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1549494144

   Also, could you add some unit tests?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "yanglimingcn (via GitHub)" <gi...@apache.org>.
yanglimingcn commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1683309772

   #2358




Re: [PR] add usercode thread pool for method (brpc)

Posted by "chenBright (via GitHub)" <gi...@apache.org>.
chenBright commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1522840023

   Do we need to consider how the thread pool exits?




Re: [PR] add usercode thread pool for method (brpc)

Posted by "chenBright (via GitHub)" <gi...@apache.org>.
chenBright commented on PR #2215:
URL: https://github.com/apache/brpc/pull/2215#issuecomment-1525295355

   > > Do we need to consider how the thread pool exits?
   > 
   > This part follows the implementation of the usercode backup pool, which carries the comment shown below, so it seems I have to handle it the same way here.
   > int UserCodeBackupPool::Init() {
   >     // Like bthread workers, these threads never quit (to avoid potential hang
   >     // during termination of program).
   >     for (int i = 0; i < FLAGS_usercode_backup_threads; ++i) {
   >         pthread_t th;
   >         if (pthread_create(&th, NULL, UserCodeRunner, this) != 0) {
   >             LOG(ERROR) << "Fail to create UserCodeRunner";
   >             return -1;
   >         }
   >     }
   >     return 0;
   > }
   
   UserCodeBackupPool is used inside the framework and is never destructed while the process is running.
   ```c++
   static UserCodeBackupPool* s_usercode_pool = NULL;
   ```
   
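   The UserCodeThreadPool in this PR is meant to be used by users, so it may be destructed while the threads in the pool have not exited, which could cause a crash.
   
   Wouldn't it be more reasonable to do a stop and join in the UserCodeThreadPool destructor?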
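
The stop-and-join idea can be sketched with a single worker as follows. This is not the PR's UserCodeThreadWorker: `Worker`, `Submit` and `StopAndJoin` are hypothetical names, and, as a choice made only for this sketch, tasks that are already queued are drained before the thread exits.

```cpp
#include <cassert>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <utility>

// Hypothetical single-threaded worker whose destructor stops and joins the
// thread, so the thread can never outlive the object it reads from.
class Worker {
public:
    Worker() : _running(true), _thread([this] { Loop(); }) {}
    ~Worker() { StopAndJoin(); }
    Worker(const Worker&) = delete;
    Worker& operator=(const Worker&) = delete;

    void Submit(std::function<void()> task) {
        assert(task);  // an empty std::function would throw when invoked
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _queue.push_back(std::move(task));
        }
        _cond.notify_one();
    }

    // Safe to call more than once from the owning thread.
    void StopAndJoin() {
        {
            std::lock_guard<std::mutex> lk(_mutex);
            _running = false;
        }
        _cond.notify_all();
        if (_thread.joinable()) {
            _thread.join();
        }
    }

private:
    void Loop() {
        for (;;) {
            std::deque<std::function<void()>> batch;
            {
                std::unique_lock<std::mutex> lk(_mutex);
                _cond.wait(lk, [this] { return !_queue.empty() || !_running; });
                if (_queue.empty()) {
                    return;  // stopped and nothing left to run
                }
                batch.swap(_queue);
            }
            // Run the batch outside the lock so Submit() is never blocked.
            for (auto& task : batch) {
                task();
            }
        }
    }

    std::mutex _mutex;
    std::condition_variable _cond;
    std::deque<std::function<void()>> _queue;
    bool _running;        // guarded by _mutex
    std::thread _thread;  // declared last: starts after the members above
};
```

A pool built from such workers could stop and join each of them in its own destructor. Whether pending tasks should be drained or dropped at shutdown is a separate decision that this thread does not settle.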

