You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/12/29 03:36:20 UTC

[rocketmq-client-cpp] 03/29: fix(executor): incorrect param pass to forward

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit c595776c5939982da4a913811a95db7fbb37f07e
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Jul 16 08:42:12 2020 +0800

    fix(executor): incorrect param pass to forward
---
 src/concurrent/concurrent_queue.hpp |  7 ++++---
 src/concurrent/executor.hpp         | 10 +++++-----
 src/concurrent/executor_impl.hpp    |  7 ++++---
 src/concurrent/thread_group.hpp     |  2 +-
 4 files changed, 14 insertions(+), 12 deletions(-)

diff --git a/src/concurrent/concurrent_queue.hpp b/src/concurrent/concurrent_queue.hpp
index 5a67e8f..20ae5f5 100644
--- a/src/concurrent/concurrent_queue.hpp
+++ b/src/concurrent/concurrent_queue.hpp
@@ -38,8 +38,9 @@ class concurrent_queue_node {
  private:
   template <typename E,
             typename std::enable_if<std::is_same<typename std::decay<E>::type, value_type>::value, int>::type = 0>
-  explicit concurrent_queue_node(E v) : value_(new value_type(std::forward<E>(v))) {}
+  explicit concurrent_queue_node(E&& v) : value_(new value_type(std::forward<E>(v))) {}
 
+  // for pointer
   template <class E, typename std::enable_if<std::is_convertible<E, value_type*>::value, int>::type = 0>
   explicit concurrent_queue_node(E v) : value_(v) {}
 
@@ -76,7 +77,7 @@ class concurrent_queue {
   bool empty() { return sentinel == tail_.load(); }
 
   template <class E>
-  void push_back(E v) {
+  void push_back(E&& v) {
     auto* node = new node_type(std::forward<E>(v));
     push_back_impl(node);
   }
@@ -144,7 +145,7 @@ class concurrent_queue {
     }
   }
 
-  std::atomic<node_type*> head_, tail_;
+  std::atomic<node_type *> head_, tail_;
   node_type* const sentinel;
   bool _clear_when_destruct;
 };
diff --git a/src/concurrent/executor.hpp b/src/concurrent/executor.hpp
index 3ed5ad5..7f530d9 100644
--- a/src/concurrent/executor.hpp
+++ b/src/concurrent/executor.hpp
@@ -28,12 +28,12 @@ namespace rocketmq {
 typedef std::function<void()> handler_type;
 
 struct executor_handler {
-  handler_type handler_;
+  const handler_type handler_;
   std::unique_ptr<std::promise<void>> promise_;
 
   template <typename H,
             typename std::enable_if<std::is_same<typename std::decay<H>::type, handler_type>::value, int>::type = 0>
-  explicit executor_handler(H handler)
+  explicit executor_handler(H&& handler)
       : handler_(std::forward<handler_type>(handler)), promise_(new std::promise<void>) {}
 
   void operator()() noexcept {
@@ -51,9 +51,9 @@ struct executor_handler {
     }
   }
 
-  template <class _Ep>
-  void abort(_Ep&& exception) noexcept {
-    promise_->set_exception(std::make_exception_ptr(std::forward<_Ep>(exception)));
+  template <class E>
+  void abort(E&& exception) noexcept {
+    promise_->set_exception(std::make_exception_ptr(std::forward<E>(exception)));
   }
 
  private:
diff --git a/src/concurrent/executor_impl.hpp b/src/concurrent/executor_impl.hpp
index 0c0a0cb..10acdd5 100644
--- a/src/concurrent/executor_impl.hpp
+++ b/src/concurrent/executor_impl.hpp
@@ -31,7 +31,7 @@ namespace rocketmq {
 class abstract_executor_service : virtual public executor_service {
  public:
   std::future<void> submit(const handler_type& task) override {
-    std::unique_ptr<executor_handler> handler(new executor_handler(task));
+    std::unique_ptr<executor_handler> handler(new executor_handler(const_cast<handler_type&>(task)));
     std::future<void> fut = handler->promise_->get_future();
     execute(std::move(handler));
     return fut;
@@ -132,7 +132,7 @@ struct scheduled_executor_handler : public executor_handler {
 
   template <typename H,
             typename std::enable_if<std::is_same<typename std::decay<H>::type, handler_type>::value, int>::type = 0>
-  explicit scheduled_executor_handler(H handler, const std::chrono::steady_clock::time_point& time)
+  explicit scheduled_executor_handler(H&& handler, const std::chrono::steady_clock::time_point& time)
       : executor_handler(std::forward<handler_type>(handler)), wakeup_time_(time) {}
 
   bool operator<(const scheduled_executor_handler& other) const { return (wakeup_time_ > other.wakeup_time_); }
@@ -232,7 +232,8 @@ class scheduled_thread_pool_executor : public thread_pool_executor, virtual publ
 
   std::future<void> schedule(const handler_type& task, long delay, time_unit unit) override {
     auto time_point = until_time_point(delay, unit);
-    std::unique_ptr<scheduled_executor_handler> handler(new scheduled_executor_handler(task, time_point));
+    std::unique_ptr<scheduled_executor_handler> handler(
+        new scheduled_executor_handler(const_cast<handler_type&>(task), time_point));
     std::future<void> fut = handler->promise_->get_future();
 
     {
diff --git a/src/concurrent/thread_group.hpp b/src/concurrent/thread_group.hpp
index a7ea833..59bcd9a 100644
--- a/src/concurrent/thread_group.hpp
+++ b/src/concurrent/thread_group.hpp
@@ -29,7 +29,7 @@ class thread_group {
 
   template <typename Function>
   thread_group(const std::string& name, Function f, std::size_t num_threads) : name_(name), first_(nullptr) {
-    create_thread(f, num_threads);
+    create_threads(f, num_threads);
   }
 
   // Destructor joins any remaining threads in the group.