You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/21 04:53:01 UTC
[rocketmq-client-cpp] branch main updated: Polish Scheduler implementation (#373)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 2be2f7c Polish Scheduler implementation (#373)
2be2f7c is described below
commit 2be2f7cbd2a8672d396ef056b1bb272ffe02e4b5
Author: Zhanhui Li <li...@apache.org>
AuthorDate: Thu Oct 21 12:52:58 2021 +0800
Polish Scheduler implementation (#373)
---
src/main/cpp/admin/AdminServerImpl.cpp | 1 +
src/main/cpp/client/ClientManagerImpl.cpp | 29 ++++++++------
src/main/cpp/client/include/ClientManager.h | 2 +-
src/main/cpp/client/include/ClientManagerImpl.h | 5 ++-
.../cpp/client/mocks/include/ClientManagerMock.h | 2 +-
.../cpp/rocketmq/AsyncReceiveMessageCallback.cpp | 4 +-
src/main/cpp/rocketmq/ClientImpl.cpp | 12 +++---
.../cpp/rocketmq/DynamicNameServerResolver.cpp | 4 +-
src/main/cpp/rocketmq/PushConsumerImpl.cpp | 4 +-
.../rocketmq/include/DynamicNameServerResolver.h | 3 +-
src/main/cpp/scheduler/SchedulerImpl.cpp | 44 ++++++++++++++-------
src/main/cpp/scheduler/include/Scheduler.h | 4 ++
src/main/cpp/scheduler/include/SchedulerImpl.h | 17 ++++++--
src/test/cpp/ut/admin/AdminServerTest.cpp | 2 +
.../cpp/ut/client/ClientManagerFactoryTest.cpp | 2 +
src/test/cpp/ut/rocketmq/ClientImplTest.cpp | 27 +++++++------
src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp | 1 +
src/test/cpp/ut/rocketmq/ProducerImplTest.cpp | 26 +++---------
src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp | 13 +++---
src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp | 25 ++++--------
src/test/cpp/ut/scheduler/SchedulerTest.cpp | 46 +++++++++++-----------
21 files changed, 146 insertions(+), 127 deletions(-)
diff --git a/src/main/cpp/admin/AdminServerImpl.cpp b/src/main/cpp/admin/AdminServerImpl.cpp
index 7382b97..ce16072 100644
--- a/src/main/cpp/admin/AdminServerImpl.cpp
+++ b/src/main/cpp/admin/AdminServerImpl.cpp
@@ -69,6 +69,7 @@ void AdminServerImpl::loop() {
while (completion_queue_->Next(&tag, &ok)) {
if (!ok) {
+ delete static_cast<ServerCall*>(tag);
break;
}
static_cast<ServerCall*>(tag)->proceed();
diff --git a/src/main/cpp/client/ClientManagerImpl.cpp b/src/main/cpp/client/ClientManagerImpl.cpp
index 5e37dfa..227623d 100644
--- a/src/main/cpp/client/ClientManagerImpl.cpp
+++ b/src/main/cpp/client/ClientManagerImpl.cpp
@@ -24,6 +24,7 @@
#include <vector>
#include "ReceiveMessageResult.h"
+#include "Scheduler.h"
#include "google/rpc/code.pb.h"
#include "InvocationContext.h"
@@ -46,8 +47,8 @@
ROCKETMQ_NAMESPACE_BEGIN
ClientManagerImpl::ClientManagerImpl(std::string resource_namespace)
- : resource_namespace_(std::move(resource_namespace)), state_(State::CREATED),
- completion_queue_(std::make_shared<CompletionQueue>()),
+ : scheduler_(std::make_shared<SchedulerImpl>()), resource_namespace_(std::move(resource_namespace)),
+ state_(State::CREATED), completion_queue_(std::make_shared<CompletionQueue>()),
callback_thread_pool_(absl::make_unique<ThreadPoolImpl>(std::thread::hardware_concurrency())),
latency_histogram_("Message-Latency", 11) {
spdlog::set_level(spdlog::level::trace);
@@ -112,7 +113,8 @@ void ClientManagerImpl::start() {
state_.store(State::STARTING, std::memory_order_relaxed);
callback_thread_pool_->start();
- scheduler_.start();
+
+ scheduler_->start();
std::weak_ptr<ClientManagerImpl> client_instance_weak_ptr = shared_from_this();
@@ -122,8 +124,8 @@ void ClientManagerImpl::start() {
client_instance->doHealthCheck();
}
};
- health_check_task_id_ = scheduler_.schedule(health_check_functor, HEALTH_CHECK_TASK_NAME, std::chrono::seconds(5),
- std::chrono::seconds(5));
+ health_check_task_id_ = scheduler_->schedule(health_check_functor, HEALTH_CHECK_TASK_NAME, std::chrono::seconds(5),
+ std::chrono::seconds(5));
auto heartbeat_functor = [client_instance_weak_ptr]() {
auto client_instance = client_instance_weak_ptr.lock();
if (client_instance) {
@@ -131,7 +133,7 @@ void ClientManagerImpl::start() {
}
};
heartbeat_task_id_ =
- scheduler_.schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
+ scheduler_->schedule(heartbeat_functor, HEARTBEAT_TASK_NAME, std::chrono::seconds(1), std::chrono::seconds(10));
completion_queue_thread_ = std::thread(std::bind(&ClientManagerImpl::pollCompletionQueue, this));
@@ -142,12 +144,12 @@ void ClientManagerImpl::start() {
}
};
stats_task_id_ =
- scheduler_.schedule(stats_functor_, STATS_TASK_NAME, std::chrono::seconds(0), std::chrono::seconds(10));
+ scheduler_->schedule(stats_functor_, STATS_TASK_NAME, std::chrono::seconds(0), std::chrono::seconds(10));
state_.store(State::STARTED, std::memory_order_relaxed);
}
void ClientManagerImpl::shutdown() {
- SPDLOG_DEBUG("Client instance shutdown");
+ SPDLOG_INFO("Client manager shutdown");
if (State::STARTED != state_.load(std::memory_order_relaxed)) {
SPDLOG_WARN("Unexpected client instance state: {}", state_.load(std::memory_order_relaxed));
return;
@@ -157,17 +159,18 @@ void ClientManagerImpl::shutdown() {
callback_thread_pool_->shutdown();
if (health_check_task_id_) {
- scheduler_.cancel(health_check_task_id_);
+ scheduler_->cancel(health_check_task_id_);
}
if (heartbeat_task_id_) {
- scheduler_.cancel(heartbeat_task_id_);
+ scheduler_->cancel(heartbeat_task_id_);
}
if (stats_task_id_) {
- scheduler_.cancel(stats_task_id_);
+ scheduler_->cancel(stats_task_id_);
}
- scheduler_.shutdown();
+
+ scheduler_->shutdown();
{
absl::MutexLock lk(&rpc_clients_mtx_);
@@ -1065,7 +1068,7 @@ bool ClientManagerImpl::wrapMessage(const rmq::Message& item, MQMessageExt& mess
return true;
}
-Scheduler& ClientManagerImpl::getScheduler() {
+SchedulerSharedPtr ClientManagerImpl::getScheduler() {
return scheduler_;
}
diff --git a/src/main/cpp/client/include/ClientManager.h b/src/main/cpp/client/include/ClientManager.h
index 2f5223e..dceace3 100644
--- a/src/main/cpp/client/include/ClientManager.h
+++ b/src/main/cpp/client/include/ClientManager.h
@@ -42,7 +42,7 @@ public:
virtual void shutdown() = 0;
- virtual Scheduler& getScheduler() = 0;
+ virtual SchedulerSharedPtr getScheduler() = 0;
virtual std::shared_ptr<grpc::Channel> createChannel(const std::string& target_host) = 0;
diff --git a/src/main/cpp/client/include/ClientManagerImpl.h b/src/main/cpp/client/include/ClientManagerImpl.h
index f3ed5ff..47d3390 100644
--- a/src/main/cpp/client/include/ClientManagerImpl.h
+++ b/src/main/cpp/client/include/ClientManagerImpl.h
@@ -25,6 +25,7 @@
#include <system_error>
#include <vector>
+#include "Scheduler.h"
#include "absl/base/thread_annotations.h"
#include "absl/container/flat_hash_map.h"
#include "absl/container/flat_hash_set.h"
@@ -145,7 +146,7 @@ public:
*/
bool wrapMessage(const rmq::Message& item, MQMessageExt& message_ext) override;
- Scheduler& getScheduler() override;
+ SchedulerSharedPtr getScheduler() override;
/**
* Ack message asynchronously.
@@ -224,7 +225,7 @@ private:
void logStats();
- SchedulerImpl scheduler_;
+ SchedulerSharedPtr scheduler_;
static const char* HEARTBEAT_TASK_NAME;
static const char* STATS_TASK_NAME;
diff --git a/src/main/cpp/client/mocks/include/ClientManagerMock.h b/src/main/cpp/client/mocks/include/ClientManagerMock.h
index 4172ab5..01d6d41 100644
--- a/src/main/cpp/client/mocks/include/ClientManagerMock.h
+++ b/src/main/cpp/client/mocks/include/ClientManagerMock.h
@@ -29,7 +29,7 @@ public:
MOCK_METHOD(void, shutdown, (), (override));
- MOCK_METHOD(Scheduler&, getScheduler, (), (override));
+ MOCK_METHOD(SchedulerSharedPtr, getScheduler, (), (override));
MOCK_METHOD((std::shared_ptr<grpc::Channel>), createChannel, (const std::string&), (override));
diff --git a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
index 3d12818..c3429ab 100644
--- a/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
+++ b/src/main/cpp/rocketmq/AsyncReceiveMessageCallback.cpp
@@ -97,8 +97,8 @@ void AsyncReceiveMessageCallback::receiveMessageLater() {
}
};
- client_instance->getScheduler().schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
- std::chrono::seconds(0));
+ client_instance->getScheduler()->schedule(task, RECEIVE_LATER_TASK_NAME, std::chrono::seconds(1),
+ std::chrono::seconds(0));
}
void AsyncReceiveMessageCallback::receiveMessageImmediately() {
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 049569a..0d22e95 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -76,8 +76,8 @@ void ClientImpl::start() {
}
};
- route_update_handle_ = client_manager_->getScheduler().schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
- std::chrono::seconds(10), std::chrono::seconds(30));
+ route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
+ std::chrono::seconds(10), std::chrono::seconds(30));
}
void ClientImpl::shutdown() {
@@ -85,7 +85,7 @@ void ClientImpl::shutdown() {
if (state_.compare_exchange_strong(expected, State::STOPPED)) {
name_server_resolver_->shutdown();
if (route_update_handle_) {
- client_manager_->getScheduler().cancel(route_update_handle_);
+ client_manager_->getScheduler()->cancel(route_update_handle_);
}
client_manager_.reset();
} else {
@@ -437,8 +437,8 @@ void ClientImpl::onPollCommandResponse(const InvocationContext<PollCommandRespon
}
if (!ctx->status.ok()) {
static std::string task_name = "Poll-Command-Later";
- client_manager_->getScheduler().schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,
- std::chrono::seconds(3), std::chrono::seconds(0));
+ client_manager_->getScheduler()->schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,
+ std::chrono::seconds(3), std::chrono::seconds(0));
return;
}
@@ -547,7 +547,7 @@ void ClientImpl::healthCheck() {
void ClientImpl::schedule(const std::string& task_name, const std::function<void()>& task,
std::chrono::milliseconds delay) {
- client_manager_->getScheduler().schedule(task, task_name, delay, std::chrono::milliseconds(0));
+ client_manager_->getScheduler()->schedule(task, task_name, delay, std::chrono::milliseconds(0));
}
void ClientImpl::onHealthCheckResponse(const std::error_code& ec, const InvocationContext<HealthCheckResponse>* ctx) {
diff --git a/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp b/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
index d2eb40f..b1a9614 100644
--- a/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
+++ b/src/main/cpp/rocketmq/DynamicNameServerResolver.cpp
@@ -31,8 +31,8 @@ ROCKETMQ_NAMESPACE_BEGIN
DynamicNameServerResolver::DynamicNameServerResolver(absl::string_view endpoint,
std::chrono::milliseconds refresh_interval)
- : endpoint_(endpoint.data(), endpoint.length()), refresh_interval_(refresh_interval),
- scheduler_(absl::make_unique<SchedulerImpl>()) {
+ : endpoint_(endpoint.data(), endpoint.length()), scheduler_(std::make_shared<SchedulerImpl>(1)),
+ refresh_interval_(refresh_interval) {
absl::string_view remains;
if (absl::StartsWith(endpoint_, "https://")) {
ssl_ = true;
diff --git a/src/main/cpp/rocketmq/PushConsumerImpl.cpp b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
index 49eb830..6691716 100644
--- a/src/main/cpp/rocketmq/PushConsumerImpl.cpp
+++ b/src/main/cpp/rocketmq/PushConsumerImpl.cpp
@@ -98,7 +98,7 @@ void PushConsumerImpl::start() {
}
};
- scan_assignment_handle_ = client_manager_->getScheduler().schedule(
+ scan_assignment_handle_ = client_manager_->getScheduler()->schedule(
scan_assignment_functor, SCAN_ASSIGNMENT_TASK_NAME, std::chrono::milliseconds(100), std::chrono::seconds(5));
SPDLOG_INFO("PushConsumer started, groupName={}", group_name_);
@@ -110,7 +110,7 @@ void PushConsumerImpl::shutdown() {
State expecting = State::STARTED;
if (state_.compare_exchange_strong(expecting, State::STOPPING)) {
if (scan_assignment_handle_) {
- client_manager_->getScheduler().cancel(scan_assignment_handle_);
+ client_manager_->getScheduler()->cancel(scan_assignment_handle_);
SPDLOG_DEBUG("Scan assignment periodic task cancelled");
}
diff --git a/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h b/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
index d6fc21e..bac08bf 100644
--- a/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
+++ b/src/main/cpp/rocketmq/include/DynamicNameServerResolver.h
@@ -55,6 +55,7 @@ public:
private:
std::string endpoint_;
+ SchedulerSharedPtr scheduler_;
std::chrono::milliseconds refresh_interval_;
void fetch();
@@ -68,8 +69,6 @@ private:
bool ssl_{false};
std::unique_ptr<TopAddressing> top_addressing_;
-
- std::unique_ptr<Scheduler> scheduler_;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/scheduler/SchedulerImpl.cpp b/src/main/cpp/scheduler/SchedulerImpl.cpp
index c5c48e4..7eb4da3 100644
--- a/src/main/cpp/scheduler/SchedulerImpl.cpp
+++ b/src/main/cpp/scheduler/SchedulerImpl.cpp
@@ -31,22 +31,27 @@
#include "asio/executor_work_guard.hpp"
#include "asio/io_context.hpp"
#include "asio/steady_timer.hpp"
-
-#include "LoggerImpl.h"
+#include "spdlog/spdlog.h"
ROCKETMQ_NAMESPACE_BEGIN
-SchedulerImpl::SchedulerImpl()
+SchedulerImpl::SchedulerImpl(std::uint32_t worker_num)
: work_guard_(
- absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())) {
- spdlog::set_level(spdlog::level::debug);
+ absl::make_unique<asio::executor_work_guard<asio::io_context::executor_type>>(context_.get_executor())),
+ worker_num_(worker_num) {
+}
+
+SchedulerImpl::SchedulerImpl() : SchedulerImpl(std::thread::hardware_concurrency()) {
+}
+
+SchedulerImpl::~SchedulerImpl() {
+ shutdown0();
}
void SchedulerImpl::start() {
State expected = State::CREATED;
if (state_.compare_exchange_strong(expected, State::STARTING, std::memory_order_relaxed)) {
-
- for (unsigned int i = 0; i < std::thread::hardware_concurrency(); i++) {
+ for (std::uint32_t i = 0; i < worker_num_; i++) {
auto worker = std::thread([this]() {
{
State expect = State::STARTING;
@@ -72,7 +77,7 @@ void SchedulerImpl::start() {
#endif
if (State::STARTED != state_.load(std::memory_order_relaxed)) {
- SPDLOG_INFO("A scheduler worker quit");
+ SPDLOG_INFO("One scheduler worker thread quit");
break;
}
}
@@ -91,6 +96,10 @@ void SchedulerImpl::start() {
}
void SchedulerImpl::shutdown() {
+ shutdown0();
+}
+
+void SchedulerImpl::shutdown0() {
State expected = State::STARTED;
if (state_.compare_exchange_strong(expected, State::STOPPING, std::memory_order_relaxed)) {
work_guard_->reset();
@@ -105,6 +114,7 @@ void SchedulerImpl::shutdown() {
worker.join();
}
}
+ threads_.clear();
state_.store(State::STOPPED);
}
@@ -125,9 +135,12 @@ std::uint32_t SchedulerImpl::schedule(const std::function<void(void)>& functor,
id = ++task_id;
tasks_.insert({id, task});
}
- asio::steady_timer* timer = new asio::steady_timer(context_, delay);
+ task->task_id = id;
+ task->timer = absl::make_unique<asio::steady_timer>(context_, delay);
+ task->scheduler = shared_from_this();
SPDLOG_DEBUG("Timer-task[name={}] to fire in {}ms", task_name, delay.count());
- timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, timer, std::weak_ptr<TimerTask>(task)));
+ auto timer_task_weak_ptr = std::weak_ptr<TimerTask>(task);
+ task->timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, timer_task_weak_ptr));
return id;
}
@@ -143,10 +156,9 @@ void SchedulerImpl::cancel(std::uint32_t task_id) {
tasks_.erase(search);
}
-void SchedulerImpl::execute(const asio::error_code& ec, asio::steady_timer* timer, std::weak_ptr<TimerTask> task) {
+void SchedulerImpl::execute(const asio::error_code& ec, std::weak_ptr<TimerTask> task) {
std::shared_ptr<TimerTask> timer_task = task.lock();
if (!timer_task) {
- delete timer;
return;
}
@@ -168,11 +180,15 @@ void SchedulerImpl::execute(const asio::error_code& ec, asio::steady_timer* time
#endif
if (timer_task->interval.count()) {
+ auto& timer = timer_task->timer;
timer->expires_at(timer->expiry() + timer_task->interval);
- timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, timer, task));
+ timer->async_wait(std::bind(&SchedulerImpl::execute, std::placeholders::_1, task));
SPDLOG_DEBUG("Repeated timer-task {} to fire in {}ms", timer_task->task_name, timer_task->interval.count());
} else {
- delete timer;
+ auto scheduler = timer_task->scheduler.lock();
+ if (scheduler) {
+ scheduler->cancel(timer_task->task_id);
+ }
}
}
diff --git a/src/main/cpp/scheduler/include/Scheduler.h b/src/main/cpp/scheduler/include/Scheduler.h
index 34d676b..c280ea8 100644
--- a/src/main/cpp/scheduler/include/Scheduler.h
+++ b/src/main/cpp/scheduler/include/Scheduler.h
@@ -20,6 +20,7 @@
#include <cstdint>
#include <functional>
#include <string>
+#include <memory>
#include "rocketmq/RocketMQ.h"
@@ -39,4 +40,7 @@ public:
virtual void cancel(std::uint32_t task_id) = 0;
};
+using SchedulerPtr = std::weak_ptr<Scheduler>;
+using SchedulerSharedPtr = std::shared_ptr<Scheduler>;
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/main/cpp/scheduler/include/SchedulerImpl.h b/src/main/cpp/scheduler/include/SchedulerImpl.h
index 14b5fd2..bcd63d9 100644
--- a/src/main/cpp/scheduler/include/SchedulerImpl.h
+++ b/src/main/cpp/scheduler/include/SchedulerImpl.h
@@ -16,6 +16,7 @@
*/
#pragma once
#include <atomic>
+#include <bits/c++config.h>
#include <chrono>
#include <cstdint>
#include <functional>
@@ -34,16 +35,21 @@
ROCKETMQ_NAMESPACE_BEGIN
struct TimerTask {
+ std::uint32_t task_id;
+ std::string task_name;
std::function<void(void)> callback;
std::chrono::milliseconds interval;
- std::string task_name;
+ std::unique_ptr<asio::steady_timer> timer;
+ SchedulerPtr scheduler;
};
-class SchedulerImpl : public Scheduler {
+class SchedulerImpl : public std::enable_shared_from_this<SchedulerImpl>, public Scheduler {
public:
SchedulerImpl();
- ~SchedulerImpl() override = default;
+ explicit SchedulerImpl(std::uint32_t worker_num);
+
+ ~SchedulerImpl() override;
void start() override;
@@ -70,13 +76,16 @@ private:
std::unique_ptr<asio::executor_work_guard<asio::io_context::executor_type>> work_guard_;
absl::Mutex start_mtx_;
absl::CondVar start_cv_;
+ std::uint32_t worker_num_{std::thread::hardware_concurrency()};
std::vector<std::thread> threads_;
std::atomic<State> state_{State::CREATED};
absl::flat_hash_map<std::uint32_t, std::shared_ptr<TimerTask>> tasks_ GUARDED_BY(tasks_mtx_);
absl::Mutex tasks_mtx_;
- static void execute(const asio::error_code& ec, asio::steady_timer* timer, std::weak_ptr<TimerTask> task);
+ static void execute(const asio::error_code& ec, std::weak_ptr<TimerTask> task);
+
+ void shutdown0();
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/admin/AdminServerTest.cpp b/src/test/cpp/ut/admin/AdminServerTest.cpp
index e88e00d..eb988cc 100644
--- a/src/test/cpp/ut/admin/AdminServerTest.cpp
+++ b/src/test/cpp/ut/admin/AdminServerTest.cpp
@@ -59,6 +59,8 @@ TEST(AdminServerTest, testSetUp) {
EXPECT_EQ(spdlog::level::info, logger->level());
admin_server->stop();
+
+ delete admin_server;
}
} // namespace admin
diff --git a/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp b/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp
index d63cabf..e48200e 100644
--- a/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp
+++ b/src/test/cpp/ut/client/ClientManagerFactoryTest.cpp
@@ -40,6 +40,8 @@ TEST_F(ClientManagerFactoryTest, testGetClientManager) {
ClientManagerPtr client_manager = ClientManagerFactory::getInstance().getClientManager(client_config_);
EXPECT_TRUE(client_manager);
client_manager->start();
+
+ client_manager->shutdown();
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/rocketmq/ClientImplTest.cpp b/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
index 561d26c..f93fbec 100644
--- a/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/ClientImplTest.cpp
@@ -24,6 +24,7 @@
#include "DynamicNameServerResolver.h"
#include "HttpClientMock.h"
#include "NameServerResolverMock.h"
+#include "Scheduler.h"
#include "SchedulerImpl.h"
#include "TopAddressing.h"
#include "rocketmq/RocketMQ.h"
@@ -49,18 +50,23 @@ class ClientImplTest : public testing::Test {
public:
void SetUp() override {
grpc_init();
+ scheduler_ = std::make_shared<SchedulerImpl>();
+
+ http_client_ = absl::make_unique<testing::NiceMock<HttpClientMock>>();
name_server_resolver_ = std::make_shared<DynamicNameServerResolver>(endpoint_, std::chrono::seconds(1));
- scheduler_.start();
client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler_));
+
+ ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
+ ON_CALL(*client_manager_, start).WillByDefault([&]() { scheduler_->start(); });
+ ON_CALL(*client_manager_, shutdown).WillByDefault([&]() { scheduler_->shutdown(); });
+
client_ = std::make_shared<TestClientImpl>(group_);
client_->withNameServerResolver(name_server_resolver_);
}
void TearDown() override {
grpc_shutdown();
- scheduler_.shutdown();
}
protected:
@@ -68,15 +74,13 @@ protected:
std::string resource_namespace_{"mq://test"};
std::string group_{"Group-0"};
std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
- SchedulerImpl scheduler_;
+ SchedulerSharedPtr scheduler_;
std::shared_ptr<TestClientImpl> client_;
std::shared_ptr<DynamicNameServerResolver> name_server_resolver_;
+ std::unique_ptr<testing::NiceMock<HttpClientMock>> http_client_;
};
TEST_F(ClientImplTest, testBasic) {
-
- auto http_client = absl::make_unique<HttpClientMock>();
-
std::string once{"10.0.0.1:9876"};
std::string then{"10.0.0.1:9876;10.0.0.2:9876"};
std::multimap<std::string, std::string> header;
@@ -100,8 +104,8 @@ TEST_F(ClientImplTest, testBasic) {
cv.SignalAll();
};
- EXPECT_CALL(*http_client, get).WillOnce(testing::Invoke(once_cb)).WillRepeatedly(testing::Invoke(then_cb));
- name_server_resolver_->injectHttpClient(std::move(http_client));
+ EXPECT_CALL(*http_client_, get).WillOnce(testing::Invoke(once_cb)).WillRepeatedly(testing::Invoke(then_cb));
+ name_server_resolver_->injectHttpClient(std::move(http_client_));
client_->resourceNamespace(resource_namespace_);
client_->start();
@@ -111,9 +115,10 @@ TEST_F(ClientImplTest, testBasic) {
cv.WaitWithDeadline(&mtx, absl::Now() + absl::Seconds(3));
}
}
- EXPECT_TRUE(completed);
- // Now that the derivative class has closed its resources, state of ClientImpl should be STOPPING.
+ ASSERT_TRUE(completed);
+
+ // Now that the derivative class has closed its own resources, state of ClientImpl should be STOPPING.
client_->state(State::STOPPING);
client_->shutdown();
}
diff --git a/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp b/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp
index 7aab281..47a17ae 100644
--- a/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp
+++ b/src/test/cpp/ut/rocketmq/DefaultMQProducerTest.cpp
@@ -174,6 +174,7 @@ TEST_F(DefaultMQProducerUnitTest, testAsyncSendMessage) {
}
ASSERT_EQ(msg_id, message_id_);
producer->shutdown();
+ delete send_callback;
}
TEST_F(DefaultMQProducerUnitTest, testSendMessage) {
diff --git a/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp b/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp
index 5bf670c..5b99052 100644
--- a/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/ProducerImplTest.cpp
@@ -20,6 +20,7 @@
#include "ClientManagerFactory.h"
#include "ClientManagerMock.h"
#include "ProducerImpl.h"
+#include "Scheduler.h"
#include "SchedulerImpl.h"
#include "StaticNameServerResolver.h"
#include "TopicRouteData.h"
@@ -38,8 +39,11 @@ public:
void SetUp() override {
grpc_init();
+ scheduler_ = std::make_shared<SchedulerImpl>();
+ scheduler_->start();
name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
+ ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
producer_ = std::make_shared<ProducerImpl>(group_);
producer_->resourceNamespace(resource_namespace_);
@@ -60,10 +64,12 @@ public:
}
void TearDown() override {
+ scheduler_->shutdown();
grpc_shutdown();
}
protected:
+ SchedulerSharedPtr scheduler_;
std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
std::shared_ptr<ProducerImpl> producer_;
std::string name_server_list_{"10.0.0.1:9876"};
@@ -87,18 +93,11 @@ protected:
};
TEST_F(ProducerImplTest, testStartShutdown) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
producer_->start();
producer_->shutdown();
- scheduler.shutdown();
}
TEST_F(ProducerImplTest, testSend) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
@@ -129,13 +128,9 @@ TEST_F(ProducerImplTest, testSend) {
EXPECT_FALSE(ec);
EXPECT_TRUE(cb_invoked);
producer_->shutdown();
- scheduler.shutdown();
}
TEST_F(ProducerImplTest, testSend_WithMessageGroup) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
@@ -167,13 +162,9 @@ TEST_F(ProducerImplTest, testSend_WithMessageGroup) {
EXPECT_FALSE(ec);
EXPECT_TRUE(cb_invoked);
producer_->shutdown();
- scheduler.shutdown();
}
TEST_F(ProducerImplTest, testSend_WithMessageQueueSelector) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
@@ -211,7 +202,6 @@ TEST_F(ProducerImplTest, testSend_WithMessageQueueSelector) {
EXPECT_TRUE(cb_invoked);
producer_->shutdown();
- scheduler.shutdown();
}
class TestSendCallback : public SendCallback {
@@ -237,9 +227,6 @@ protected:
};
TEST_F(ProducerImplTest, testAsyncSend) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
auto mock_resolve_route =
[this](const std::string& target_host, const Metadata& metadata, const QueryRouteRequest& request,
std::chrono::milliseconds timeout,
@@ -280,7 +267,6 @@ TEST_F(ProducerImplTest, testAsyncSend) {
}
EXPECT_TRUE(cb_invoked);
- scheduler.shutdown();
producer_->shutdown();
}
diff --git a/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp b/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp
index a714519..729aac0 100644
--- a/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/PullConsumerImplTest.cpp
@@ -40,12 +40,12 @@ class PullConsumerImplTest : public testing::Test {
public:
void SetUp() override {
grpc_init();
-
+ scheduler_ = std::make_shared<SchedulerImpl>();
name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
- scheduler_.start();
+ scheduler_->start();
client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler_));
+ ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
pull_consumer_ = std::make_shared<PullConsumerImpl>(group_);
@@ -67,7 +67,7 @@ public:
void TearDown() override {
grpc_shutdown();
- scheduler_.shutdown();
+ scheduler_->shutdown();
}
protected:
@@ -79,7 +79,7 @@ protected:
std::string tag_{"TagB"};
std::shared_ptr<testing::NiceMock<ClientManagerMock>> client_manager_;
std::shared_ptr<PullConsumerImpl> pull_consumer_;
- SchedulerImpl scheduler_;
+ SchedulerSharedPtr scheduler_;
std::string broker_name_{"broker-a"};
int broker_id_{0};
std::string message_body_{"Message Body Content"};
@@ -190,6 +190,7 @@ TEST_F(PullConsumerImplTest, testPull) {
EXPECT_FALSE(failure);
pull_consumer_->shutdown();
+ delete pull_callback;
}
TEST_F(PullConsumerImplTest, testPull_gRPC_error) {
@@ -237,6 +238,7 @@ TEST_F(PullConsumerImplTest, testPull_gRPC_error) {
EXPECT_FALSE(success);
pull_consumer_->shutdown();
+ delete pull_callback;
}
TEST_F(PullConsumerImplTest, testPull_biz_error) {
@@ -285,6 +287,7 @@ TEST_F(PullConsumerImplTest, testPull_biz_error) {
EXPECT_TRUE(failure);
pull_consumer_->shutdown();
+ delete pull_callback;
}
TEST_F(PullConsumerImplTest, testQueryOffset) {
diff --git a/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp b/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp
index bf83bda..b84dbea 100644
--- a/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp
+++ b/src/test/cpp/ut/rocketmq/PushConsumerImplTest.cpp
@@ -24,6 +24,7 @@
#include "InvocationContext.h"
#include "MessageAccessor.h"
#include "PushConsumerImpl.h"
+#include "Scheduler.h"
#include "StaticNameServerResolver.h"
#include "grpc/grpc.h"
#include "rocketmq/MQMessageExt.h"
@@ -45,22 +46,26 @@ public:
void SetUp() override {
grpc_init();
-
+ scheduler_ = std::make_shared<SchedulerImpl>();
name_server_resolver_ = std::make_shared<StaticNameServerResolver>(name_server_list_);
-
client_manager_ = std::make_shared<testing::NiceMock<ClientManagerMock>>();
ClientManagerFactory::getInstance().addClientManager(resource_namespace_, client_manager_);
push_consumer_ = std::make_shared<PushConsumerImpl>(group_);
push_consumer_->resourceNamespace(resource_namespace_);
push_consumer_->withNameServerResolver(name_server_resolver_);
push_consumer_->registerMessageListener(message_listener_.get());
+ scheduler_->start();
+
+ ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::Return(scheduler_));
}
void TearDown() override {
+ scheduler_->shutdown();
grpc_shutdown();
}
protected:
+ SchedulerSharedPtr scheduler_;
std::string name_server_list_{"10.0.0.1:9876"};
std::shared_ptr<StaticNameServerResolver> name_server_resolver_;
std::string resource_namespace_{"mq://test"};
@@ -77,10 +82,6 @@ protected:
};
TEST_F(PushConsumerImplTest, testAck) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
-
auto ack_cb = [](const std::string& target_host, const Metadata& metadata, const AckMessageRequest& request,
std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) {
std::error_code ec;
@@ -118,14 +119,9 @@ TEST_F(PushConsumerImplTest, testAck) {
}
EXPECT_TRUE(completed);
push_consumer_->shutdown();
- scheduler.shutdown();
}
TEST_F(PushConsumerImplTest, testNack) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
-
auto nack_cb = [](const std::string& target_host, const Metadata& metadata, const NackMessageRequest& request,
std::chrono::milliseconds timeout, const std::function<void(const std::error_code&)>& cb) {
std::error_code ec;
@@ -161,14 +157,9 @@ TEST_F(PushConsumerImplTest, testNack) {
}
EXPECT_TRUE(completed);
push_consumer_->shutdown();
- scheduler.shutdown();
}
TEST_F(PushConsumerImplTest, testForward) {
- SchedulerImpl scheduler;
- scheduler.start();
- ON_CALL(*client_manager_, getScheduler).WillByDefault(testing::ReturnRef(scheduler));
-
InvocationContext<ForwardMessageToDeadLetterQueueResponse> invocation_context;
auto forward_cb =
@@ -201,7 +192,6 @@ TEST_F(PushConsumerImplTest, testForward) {
message.setDelayTimeLevel(delay_level_);
push_consumer_->forwardToDeadLetterQueue(message, callback);
-
{
absl::MutexLock lk(&mtx);
if (!completed) {
@@ -210,7 +200,6 @@ TEST_F(PushConsumerImplTest, testForward) {
}
EXPECT_TRUE(completed);
push_consumer_->shutdown();
- scheduler.shutdown();
}
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/src/test/cpp/ut/scheduler/SchedulerTest.cpp b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
index 6e223e3..741d831 100644
--- a/src/test/cpp/ut/scheduler/SchedulerTest.cpp
+++ b/src/test/cpp/ut/scheduler/SchedulerTest.cpp
@@ -19,6 +19,7 @@
#include <exception>
#include <functional>
#include <iostream>
+#include <memory>
#include <thread>
#include "SchedulerImpl.h"
@@ -28,15 +29,22 @@ ROCKETMQ_NAMESPACE_BEGIN
class SchedulerTest : public testing::Test {
public:
+ SchedulerTest() : scheduler(std::make_shared<SchedulerImpl>()) {
+ }
+
void SetUp() override {
+ scheduler->start();
+ }
+
+ void TearDown() override {
+ scheduler->shutdown();
}
protected:
+ SchedulerSharedPtr scheduler;
};
TEST_F(SchedulerTest, testSingleShot) {
- SchedulerImpl scheduler;
- scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
@@ -46,7 +54,7 @@ TEST_F(SchedulerTest, testSingleShot) {
callback_fire_count++;
};
- scheduler.schedule(callback, "single-shot", std::chrono::milliseconds(10), std::chrono::milliseconds(0));
+ scheduler->schedule(callback, "single-shot", std::chrono::milliseconds(10), std::chrono::milliseconds(0));
// Wait till callback is executed.
{
@@ -55,13 +63,9 @@ TEST_F(SchedulerTest, testSingleShot) {
cv.Wait(&mtx);
}
}
-
- scheduler.shutdown();
}
TEST_F(SchedulerTest, testCancel) {
- SchedulerImpl scheduler;
- scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
@@ -71,16 +75,13 @@ TEST_F(SchedulerTest, testCancel) {
callback_fire_count++;
};
- std::uint32_t task_id = scheduler.schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
- scheduler.cancel(task_id);
+ std::uint32_t task_id = scheduler->schedule(callback, "test-cancel", std::chrono::seconds(1), std::chrono::seconds(1));
+ scheduler->cancel(task_id);
std::this_thread::sleep_for(std::chrono::seconds(2));
ASSERT_EQ(0, callback_fire_count);
- scheduler.shutdown();
}
TEST_F(SchedulerTest, testPeriodicShot) {
- SchedulerImpl scheduler;
- scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
@@ -91,17 +92,14 @@ TEST_F(SchedulerTest, testPeriodicShot) {
};
std::uintptr_t task_id =
- scheduler.schedule(callback, "periodic-task", std::chrono::milliseconds(10), std::chrono::seconds(1));
+ scheduler->schedule(callback, "periodic-task", std::chrono::milliseconds(10), std::chrono::seconds(1));
// Wait till callback is executed.
std::this_thread::sleep_for(std::chrono::seconds(5));
ASSERT_TRUE(callback_fire_count >= 4);
- scheduler.cancel(task_id);
- scheduler.shutdown();
+ scheduler->cancel(task_id);
}
TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
- SchedulerImpl scheduler;
- scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
@@ -111,7 +109,7 @@ TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
callback_fire_count++;
};
- scheduler.schedule(callback, "single-shot-with-0-delay", std::chrono::milliseconds(0), std::chrono::milliseconds(0));
+ scheduler->schedule(callback, "single-shot-with-0-delay", std::chrono::milliseconds(0), std::chrono::milliseconds(0));
// Wait till callback is executed.
{
@@ -120,13 +118,9 @@ TEST_F(SchedulerTest, testSingleShotWithZeroDelay) {
cv.Wait(&mtx);
}
}
-
- scheduler.shutdown();
}
TEST_F(SchedulerTest, testException) {
- SchedulerImpl scheduler;
- scheduler.start();
absl::Mutex mtx;
absl::CondVar cv;
int callback_fire_count{0};
@@ -141,7 +135,7 @@ TEST_F(SchedulerTest, testException) {
throw e;
};
- scheduler.schedule(callback, "test-exception", std::chrono::milliseconds(100), std::chrono::milliseconds(100));
+ scheduler->schedule(callback, "test-exception", std::chrono::milliseconds(100), std::chrono::milliseconds(100));
// Wait till callback is executed.
{
@@ -152,7 +146,11 @@ TEST_F(SchedulerTest, testException) {
}
std::this_thread::sleep_for(std::chrono::seconds(3));
+}
- scheduler.shutdown();
+TEST(SchedulerLifeCycleTest, testLifeCycle) {
+ auto scheduler = std::make_shared<SchedulerImpl>();
+ scheduler->start();
}
+
ROCKETMQ_NAMESPACE_END
\ No newline at end of file