You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/20 02:01:21 UTC

[rocketmq-client-cpp] branch main updated: Bugfix: Compare and initiate poll-command-cycle when topic route is updated (#371)

This is an automated email from the ASF dual-hosted git repository.

lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/main by this push:
     new 33b0cee  Bugfix: Compare and initiate poll-command-cycle when topic route is updated (#371)
33b0cee is described below

commit 33b0cee832d09c5f635c35800b0393385a3ed630
Author: aaron ai <ya...@gmail.com>
AuthorDate: Wed Oct 20 10:01:15 2021 +0800

    Bugfix: Compare and initiate poll-command-cycle when topic route is updated (#371)
---
 src/main/cpp/rocketmq/ClientImpl.cpp | 33 +++++++++++++++++++++++++++++++++
 1 file changed, 33 insertions(+)

diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 0cc18d9..0bc2205 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -303,8 +303,18 @@ void ClientImpl::updateRouteCache(const std::string& topic, const std::error_cod
     return;
   }
 
+  absl::flat_hash_set<std::string> new_hosts;
   {
     absl::MutexLock lk(&topic_route_table_mtx_);
+    absl::flat_hash_set<std::string> existed_hosts;
+    for (const auto& item : topic_route_table_) {
+      for (const auto& partition : item.second->partitions()) {
+        std::string endpoint = partition.asMessageQueue().serviceAddress();
+        if (!existed_hosts.contains(endpoint)) {
+          existed_hosts.emplace(std::move(endpoint));
+        }
+      }
+    }
     if (!topic_route_table_.contains(topic)) {
       topic_route_table_.insert({topic, route});
       SPDLOG_INFO("TopicRouteData for topic={} has changed. NONE --> {}", topic, route->debugString());
@@ -317,10 +327,25 @@ void ClientImpl::updateRouteCache(const std::string& topic, const std::error_cod
                     route->debugString());
       }
     }
+    absl::flat_hash_set<std::string> hosts;
+    for (const auto& item : topic_route_table_) {
+      for (const auto& partition : item.second->partitions()) {
+        std::string endpoint = partition.asMessageQueue().serviceAddress();
+        if (!hosts.contains(endpoint)) {
+          hosts.emplace(std::move(endpoint));
+        }
+      }
+    }
+    std::set_difference(hosts.begin(), hosts.end(), existed_hosts.begin(), existed_hosts.end(),
+                        std::inserter(new_hosts, new_hosts.begin()));
+  }
+  for (const auto& endpoints : new_hosts) {
+    pollCommand(endpoints);
   }
 }
 
 void ClientImpl::pollCommand(const std::string& target) {
+  SPDLOG_INFO("Start to poll command to remote, target={}", target);
   absl::flat_hash_map<std::string, std::string> metadata;
   Signature::sign(this, metadata);
 
@@ -351,6 +376,7 @@ void ClientImpl::pollCommand(const std::string& target) {
 }
 
 void ClientImpl::verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message) {
+  SPDLOG_INFO("Received message to verify consumption, messageId={}", message.getMsgId());
   MessageListener* listener = messageListener();
 
   Metadata metadata;
@@ -379,6 +405,13 @@ void ClientImpl::verifyMessageConsumption(std::string remote_address, std::strin
 }
 
 void ClientImpl::onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx) {
+  std::string address = ctx->remote_address;
+  absl::flat_hash_set<std::string> hosts;
+  endpointsInUse(hosts);
+  if (!hosts.contains(address)) {
+    SPDLOG_INFO("Endpoints={} was not used, stop to poll command.", address);
+    return;
+  }
   if (!ctx->status.ok()) {
     static std::string task_name = "Poll-Command-Later";
     client_manager_->getScheduler().schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,