You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2022/07/19 13:23:29 UTC

[rocketmq-clients] 01/01: Sync client settings every 5 minutes

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch cpp_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-clients.git

commit 6e79aaeb9e469ba2468be42cb0844c7bf0927fbb
Author: Li Zhanhui <li...@gmail.com>
AuthorDate: Tue Jul 19 21:23:08 2022 +0800

    Sync client settings every 5 minutes
---
 cpp/source/client/SessionImpl.cpp          |  2 +-
 cpp/source/client/TelemetryBidiReactor.cpp |  4 ++--
 cpp/source/client/include/Session.h        |  2 ++
 cpp/source/client/include/SessionImpl.h    |  2 +-
 cpp/source/rocketmq/ClientImpl.cpp         | 23 +++++++++++++++++++++++
 cpp/source/rocketmq/include/ClientImpl.h   |  5 +++++
 6 files changed, 34 insertions(+), 4 deletions(-)

diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp
index 5b7c085..1ecf326 100644
--- a/cpp/source/client/SessionImpl.cpp
+++ b/cpp/source/client/SessionImpl.cpp
@@ -31,7 +31,7 @@ bool SessionImpl::await() {
 
 void SessionImpl::syncSettings() {
   auto ptr = client_.lock();
-
+  SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress());
   TelemetryCommand command;
   command.mutable_settings()->CopyFrom(ptr->clientSettings());
   telemetry_->write(command);
diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp
index e88eb13..20860e4 100644
--- a/cpp/source/client/TelemetryBidiReactor.cpp
+++ b/cpp/source/client/TelemetryBidiReactor.cpp
@@ -152,7 +152,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
         writes_.push_back(response);
       }
       fireWrite();
-      StartRead(&read_);
       break;
     }
 
@@ -176,7 +175,6 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       raw->mutableExtension().target_endpoint = peer_address_;
       raw->mutableExtension().nonce = read_.verify_message_command().nonce();
       client->verify(message, cb);
-      StartRead(&read_);
       break;
     }
 
@@ -185,6 +183,8 @@ void TelemetryBidiReactor::OnReadDone(bool ok) {
       break;
     }
   }
+
+  fireRead();
 }
 
 void TelemetryBidiReactor::applySettings(const rmq::Settings& settings) {
diff --git a/cpp/source/client/include/Session.h b/cpp/source/client/include/Session.h
index 3c32ca5..ea2267f 100644
--- a/cpp/source/client/include/Session.h
+++ b/cpp/source/client/include/Session.h
@@ -25,6 +25,8 @@ public:
   virtual ~Session() = default;
 
   virtual bool await() = 0;
+
+  virtual void syncSettings() = 0;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/client/include/SessionImpl.h b/cpp/source/client/include/SessionImpl.h
index 3ab381d..8207080 100644
--- a/cpp/source/client/include/SessionImpl.h
+++ b/cpp/source/client/include/SessionImpl.h
@@ -39,7 +39,7 @@ private:
   // TODO: use unique_ptr
   std::shared_ptr<TelemetryBidiReactor> telemetry_;
 
-  void syncSettings();
+  void syncSettings() override;
 };
 
 ROCKETMQ_NAMESPACE_END
\ No newline at end of file
diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp
index c578eb7..b90035e 100644
--- a/cpp/source/rocketmq/ClientImpl.cpp
+++ b/cpp/source/rocketmq/ClientImpl.cpp
@@ -169,6 +169,16 @@ void ClientImpl::start() {
   route_update_handle_ = client_manager_->getScheduler()->schedule(route_update_functor, UPDATE_ROUTE_TASK_NAME,
                                                                    std::chrono::seconds(10), std::chrono::seconds(30));
 
+  auto telemetry_functor = [ptr]() {
+    std::shared_ptr<ClientImpl> base = ptr.lock();
+    if (base) {
+      SPDLOG_INFO("Sync client settings to servers");
+      base->syncClientSettings();
+    }
+  };
+  telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME,
+                                                                std::chrono::minutes(5), std::chrono::minutes(5));
+
   auto endpoints = client_config_.metric.endpoints;
   std::string target;
   switch (endpoints.scheme()) {
@@ -220,6 +230,11 @@ void ClientImpl::shutdown() {
     if (route_update_handle_) {
       client_manager_->getScheduler()->cancel(route_update_handle_);
     }
+
+    if (telemetry_handle_) {
+      client_manager_->getScheduler()->cancel(telemetry_handle_);
+    }
+
     client_manager_.reset();
   } else {
     SPDLOG_ERROR("Try to shutdown ClientImpl, but its state is not as expected. Expecting: {}, Actual: {}",
@@ -228,6 +243,7 @@ void ClientImpl::shutdown() {
 }
 
 const char* ClientImpl::UPDATE_ROUTE_TASK_NAME = "route_updater";
+const char* ClientImpl::TELEMETRY_TASK_NAME = "client_settings_sync";
 
 void ClientImpl::endpointsInUse(absl::flat_hash_set<std::string>& endpoints) {
   absl::MutexLock lk(&topic_route_table_mtx_);
@@ -323,6 +339,13 @@ void ClientImpl::fetchRouteFor(const std::string& topic,
                                 absl::ToChronoMilliseconds(client_config_.request_timeout), callback);
 }
 
+void ClientImpl::syncClientSettings() {
+  absl::MutexLock lk(&session_map_mtx_);
+  for (const auto& entry : session_map_) {
+    entry.second->syncSettings();
+  }
+}
+
 void ClientImpl::updateRouteInfo() {
   if (State::STARTED != state_.load(std::memory_order_relaxed) &&
       State::STARTING != state_.load(std::memory_order_relaxed)) {
diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h
index 07f1efe..2a06a0f 100644
--- a/cpp/source/rocketmq/include/ClientImpl.h
+++ b/cpp/source/rocketmq/include/ClientImpl.h
@@ -142,6 +142,11 @@ protected:
   static const char* UPDATE_ROUTE_TASK_NAME;
   std::uint32_t route_update_handle_{0};
 
+  static const char* TELEMETRY_TASK_NAME;
+  std::uint32_t telemetry_handle_{0};
+
+  void syncClientSettings() LOCKS_EXCLUDED(session_map_mtx_);
+
   // Set Name Server Resolver
   std::shared_ptr<NameServerResolver> name_server_resolver_;