You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/20 02:32:11 UTC
[rocketmq-clients] branch master updated: Sync client settings every 5 minutes (#59)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git
The following commit(s) were added to refs/heads/master by this push:
new f6fdbbd Sync client settings every 5 minutes (#59)
f6fdbbd is described below
commit f6fdbbdbd8eceead8b7adf7d07c2d02e81d04cd6
Author: Zhanhui Li <li...@gmail.com>
AuthorDate: Wed Jul 20 10:32:06 2022 +0800
Sync client settings every 5 minutes (#59)
---
cpp/source/client/SessionImpl.cpp | 2 +-
cpp/source/client/TelemetryBidiReactor.cpp | 4 ++--
cpp/source/client/include/Session.h | 2 ++
cpp/source/client/include/SessionImpl.h | 2 +-
cpp/source/rocketmq/ClientImpl.cpp | 23 +++++++++++++++++++++++
cpp/source/rocketmq/include/ClientImpl.h | 5 +++++
6 files changed, 34 insertions(+), 4 deletions(-)
diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp
index 5b7c085..1ecf326 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -31,7 +31,7 @@ bool SessionImpl::await() {
void SessionImpl::syncSettings() {
auto ptr = client_.lock();
-
+ SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
TelemetryCommand command;
command.mutable_settings()->CopyFrom(ptr->clientSettings());
telemetry_->write(command);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index e88eb13..20860e4 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -152,7 +152,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
writes_.push_back(response);
}
fireWrite();
- StartRead(&read_);
break;
}
@@ -176,7 +175,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
raw->mutableExtension().target_endpoint = peer_address_;
raw->mutableExtension().nonce = read_.verify_message_command().nonce();
client->verify(message, cb);
- StartRead(&read_);
break;
}
@@ -185,6 +183,8 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
break;
}
}
+
+ fireRead();
}
void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
diff --git a/cpp/source/client/include/Session.h b/cpp/source/client/include/Session.h
index 3c32ca5..ea2267f 100644
--- a/cpp/source/client/include/Session.h
+++ b/cpp/source/client/include/Session.h
@@ -25,6 +25,8 @@ public:
virtual ~Session() = default;
virtual bool await() = 0;
+
+ virtual void syncSettings() = 0;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SessionImpl.h b/cpp/source/client/include/SessionImpl.h
index 3ab381d..8207080 100644
--- a/cpp/source/client/include/SessionImpl.h
+++ b/cpp/source/client/include/SessionImpl.h
@@ -39,7 +39,7 @@ private:
// TODO: use unique_ptr
std::shared_ptr<TelemetryBidiReactor> telemetry_;
- void syncSettings();
+ void syncSettings() override;
};
ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index c578eb7..b90035e 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -169,6 +169,16 @@ void ClientImpl::start() {
route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
std::chrono::seconds(10), std::chrono::seconds(30));
+ auto telemetry_functor = [ptr]() {
+ std::shared_ptr<ClientImpl> base = ptr.lock();
+ if (base) {
+ SPDLOG_INFO("Sync client settings to servers");
+ base->syncClientSettings();
+ }
+ };
+ telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
+ std::chrono::minutes(5), std::chrono::minutes(5));
+
auto endpoints = client_config_.metric.endpoints;
std::string target;
switch (endpoints.scheme()) {
@@ -220,6 +230,11 @@ void ClientImpl::shutdown() {
if (route_update_handle_) {
client_manager_->getScheduler()->cancel(route_update_handle_);
}
+
+ if (telemetry_handle_) {
+ client_manager_->getScheduler()->cancel(telemetry_handle_);
+ }
+
client_manager_.reset();
} else {
SPDLOG_ERROR("Try to shutdown ClientImpl, but its state is not as expected. Expecting: {}, Actual: {}",
@@ -228,6 +243,7 @@ void ClientImpl::shutdown() {
}
const char* ClientImpl::UPDATE_ROUTE_TASK_NAME = "route_updater";
+const char* ClientImpl::TELEMETRY_TASK_NAME = "client_settings_sync";
void ClientImpl::endpointsInUse(absl::flat_hash_set<std::string>& endpoints) {
absl::MutexLock lk(&topic_route_table_mtx_);
@@ -323,6 +339,13 @@ void ClientImpl::fetchRouteFor(const std::string& topic,
absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
}
+void ClientImpl::syncClientSettings() {
+ absl::MutexLock lk(&session_map_mtx_);
+ for (const auto& entry : session_map_) {
+ entry.second->syncSettings();
+ }
+}
+
void ClientImpl::updateRouteInfo() {
if (State::STARTED != state_.load(std::memory_order_relaxed) &&
State::STARTING != state_.load(std::memory_order_relaxed)) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 07f1efe..2a06a0f 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -142,6 +142,11 @@ protected:
static const char* UPDATE_ROUTE_TASK_NAME;
std::uint32_t route_update_handle_{0};
+ static const char* TELEMETRY_TASK_NAME;
+ std::uint32_t telemetry_handle_{0};
+
+ void syncClientSettings() LOCKS_EXCLUDED(session_map_mtx_);
+
// Set Name Server Resolver
std::shared_ptr<NameServerResolver> name_server_resolver_;