You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by li...@apache.org on 2021/10/20 02:01:21 UTC
[rocketmq-client-cpp] branch main updated: Bugfix: Compare and
initiate poll-command-cycle when topic route is updated (#371)
This is an automated email from the ASF dual-hosted git repository.
lizhanhui pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/main by this push:
new 33b0cee Bugfix: Compare and initiate poll-command-cycle when topic route is updated (#371)
33b0cee is described below
commit 33b0cee832d09c5f635c35800b0393385a3ed630
Author: aaron ai <ya...@gmail.com>
AuthorDate: Wed Oct 20 10:01:15 2021 +0800
Bugfix: Compare and initiate poll-command-cycle when topic route is updated (#371)
---
src/main/cpp/rocketmq/ClientImpl.cpp | 33 +++++++++++++++++++++++++++++++++
1 file changed, 33 insertions(+)
diff --git a/src/main/cpp/rocketmq/ClientImpl.cpp b/src/main/cpp/rocketmq/ClientImpl.cpp
index 0cc18d9..0bc2205 100644
--- a/src/main/cpp/rocketmq/ClientImpl.cpp
+++ b/src/main/cpp/rocketmq/ClientImpl.cpp
@@ -303,8 +303,18 @@ void ClientImpl::updateRouteCache(const std::string& topic, const std::error_cod
return;
}
+ absl::flat_hash_set<std::string> new_hosts;
{
absl::MutexLock lk(&topic_route_table_mtx_);
+ absl::flat_hash_set<std::string> existed_hosts;
+ for (const auto& item : topic_route_table_) {
+ for (const auto& partition : item.second->partitions()) {
+ std::string endpoint = partition.asMessageQueue().serviceAddress();
+ if (!existed_hosts.contains(endpoint)) {
+ existed_hosts.emplace(std::move(endpoint));
+ }
+ }
+ }
if (!topic_route_table_.contains(topic)) {
topic_route_table_.insert({topic, route});
SPDLOG_INFO("TopicRouteData for topic={} has changed. NONE --> {}", topic, route->debugString());
@@ -317,10 +327,25 @@ void ClientImpl::updateRouteCache(const std::string& topic, const std::error_cod
route->debugString());
}
}
+ absl::flat_hash_set<std::string> hosts;
+ for (const auto& item : topic_route_table_) {
+ for (const auto& partition : item.second->partitions()) {
+ std::string endpoint = partition.asMessageQueue().serviceAddress();
+ if (!hosts.contains(endpoint)) {
+ hosts.emplace(std::move(endpoint));
+ }
+ }
+ }
+ std::set_difference(hosts.begin(), hosts.end(), existed_hosts.begin(), existed_hosts.end(),
+ std::inserter(new_hosts, new_hosts.begin()));
+ }
+ for (const auto& endpoints : new_hosts) {
+ pollCommand(endpoints);
}
}
void ClientImpl::pollCommand(const std::string& target) {
+ SPDLOG_INFO("Start to poll command to remote, target={}", target);
absl::flat_hash_map<std::string, std::string> metadata;
Signature::sign(this, metadata);
@@ -351,6 +376,7 @@ void ClientImpl::pollCommand(const std::string& target) {
}
void ClientImpl::verifyMessageConsumption(std::string remote_address, std::string command_id, MQMessageExt message) {
+ SPDLOG_INFO("Received message to verify consumption, messageId={}", message.getMsgId());
MessageListener* listener = messageListener();
Metadata metadata;
@@ -379,6 +405,13 @@ void ClientImpl::verifyMessageConsumption(std::string remote_address, std::strin
}
void ClientImpl::onPollCommandResponse(const InvocationContext<PollCommandResponse>* ctx) {
+ std::string address = ctx->remote_address;
+ absl::flat_hash_set<std::string> hosts;
+ endpointsInUse(hosts);
+ if (!hosts.contains(address)) {
+ SPDLOG_INFO("Endpoints={} was not used, stop to poll command.", address);
+ return;
+ }
if (!ctx->status.ok()) {
static std::string task_name = "Poll-Command-Later";
client_manager_->getScheduler().schedule(std::bind(&ClientImpl::pollCommand, this, ctx->remote_address), task_name,