You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:20 UTC
[rocketmq-client-cpp] 03/29: fix(executor): incorrect param pass to
forward
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit c595776c5939982da4a913811a95db7fbb37f07e
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Jul 16 08:42:12 2020 +0800
fix(executor): incorrect param pass to forward
---
src/concurrent/concurrent_queue.hpp | 7 ++++---
src/concurrent/executor.hpp | 10 +++++-----
src/concurrent/executor_impl.hpp | 7 ++++---
src/concurrent/thread_group.hpp | 2 +-
4 files changed, 14 insertions(+), 12 deletions(-)
diff --git a/src/concurrent/concurrent_queue.hpp b/src/concurrent/concurrent_queue.hpp
index 5a67e8f..20ae5f5 100644
--- a/src/concurrent/concurrent_queue.hpp
+++ b/src/concurrent/concurrent_queue.hpp
@@ -38,8 +38,9 @@ class concurrent_queue_node {
private:
template <typename E,
typename std::enable_if<std::is_same<typename std::decay<E>::type, value_type>::value, int>::type = 0>
- explicit concurrent_queue_node(E v) : value_(new value_type(std::forward<E>(v))) {}
+ explicit concurrent_queue_node(E&& v) : value_(new value_type(std::forward<E>(v))) {}
+ // for pointer
template <class E, typename std::enable_if<std::is_convertible<E, value_type*>::value, int>::type = 0>
explicit concurrent_queue_node(E v) : value_(v) {}
@@ -76,7 +77,7 @@ class concurrent_queue {
bool empty() { return sentinel == tail_.load(); }
template <class E>
- void push_back(E v) {
+ void push_back(E&& v) {
auto* node = new node_type(std::forward<E>(v));
push_back_impl(node);
}
@@ -144,7 +145,7 @@ class concurrent_queue {
}
}
- std::atomic<node_type*> head_, tail_;
+ std::atomic<node_type *> head_, tail_;
node_type* const sentinel;
bool _clear_when_destruct;
};
diff --git a/src/concurrent/executor.hpp b/src/concurrent/executor.hpp
index 3ed5ad5..7f530d9 100644
--- a/src/concurrent/executor.hpp
+++ b/src/concurrent/executor.hpp
@@ -28,12 +28,12 @@ namespace rocketmq {
typedef std::function<void()> handler_type;
struct executor_handler {
- handler_type handler_;
+ const handler_type handler_;
std::unique_ptr<std::promise<void>> promise_;
template <typename H,
typename std::enable_if<std::is_same<typename std::decay<H>::type, handler_type>::value, int>::type = 0>
- explicit executor_handler(H handler)
+ explicit executor_handler(H&& handler)
: handler_(std::forward<handler_type>(handler)), promise_(new std::promise<void>) {}
void operator()() noexcept {
@@ -51,9 +51,9 @@ struct executor_handler {
}
}
- template <class _Ep>
- void abort(_Ep&& exception) noexcept {
- promise_->set_exception(std::make_exception_ptr(std::forward<_Ep>(exception)));
+ template <class E>
+ void abort(E&& exception) noexcept {
+ promise_->set_exception(std::make_exception_ptr(std::forward<E>(exception)));
}
private:
diff --git a/src/concurrent/executor_impl.hpp b/src/concurrent/executor_impl.hpp
index 0c0a0cb..10acdd5 100644
--- a/src/concurrent/executor_impl.hpp
+++ b/src/concurrent/executor_impl.hpp
@@ -31,7 +31,7 @@ namespace rocketmq {
class abstract_executor_service : virtual public executor_service {
public:
std::future<void> submit(const handler_type& task) override {
- std::unique_ptr<executor_handler> handler(new executor_handler(task));
+ std::unique_ptr<executor_handler> handler(new executor_handler(const_cast<handler_type&>(task)));
std::future<void> fut = handler->promise_->get_future();
execute(std::move(handler));
return fut;
@@ -132,7 +132,7 @@ struct scheduled_executor_handler : public executor_handler {
template <typename H,
typename std::enable_if<std::is_same<typename std::decay<H>::type, handler_type>::value, int>::type = 0>
- explicit scheduled_executor_handler(H handler, const std::chrono::steady_clock::time_point& time)
+ explicit scheduled_executor_handler(H&& handler, const std::chrono::steady_clock::time_point& time)
: executor_handler(std::forward<handler_type>(handler)), wakeup_time_(time) {}
bool operator<(const scheduled_executor_handler& other) const { return (wakeup_time_ > other.wakeup_time_); }
@@ -232,7 +232,8 @@ class scheduled_thread_pool_executor : public thread_pool_executor, virtual publ
std::future<void> schedule(const handler_type& task, long delay, time_unit unit) override {
auto time_point = until_time_point(delay, unit);
- std::unique_ptr<scheduled_executor_handler> handler(new scheduled_executor_handler(task, time_point));
+ std::unique_ptr<scheduled_executor_handler> handler(
+ new scheduled_executor_handler(const_cast<handler_type&>(task), time_point));
std::future<void> fut = handler->promise_->get_future();
{
diff --git a/src/concurrent/thread_group.hpp b/src/concurrent/thread_group.hpp
index a7ea833..59bcd9a 100644
--- a/src/concurrent/thread_group.hpp
+++ b/src/concurrent/thread_group.hpp
@@ -29,7 +29,7 @@ class thread_group {
template <typename Function>
thread_group(const std::string& name, Function f, std::size_t num_threads) : name_(name), first_(nullptr) {
- create_thread(f, num_threads);
+ create_threads(f, num_threads);
}
// Destructor joins any remaining threads in the group.