You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/21 04:53:01 UTC

[rocketmq-client-cpp] branch main updated: Polish Scheduler implementation (#373)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 2be2f7c  Polish Scheduler implementation (#373)
2be2f7c is described below

commit 2be2f7cbd2a8672d396ef056b1bb272ffe02e4b5
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Thu Oct 21 12:52:58 2021 +0800

    Polish Scheduler implementation (#373)
---
 src/main/cpp/admin/AdminServerImpl.cpp             |  1 +
 src/main/cpp/client/ClientManagerImpl.cpp          | 29 ++++++++------
 src/main/cpp/client/include/ClientManager.h        |  2 +-
 src/main/cpp/client/include/ClientManagerImpl.h    |  5 ++-
 .../cpp/client/mocks/include/ClientManagerMock.h   |  2 +-
 .../cpp/rocketmq/AsyncReceiveMessageCallback.cpp   |  4 +-
 src/main/cpp/rocketmq/ClientImpl.cpp               | 12 +++---
 .../cpp/rocketmq/DynamicNameServerResolver.cpp     |  4 +-
 src/main/cpp/rocketmq/PushConsumerImpl.cpp         |  4 +-
 .../rocketmq/include/DynamicNameServerResolver.h   |  3 +-
 src/main/cpp/scheduler/SchedulerImpl.cpp           | 44 ++++++++++++++-------
 src/main/cpp/scheduler/include/Scheduler.h         |  4 ++
 src/main/cpp/scheduler/include/SchedulerImpl.h     | 17 ++++++--
 src/test/cpp/ut/admin/AdminServerTest.cpp          |  2 +
 .../cpp/ut/client/ClientManagerFactoryTest.cpp     |  2 +
 src/test/cpp/ut/rocketmq/ClientImplTest.cpp        | 27 +++++++------
 src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp |  1 +
 src/test/cpp/ut/rocketmq/ProducerImplTest.cpp      | 26 +++---------
 src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp  | 13 +++---
 src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp  | 25 ++++--------
 src/test/cpp/ut/scheduler/SchedulerTest.cpp        | 46 +++++++++++-----------
 21 files changed, 146 insertions(+), 127 deletions(-)

diff --git a/src/main/cpp/admin/AdminServerImpl.cpp b/src/main/cpp/admin/AdminServerImpl.cpp
index 7382b97..ce16072 100644
--- a/src/main/cpp/admin/AdminServerImpl.cpp
+++ b/src/main/cpp/admin/AdminServerImpl.cpp
@@ -69,6 +69,7 @@ void AdminServerImpl::loop() {
 
     while (completion_queue_->Next(&tag, &ok)) {
       if (!ok) {
+        delete static_cast<ServerCall*>(tag);
         break;
       }
       static_cast<ServerCall*>(tag)->proceed();
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 5e37dfa..227623d 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -24,6 +24,7 @@
 #include <vector>
 
 #include "ReceiveMessageResult.h"
+#include "Scheduler.h"
 #include "google/rpc/code.pb.h"
 
 #include "InvocationContext.h"
@@ -46,8 +47,8 @@
 ROCKETMQ_NAMESPACE_BEGIN
 
 ClientManagerImpl::ClientManagerImpl(std::string resource_namespace)
-    : resource_namespace_(std::move(resource_namespace)), state_(State::CREATED),
-      completion_queue_(std::make_shared<CompletionQueue>()),
+    : scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)),
+      state_(State::CREATED), completion_queue_(std::make_shared<CompletionQueue>()),
       callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
       latency_histogram_("Message-Latency", 11) {
   spdlog::set_level(spdlog::level::trace);
@@ -112,7 +113,8 @@ void ClientManagerImpl::start() {
   state_.store(State::STARTING, std::memory_order_relaxed);
 
   callback_thread_pool_->start();
-  scheduler_.start();
+  
+  scheduler_->start();
 
   std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();
 
@@ -122,8 +124,8 @@ void ClientManagerImpl::start() {
       client_instance->doHealthCheck();
     }
   };
-  health_check_task_id_ = scheduler_.schedule(health_check_functor, HEALTH_CHECK_TASK_NAME, std::chrono::seconds(5),
-                                              std::chrono::seconds(5));
+  health_check_task_id_ = scheduler_->schedule(health_check_functor, HEALTH_CHECK_TASK_NAME, std::chrono::seconds(5),
+                                               std::chrono::seconds(5));
   auto heartbeat_functor = [client_instance_weak_ptr]() {
     auto client_instance = client_instance_weak_ptr.lock();
     if (client_instance) {
@@ -131,7 +133,7 @@ void ClientManagerImpl::start() {
     }
   };
   heartbeat_task_id_ =
-      scheduler_.schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
+      scheduler_->schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
 
   completion_queue_thread_ = std::thread(std::bind(&ClientManagerImpl::pollCompletionQueue, this));
 
@@ -142,12 +144,12 @@ void ClientManagerImpl::start() {
     }
   };
   stats_task_id_ =
-      scheduler_.schedule(stats_functor_, STATS_TASK_NAME, std::chrono::seconds(0), std::chrono::seconds(10));
+      scheduler_->schedule(stats_functor_, STATS_TASK_NAME, std::chrono::seconds(0), std::chrono::seconds(10));
   state_.store(State::STARTED, std::memory_order_relaxed);
 }
 
 void ClientManagerImpl::shutdown() {
-  SPDLOG_DEBUG("Client instance shutdown");
+  SPDLOG_INFO("Client manager shutdown");
   if (State::STARTED != state_.load(std::memory_order_relaxed)) {
     SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed));
     return;
@@ -157,17 +159,18 @@ void ClientManagerImpl::shutdown() {
   callback_thread_pool_->shutdown();
 
   if (health_check_task_id_) {
-    scheduler_.cancel(health_check_task_id_);
+    scheduler_->cancel(health_check_task_id_);
   }
 
   if (heartbeat_task_id_) {
-    scheduler_.cancel(heartbeat_task_id_);
+    scheduler_->cancel(heartbeat_task_id_);
   }
 
   if (stats_task_id_) {
-    scheduler_.cancel(stats_task_id_);
+    scheduler_->cancel(stats_task_id_);
   }
-  scheduler_.shutdown();
+  
+  scheduler_->shutdown();
 
   {
     absl::MutexLock lk(&rpc_clients_mtx_);
@@ -1065,7 +1068,7 @@ bool ClientManagerImpl::wrapMessage(const rmq::Message& item, MQMessageExt& mess
   return true;
 }
 
-Scheduler& ClientManagerImpl::getScheduler() {
+SchedulerSharedPtr ClientManagerImpl::getScheduler() {
   return scheduler_;
 }
 
diff --git a/src/main/cpp/client/include/ClientManager.h b/src/main/cpp/client/include/ClientManager.h
index 2f5223e..dceace3 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -42,7 +42,7 @@ public:
 
   virtual void shutdown() = 0;
 
-  virtual Scheduler& getScheduler() = 0;
+  virtual SchedulerSharedPtr getScheduler() = 0;
 
   virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0;
 
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h b/src/main/cpp/client/include/ClientManagerImpl.h
index f3ed5ff..47d3390 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -25,6 +25,7 @@
 #include <system_error>
 #include <vector>
 
+#include "Scheduler.h"
 #include "absl/base/thread_annotations.h"
 #include "absl/container/flat_hash_map.h"
 #include "absl/container/flat_hash_set.h"
@@ -145,7 +146,7 @@ public:
    */
   bool wrapMessage(const rmq::Message& item, MQMessageExt& message_ext) override;
 
-  Scheduler& getScheduler() override;
+  SchedulerSharedPtr getScheduler() override;
 
   /**
    * Ack message asynchronously.
@@ -224,7 +225,7 @@ private:
 
   void logStats();
 
-  SchedulerImpl scheduler_;
+  SchedulerSharedPtr scheduler_;
 
   static const char* HEARTBEAT_TASK_NAME;
   static const char* STATS_TASK_NAME;
diff --git a/src/main/cpp/client/mocks/include/ClientManagerMock.h b/src/main/cpp/client/mocks/include/ClientManagerMock.h
index 4172ab5..01d6d41 100644
--- a/src/main/cpp/client/mocks/include/ClientManagerMock.h
+++ b/src/main/cpp/client/mocks/include/ClientManagerMock.h
@@ -29,7 +29,7 @@ public:
 
   MOCK_METHOD(void, shutdown, (), (override));
 
-  MOCK_METHOD(Scheduler&, getScheduler, (), (override));
+  MOCK_METHOD(SchedulerSharedPtr, getScheduler, (), (override));
 
   MOCK_METHOD((std::shared_ptr<grpc::Channel>), createChannel, (const std::string&), (override));
 
diff --git a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
index 3d12818..c3429ab 100644
--- a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -97,8 +97,8 @@ void AsyncReceiveMessageCallback::receiveMessageLater() {
     }
   };
 
-  client_instance->getScheduler().schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
-                                           std::chrono::seconds(0));
+  client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
+                                            std::chrono::seconds(0));
 }
 
 void AsyncReceiveMessageCallback::receiveMessageImmediately() {
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 049569a..0d22e95 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -76,8 +76,8 @@ void ClientImpl::start() {
     }
   };
 
-  route_update_handle_ = client_manager_->getScheduler().schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
-                                                                  std::chrono::seconds(10), std::chrono::seconds(30));
+  route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
+                                                                   std::chrono::seconds(10), std::chrono::seconds(30));
 }
 
 void ClientImpl::shutdown() {
@@ -85,7 +85,7 @@ void ClientImpl::shutdown() {
   if (state_.compare_exchange_strong(expected, State::STOPPED)) {
     name_server_resolver_->shutdown();
     if (route_update_handle_) {
-      client_manager_->getScheduler().cancel(route_update_handle_);
+      client_manager_->getScheduler()->cancel(route_update_handle_);
     }
     client_manager_.reset();
   } else {
@@ -437,8 +437,8 @@ void ClientImpl::onPollCommandResponse(const InvocationContext<PollCommandRespon
   }
   if (!ctx->status.ok()) {
     static std::string task_name = "Poll-Command-Later";
-    client_manager_->getScheduler().schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,
-                                             std::chrono::seconds(3), std::chrono::seconds(0));
+    client_manager_->getScheduler()->schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,
+                                              std::chrono::seconds(3), std::chrono::seconds(0));
     return;
   }
 
@@ -547,7 +547,7 @@ void ClientImpl::healthCheck() {
 
 void ClientImpl::schedule(const std::string& task_name, const std::function<void()>& task,
                           std::chrono::milliseconds delay) {
-  client_manager_->getScheduler().schedule(task, task_name, delay, std::chrono::milliseconds(0));
+  client_manager_->getScheduler()->schedule(task, task_name, delay, std::chrono::milliseconds(0));
 }
 
 void ClientImpl::onHealthCheckResponse(const std::error_code& ec, const InvocationContext<HealthCheckResponse>* ctx) {
diff --git a/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp b/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
index d2eb40f..b1a9614 100644
--- a/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
+++ b/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
@@ -31,8 +31,8 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 DynamicNameServerResolver::DynamicNameServerResolver(absl::string_view endpoint,
                                                      std::chrono::milliseconds refresh_interval)
-    : endpoint_(endpoint.data(), endpoint.length()), refresh_interval_(refresh_interval),
-      scheduler_(absl::make_unique<SchedulerImpl>()) {
+    : endpoint_(endpoint.data(), endpoint.length()), scheduler_(std::make_shared<SchedulerImpl>(1)),
+      refresh_interval_(refresh_interval) {
   absl::string_view remains;
   if (absl::StartsWith(endpoint_, "https://")) {
     ssl_ = true;
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 49eb830..6691716 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -98,7 +98,7 @@ void PushConsumerImpl::start() {
     }
   };
 
-  scan_assignment_handle_ = client_manager_->getScheduler().schedule(
+  scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
       scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5));
 
   SPDLOG_INFO("PushConsumer started, groupName={}", group_name_);
@@ -110,7 +110,7 @@ void PushConsumerImpl::shutdown() {
   State expecting = State::STARTED;
   if (state_.compare_exchange_strong(expecting, State::STOPPING)) {
     if (scan_assignment_handle_) {
-      client_manager_->getScheduler().cancel(scan_assignment_handle_);
+      client_manager_->getScheduler()->cancel(scan_assignment_handle_);
       SPDLOG_DEBUG("Scan assignment periodic task cancelled");
     }
 
diff --git a/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h b/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
index d6fc21e..bac08bf 100644
--- a/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
+++ b/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
@@ -55,6 +55,7 @@ public:
 private:
   std::string endpoint_;
 
+  SchedulerSharedPtr scheduler_;
   std::chrono::milliseconds refresh_interval_;
 
   void fetch();
@@ -68,8 +69,6 @@ private:
 
   bool ssl_{false};
   std::unique_ptr<TopAddressing> top_addressing_;
-
-  std::unique_ptr<Scheduler> scheduler_;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/scheduler/SchedulerImpl.cpp b/src/main/cpp/scheduler/SchedulerImpl.cpp
index c5c48e4..7eb4da3 100644
--- a/src/main/cpp/scheduler/SchedulerImpl.cpp
+++ b/src/main/cpp/scheduler/SchedulerImpl.cpp
@@ -31,22 +31,27 @@
 #include "asio/executor_work_guard.hpp"
 #include "asio/io_context.hpp"
 #include "asio/steady_timer.hpp"
-
-#include "LoggerImpl.h"
+#include "spdlog/spdlog.h"
 
 ROCKETMQ_NAMESPACE_BEGIN
 
-SchedulerImpl::SchedulerImpl()
+SchedulerImpl::SchedulerImpl(std::uint32_t worker_num)
     : work_guard_(
-          absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())) {
-  spdlog::set_level(spdlog::level::debug);
+          absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
+      worker_num_(worker_num) {
+}
+
+SchedulerImpl::SchedulerImpl() : SchedulerImpl(std::thread::hardware_concurrency()) {
+}
+
+SchedulerImpl::~SchedulerImpl() {
+  shutdown0();
 }
 
 void SchedulerImpl::start() {
   State expected = State::CREATED;
   if (state_.compare_exchange_strong(expected, State::STARTING, std::memory_order_relaxed)) {
-
-    for (unsigned int i = 0; i < std::thread::hardware_concurrency(); i++) {
+    for (std::uint32_t i = 0; i < worker_num_; i++) {
       auto worker = std::thread([this]() {
         {
           State expect = State::STARTING;
@@ -72,7 +77,7 @@ void SchedulerImpl::start() {
 #endif
 
           if (State::STARTED != state_.load(std::memory_order_relaxed)) {
-            SPDLOG_INFO("A scheduler worker quit");
+            SPDLOG_INFO("One scheduler worker thread quit");
             break;
           }
         }
@@ -91,6 +96,10 @@ void SchedulerImpl::start() {
 }
 
 void SchedulerImpl::shutdown() {
+  shutdown0();
+}
+
+void SchedulerImpl::shutdown0() {
   State expected = State::STARTED;
   if (state_.compare_exchange_strong(expected, State::STOPPING, std::memory_order_relaxed)) {
     work_guard_->reset();
@@ -105,6 +114,7 @@ void SchedulerImpl::shutdown() {
         worker.join();
       }
     }
+    threads_.clear();
 
     state_.store(State::STOPPED);
   }
@@ -125,9 +135,12 @@ std::uint32_t SchedulerImpl::schedule(const std::function<void(void)>& functor,
     id = ++task_id;
     tasks_.insert({id, task});
   }
-  asio::steady_timer* timer = new asio::steady_timer(context_, delay);
+  task->task_id = id;
+  task->timer = absl::make_unique<asio::steady_timer>(context_, delay);
+  task->scheduler = shared_from_this();
   SPDLOG_DEBUG("Timer-task[name={}] to fire in {}ms", task_name, delay.count());
-  timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, timer, std::weak_ptr<TimerTask>(task)));
+  auto timer_task_weak_ptr = std::weak_ptr<TimerTask>(task);
+  task->timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, timer_task_weak_ptr));
   return id;
 }
 
@@ -143,10 +156,9 @@ void SchedulerImpl::cancel(std::uint32_t task_id) {
   tasks_.erase(search);
 }
 
-void SchedulerImpl::execute(const asio::error_code& ec, asio::steady_timer* timer, std::weak_ptr<TimerTask> task) {
+void SchedulerImpl::execute(const asio::error_code& ec, std::weak_ptr<TimerTask> task) {
   std::shared_ptr<TimerTask> timer_task = task.lock();
   if (!timer_task) {
-    delete timer;
     return;
   }
 
@@ -168,11 +180,15 @@ void SchedulerImpl::execute(const asio::error_code& ec, asio::steady_timer* time
 #endif
 
   if (timer_task->interval.count()) {
+    auto& timer = timer_task->timer;
     timer->expires_at(timer->expiry() + timer_task->interval);
-    timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, timer, task));
+    timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, task));
     SPDLOG_DEBUG("Repeated timer-task {} to fire in {}ms", timer_task->task_name, timer_task->interval.count());
   } else {
-    delete timer;
+    auto scheduler = timer_task->scheduler.lock();
+    if (scheduler) {
+      scheduler->cancel(timer_task->task_id);
+    }
   }
 }
 
diff --git a/src/main/cpp/scheduler/include/Scheduler.h b/src/main/cpp/scheduler/include/Scheduler.h
index 34d676b..c280ea8 100644
--- a/src/main/cpp/scheduler/include/Scheduler.h
+++ b/src/main/cpp/scheduler/include/Scheduler.h
@@ -20,6 +20,7 @@
 #include <cstdint>
 #include <functional>
 #include <string>
+#include <memory>
 
 #include "rocketmq/RocketMQ.h"
 
@@ -39,4 +40,7 @@ public:
   virtual void cancel(std::uint32_t task_id) = 0;
 };
 
+using SchedulerPtr = std::weak_ptr<Scheduler>;
+using SchedulerSharedPtr = std::shared_ptr<Scheduler>;
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/scheduler/include/SchedulerImpl.h b/src/main/cpp/scheduler/include/SchedulerImpl.h
index 14b5fd2..bcd63d9 100644
--- a/src/main/cpp/scheduler/include/SchedulerImpl.h
+++ b/src/main/cpp/scheduler/include/SchedulerImpl.h
@@ -16,6 +16,7 @@
  */
 #pragma once
 #include <atomic>
+#include <bits/c++config.h>
 #include <chrono>
 #include <cstdint>
 #include <functional>
@@ -34,16 +35,21 @@
 ROCKETMQ_NAMESPACE_BEGIN
 
 struct TimerTask {
+  std::uint32_t task_id;
+  std::string task_name;
   std::function<void(void)> callback;
   std::chrono::milliseconds interval;
-  std::string task_name;
+  std::unique_ptr<asio::steady_timer> timer;
+  SchedulerPtr scheduler;
 };
 
-class SchedulerImpl : public Scheduler {
+class SchedulerImpl : public std::enable_shared_from_this<SchedulerImpl>, public Scheduler {
 public:
   SchedulerImpl();
 
-  ~SchedulerImpl() override = default;
+  explicit SchedulerImpl(std::uint32_t worker_num);
+
+  ~SchedulerImpl() override;
 
   void start() override;
 
@@ -70,13 +76,16 @@ private:
   std::unique_ptr<asio::executor_work_guard<asio::io_context::executor_type>> work_guard_;
   absl::Mutex start_mtx_;
   absl::CondVar start_cv_;
+  std::uint32_t worker_num_{std::thread::hardware_concurrency()};
   std::vector<std::thread> threads_;
   std::atomic<State> state_{State::CREATED};
 
   absl::flat_hash_map<std::uint32_t, std::shared_ptr<TimerTask>> tasks_ GUARDED_BY(tasks_mtx_);
   absl::Mutex tasks_mtx_;
 
-  static void execute(const asio::error_code& ec, asio::steady_timer* timer, std::weak_ptr<TimerTask> task);
+  static void execute(const asio::error_code& ec, std::weak_ptr<TimerTask> task);
+
+  void shutdown0();
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/admin/AdminServerTest.cpp b/src/test/cpp/ut/admin/AdminServerTest.cpp
index e88e00d..eb988cc 100644
--- a/src/test/cpp/ut/admin/AdminServerTest.cpp
+++ b/src/test/cpp/ut/admin/AdminServerTest.cpp
@@ -59,6 +59,8 @@ TEST(AdminServerTest, testSetUp) {
   EXPECT_EQ(spdlog::level::info, logger->level());
 
   admin_server->stop();
+
+  delete admin_server;
 }
 
 } // namespace admin
diff --git a/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp b/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp
index d63cabf..e48200e 100644
--- a/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp
+++ b/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp
@@ -40,6 +40,8 @@ TEST_F(ClientManagerFactoryTest, testGetClientManager) {
   ClientManagerPtr client_manager = ClientManagerFactory::getInstance().getClientManager(client_config_);
   EXPECT_TRUE(client_manager);
   client_manager->start();
+
+  client_manager->shutdown();
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/rocketmq/ClientImplTest.cpp b/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
index 561d26c..f93fbec 100644
--- a/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
@@ -24,6 +24,7 @@
 #include "DynamicNameServerResolver.h"
 #include "HttpClientMock.h"
 #include "NameServerResolverMock.h"
+#include "Scheduler.h"
 #include "SchedulerImpl.h"
 #include "TopAddressing.h"
 #include "rocketmq/RocketMQ.h"
@@ -49,18 +50,23 @@ class ClientImplTest : public testing::Test {
 public:
   void SetUp() override {
     grpc_init();
+    scheduler_ = std::make_shared<SchedulerImpl>();
+
+    http_client_ = absl::make_unique<testing::NiceMock<HttpClientMock>>();
     name_server_resolver_ = std::make_shared<DynamicNameServerResolver>(endpoint_, std::chrono::seconds(1));
-    scheduler_.start();
     client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
     ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
-    ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler_));
+
+    ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
+    ON_CALL(*client_manager_, start).WillByDefault([&]() { scheduler_->start(); });
+    ON_CALL(*client_manager_, shutdown).WillByDefault([&]() { scheduler_->shutdown(); });
+
     client_ = std::make_shared<TestClientImpl>(group_);
     client_->withNameServerResolver(name_server_resolver_);
   }
 
   void TearDown() override {
     grpc_shutdown();
-    scheduler_.shutdown();
   }
 
 protected:
@@ -68,15 +74,13 @@ protected:
   std::string resource_namespace_{"mq://test"};
   std::string group_{"Group-0"};
   std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
-  SchedulerImpl scheduler_;
+  SchedulerSharedPtr scheduler_;
   std::shared_ptr<TestClientImpl> client_;
   std::shared_ptr<DynamicNameServerResolver> name_server_resolver_;
+  std::unique_ptr<testing::NiceMock<HttpClientMock>> http_client_;
 };
 
 TEST_F(ClientImplTest, testBasic) {
-
-  auto http_client = absl::make_unique<HttpClientMock>();
-
   std::string once{"10.0.0.1:9876"};
   std::string then{"10.0.0.1:9876;10.0.0.2:9876"};
   std::multimap<std::string, std::string> header;
@@ -100,8 +104,8 @@ TEST_F(ClientImplTest, testBasic) {
         cv.SignalAll();
       };
 
-  EXPECT_CALL(*http_client, get).WillOnce(testing::Invoke(once_cb)).WillRepeatedly(testing::Invoke(then_cb));
-  name_server_resolver_->injectHttpClient(std::move(http_client));
+  EXPECT_CALL(*http_client_, get).WillOnce(testing::Invoke(once_cb)).WillRepeatedly(testing::Invoke(then_cb));
+  name_server_resolver_->injectHttpClient(std::move(http_client_));
 
   client_->resourceNamespace(resource_namespace_);
   client_->start();
@@ -111,9 +115,10 @@ TEST_F(ClientImplTest, testBasic) {
       cv.WaitWithDeadline(&mtx, absl::Now() + absl::Seconds(3));
     }
   }
-  EXPECT_TRUE(completed);
 
-  // Now that the derivative class has closed its resources, state of ClientImpl should be STOPPING.
+  ASSERT_TRUE(completed);
+
+  // Now that the derivative class has closed its own resources, state of ClientImpl should be STOPPING.
   client_->state(State::STOPPING);
   client_->shutdown();
 }
diff --git a/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp b/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp
index 7aab281..47a17ae 100644
--- a/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp
+++ b/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp
@@ -174,6 +174,7 @@ TEST_F(DefaultMQProducerUnitTest, testAsyncSendMessage) {
   }
   ASSERT_EQ(msg_id, message_id_);
   producer->shutdown();
+  delete send_callback;
 }
 
 TEST_F(DefaultMQProducerUnitTest, testSendMessage) {
diff --git a/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp b/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp
index 5bf670c..5b99052 100644
--- a/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp
@@ -20,6 +20,7 @@
 #include "ClientManagerFactory.h"
 #include "ClientManagerMock.h"
 #include "ProducerImpl.h"
+#include "Scheduler.h"
 #include "SchedulerImpl.h"
 #include "StaticNameServerResolver.h"
 #include "TopicRouteData.h"
@@ -38,8 +39,11 @@ public:
 
   void SetUp() override {
     grpc_init();
+    scheduler_ = std::make_shared<SchedulerImpl>();
+    scheduler_->start();
     name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
     client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
+    ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
     ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
     producer_ = std::make_shared<ProducerImpl>(group_);
     producer_->resourceNamespace(resource_namespace_);
@@ -60,10 +64,12 @@ public:
   }
 
   void TearDown() override {
+    scheduler_->shutdown();
     grpc_shutdown();
   }
 
 protected:
+  SchedulerSharedPtr scheduler_;
   std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
   std::shared_ptr<ProducerImpl> producer_;
   std::string name_server_list_{"10.0.0.1:9876"};
@@ -87,18 +93,11 @@ protected:
 };
 
 TEST_F(ProducerImplTest, testStartShutdown) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
   producer_->start();
   producer_->shutdown();
-  scheduler.shutdown();
 }
 
 TEST_F(ProducerImplTest, testSend) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
   auto mock_resolve_route =
       [this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
              std::chrono::milliseconds timeout,
@@ -129,13 +128,9 @@ TEST_F(ProducerImplTest, testSend) {
   EXPECT_FALSE(ec);
   EXPECT_TRUE(cb_invoked);
   producer_->shutdown();
-  scheduler.shutdown();
 }
 
 TEST_F(ProducerImplTest, testSend_WithMessageGroup) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
   auto mock_resolve_route =
       [this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
              std::chrono::milliseconds timeout,
@@ -167,13 +162,9 @@ TEST_F(ProducerImplTest, testSend_WithMessageGroup) {
   EXPECT_FALSE(ec);
   EXPECT_TRUE(cb_invoked);
   producer_->shutdown();
-  scheduler.shutdown();
 }
 
 TEST_F(ProducerImplTest, testSend_WithMessageQueueSelector) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
   auto mock_resolve_route =
       [this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
              std::chrono::milliseconds timeout,
@@ -211,7 +202,6 @@ TEST_F(ProducerImplTest, testSend_WithMessageQueueSelector) {
 
   EXPECT_TRUE(cb_invoked);
   producer_->shutdown();
-  scheduler.shutdown();
 }
 
 class TestSendCallback : public SendCallback {
@@ -237,9 +227,6 @@ protected:
 };
 
 TEST_F(ProducerImplTest, testAsyncSend) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
   auto mock_resolve_route =
       [this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
              std::chrono::milliseconds timeout,
@@ -280,7 +267,6 @@ TEST_F(ProducerImplTest, testAsyncSend) {
   }
 
   EXPECT_TRUE(cb_invoked);
-  scheduler.shutdown();
   producer_->shutdown();
 }
 
diff --git a/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp b/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp
index a714519..729aac0 100644
--- a/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp
@@ -40,12 +40,12 @@ class PullConsumerImplTest : public testing::Test {
 public:
   void SetUp() override {
     grpc_init();
-
+    scheduler_ = std::make_shared<SchedulerImpl>();
     name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
 
-    scheduler_.start();
+    scheduler_->start();
     client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
-    ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler_));
+    ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
     ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
 
     pull_consumer_ = std::make_shared<PullConsumerImpl>(group_);
@@ -67,7 +67,7 @@ public:
 
   void TearDown() override {
     grpc_shutdown();
-    scheduler_.shutdown();
+    scheduler_->shutdown();
   }
 
 protected:
@@ -79,7 +79,7 @@ protected:
   std::string tag_{"TagB"};
   std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
   std::shared_ptr<PullConsumerImpl> pull_consumer_;
-  SchedulerImpl scheduler_;
+  SchedulerSharedPtr scheduler_;
   std::string broker_name_{"broker-a"};
   int broker_id_{0};
   std::string message_body_{"Message Body Content"};
@@ -190,6 +190,7 @@ TEST_F(PullConsumerImplTest, testPull) {
   EXPECT_FALSE(failure);
 
   pull_consumer_->shutdown();
+  delete pull_callback;
 }
 
 TEST_F(PullConsumerImplTest, testPull_gRPC_error) {
@@ -237,6 +238,7 @@ TEST_F(PullConsumerImplTest, testPull_gRPC_error) {
   EXPECT_FALSE(success);
 
   pull_consumer_->shutdown();
+  delete pull_callback;
 }
 
 TEST_F(PullConsumerImplTest, testPull_biz_error) {
@@ -285,6 +287,7 @@ TEST_F(PullConsumerImplTest, testPull_biz_error) {
   EXPECT_TRUE(failure);
 
   pull_consumer_->shutdown();
+  delete pull_callback;
 }
 
 TEST_F(PullConsumerImplTest, testQueryOffset) {
diff --git a/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp b/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp
index bf83bda..b84dbea 100644
--- a/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp
@@ -24,6 +24,7 @@
 #include "InvocationContext.h"
 #include "MessageAccessor.h"
 #include "PushConsumerImpl.h"
+#include "Scheduler.h"
 #include "StaticNameServerResolver.h"
 #include "grpc/grpc.h"
 #include "rocketmq/MQMessageExt.h"
@@ -45,22 +46,26 @@ public:
 
   void SetUp() override {
     grpc_init();
-
+    scheduler_ = std::make_shared<SchedulerImpl>();
     name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
-
     client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
     ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
     push_consumer_ = std::make_shared<PushConsumerImpl>(group_);
     push_consumer_->resourceNamespace(resource_namespace_);
     push_consumer_->withNameServerResolver(name_server_resolver_);
     push_consumer_->registerMessageListener(message_listener_.get());
+    scheduler_->start();
+
+    ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
   }
 
   void TearDown() override {
+    scheduler_->shutdown();
     grpc_shutdown();
   }
 
 protected:
+  SchedulerSharedPtr scheduler_;
   std::string name_server_list_{"10.0.0.1:9876"};
   std::shared_ptr<StaticNameServerResolver> name_server_resolver_;
   std::string resource_namespace_{"mq://test"};
@@ -77,10 +82,6 @@ protected:
 };
 
 TEST_F(PushConsumerImplTest, testAck) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
-
   auto ack_cb = [](const std::string& target_host, const Metadata& metadata, const AckMessageRequest& request,
                    std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) {
     std::error_code ec;
@@ -118,14 +119,9 @@ TEST_F(PushConsumerImplTest, testAck) {
   }
   EXPECT_TRUE(completed);
   push_consumer_->shutdown();
-  scheduler.shutdown();
 }
 
 TEST_F(PushConsumerImplTest, testNack) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
-
   auto nack_cb = [](const std::string& target_host, const Metadata& metadata, const NackMessageRequest& request,
                     std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) {
     std::error_code ec;
@@ -161,14 +157,9 @@ TEST_F(PushConsumerImplTest, testNack) {
   }
   EXPECT_TRUE(completed);
   push_consumer_->shutdown();
-  scheduler.shutdown();
 }
 
 TEST_F(PushConsumerImplTest, testForward) {
-  SchedulerImpl scheduler;
-  scheduler.start();
-  ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
-
   InvocationContext<ForwardMessageToDeadLetterQueueResponse> invocation_context;
 
   auto forward_cb =
@@ -201,7 +192,6 @@ TEST_F(PushConsumerImplTest, testForward) {
   message.setDelayTimeLevel(delay_level_);
 
   push_consumer_->forwardToDeadLetterQueue(message, callback);
-
   {
     absl::MutexLock lk(&mtx);
     if (!completed) {
@@ -210,7 +200,6 @@ TEST_F(PushConsumerImplTest, testForward) {
   }
   EXPECT_TRUE(completed);
   push_consumer_->shutdown();
-  scheduler.shutdown();
 }
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/scheduler/SchedulerTest.cpp b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
index 6e223e3..741d831 100644
--- a/src/test/cpp/ut/scheduler/SchedulerTest.cpp
+++ b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
@@ -19,6 +19,7 @@
 #include <exception>
 #include <functional>
 #include <iostream>
+#include <memory>
 #include <thread>
 
 #include "SchedulerImpl.h"
@@ -28,15 +29,22 @@ ROCKETMQ_NAMESPACE_BEGIN
 
 class SchedulerTest : public testing::Test {
 public:
+  SchedulerTest() : scheduler(std::make_shared<SchedulerImpl>()) {  
+  }
+
   void SetUp() override {
+    scheduler->start();
+  }
+
+  void TearDown() override {
+    scheduler->shutdown();
   }
 
 protected:
+    SchedulerSharedPtr scheduler;
 };
 
 TEST_F(SchedulerTest, testSingleShot) {
-  SchedulerImpl scheduler;
-  scheduler.start();
   absl::Mutex mtx;
   absl::CondVar cv;
   int callback_fire_count{0};
@@ -46,7 +54,7 @@ TEST_F(SchedulerTest, testSingleShot) {
     callback_fire_count++;
   };
 
-  scheduler.schedule(callback, "single-shot", std::chrono::milliseconds(10), std::chrono::milliseconds(0));
+  scheduler->schedule(callback, "single-shot", std::chrono::milliseconds(10), std::chrono::milliseconds(0));
 
   // Wait till callback is executed.
   {
@@ -55,13 +63,9 @@ TEST_F(SchedulerTest, testSingleShot) {
       cv.Wait(&mtx);
     }
   }
-
-  scheduler.shutdown();
 }
 
 TEST_F(SchedulerTest, testCancel) {
-  SchedulerImpl scheduler;
-  scheduler.start();
   absl::Mutex mtx;
   absl::CondVar cv;
   int callback_fire_count{0};
@@ -71,16 +75,13 @@ TEST_F(SchedulerTest, testCancel) {
     callback_fire_count++;
   };
 
-  std::uint32_t task_id = scheduler.schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
-  scheduler.cancel(task_id);
+  std::uint32_t task_id = scheduler->schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
+  scheduler->cancel(task_id);
   std::this_thread::sleep_for(std::chrono::seconds(2));
   ASSERT_EQ(0, callback_fire_count);
-  scheduler.shutdown();
 }
 
 TEST_F(SchedulerTest, testPeriodicShot) {
-  SchedulerImpl scheduler;
-  scheduler.start();
   absl::Mutex mtx;
   absl::CondVar cv;
   int callback_fire_count{0};
@@ -91,17 +92,14 @@ TEST_F(SchedulerTest, testPeriodicShot) {
   };
 
   std::uintptr_t task_id =
-      scheduler.schedule(callback, "periodic-task", std::chrono::milliseconds(10), std::chrono::seconds(1));
+      scheduler->schedule(callback, "periodic-task", std::chrono::milliseconds(10), std::chrono::seconds(1));
   // Wait till callback is executed.
   std::this_thread::sleep_for(std::chrono::seconds(5));
   ASSERT_TRUE(callback_fire_count >= 4);
-  scheduler.cancel(task_id);
-  scheduler.shutdown();
+  scheduler->cancel(task_id);
 }
 
 TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
-  SchedulerImpl scheduler;
-  scheduler.start();
   absl::Mutex mtx;
   absl::CondVar cv;
   int callback_fire_count{0};
@@ -111,7 +109,7 @@ TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
     callback_fire_count++;
   };
 
-  scheduler.schedule(callback, "single-shot-with-0-delay", std::chrono::milliseconds(0), std::chrono::milliseconds(0));
+  scheduler->schedule(callback, "single-shot-with-0-delay", std::chrono::milliseconds(0), std::chrono::milliseconds(0));
 
   // Wait till callback is executed.
   {
@@ -120,13 +118,9 @@ TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
       cv.Wait(&mtx);
     }
   }
-
-  scheduler.shutdown();
 }
 
 TEST_F(SchedulerTest, testException) {
-  SchedulerImpl scheduler;
-  scheduler.start();
   absl::Mutex mtx;
   absl::CondVar cv;
   int callback_fire_count{0};
@@ -141,7 +135,7 @@ TEST_F(SchedulerTest, testException) {
     throw e;
   };
 
-  scheduler.schedule(callback, "test-exception", std::chrono::milliseconds(100), std::chrono::milliseconds(100));
+  scheduler->schedule(callback, "test-exception", std::chrono::milliseconds(100), std::chrono::milliseconds(100));
 
   // Wait till callback is executed.
   {
@@ -152,7 +146,11 @@ TEST_F(SchedulerTest, testException) {
   }
 
   std::this_thread::sleep_for(std::chrono::seconds(3));
+}
 
-  scheduler.shutdown();
+TEST(SchedulerLifeCycleTest, testLifeCycle) {
+  auto scheduler = std::make_shared<SchedulerImpl>();
+  scheduler->start();
 }
+
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file