You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 04:52:56 UTC

[inlong] branch master updated: [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/inlong.git


The following commit(s) were added to refs/heads/master by this push:
     new d55e3d9cf [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)
d55e3d9cf is described below

commit d55e3d9cf89ecfa7a61eb43349a7b458c5a4ee77
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Aug 26 12:52:52 2022 +0800

    [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)
    
    Co-authored-by: Charley <ch...@apache.org>
---
 .../tubemq-client-cpp/src/baseconsumer.cc          | 40 ++++++++++++++++++----
 1 file changed, 34 insertions(+), 6 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 25001de2b..c7152a16e 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -126,13 +126,32 @@ void BaseConsumer::ShutDown() {
   close2Master();
   // 3. close all brokers
   closeAllBrokers();
-  // 4. remove client stub
-  TubeMQService::Instance()->RmvClientObj(shared_from_this());
-  client_index_ = tb_config::kInvalidValue;
+  // 4. check master hb thread status
+  int check_count = 5;
+  while (master_hb_status_.Get() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    if (--check_count <= 0) {
+      LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s",
+               master_hb_status_.Get(), client_uuid_.c_str());  
+      break;
+    }
+  }
+  check_count = 5;
+  while (master_reg_status_.Get() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(30));
+    if (--check_count <= 0) {
+      LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s",
+               master_reg_status_.Get(), client_uuid_.c_str());  
+      break;
+    }
+  }
   // 5. join hb thread;
   heart_beat_timer_ = nullptr;
   rebalance_thread_ptr_->join();
   rebalance_thread_ptr_ = nullptr;
+  // 6. remove client stub
+  TubeMQService::Instance()->RmvClientObj(shared_from_this());
+  client_index_ = tb_config::kInvalidValue;
   LOG_INFO("[CONSUMER] ShutDown consumer finished, client=%s", client_uuid_.c_str());
 }
 
@@ -520,8 +539,9 @@ void BaseConsumer::heartBeat2Master() {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
+  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
         if (error.Value() != err_code::kErrSuccess) {
           master_sh_retry_cnt_++;
           LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
@@ -559,7 +579,6 @@ void BaseConsumer::heartBeat2Master() {
           }
         }
         heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
-        auto self = shared_from_this();
         heart_beat_timer_->async_wait([self, this](const std::error_code& ec) {
           if (ec) {
             return;
@@ -665,6 +684,10 @@ void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
     rmtdata_cache_.FilterPartitions(subscribe_info, subscribed_partitions, unsub_partitions);
     if (!unsub_partitions.empty()) {
       for (it = unsub_partitions.begin(); it != unsub_partitions.end(); it++) {
+        if (!isClientRunning()) {
+          LOG_TRACE("[processConnect2Broker] client stopped, break pos1, clientid=%s", client_uuid_.c_str());
+          break;
+        }
         LOG_TRACE("[processConnect2Broker] connect to %s, clientid=%s",
                   it->GetPartitionKey().c_str(), client_uuid_.c_str());
         auto request = std::make_shared<RequestContext>();
@@ -681,6 +704,10 @@ void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
         // send message to target
         ResponseContext response_context;
         ErrorCode error = SyncRequest(response_context, request, req_protocol);
+        if (!isClientRunning()) {
+          LOG_TRACE("[processConnect2Broker] client stopped, break pos2, clientid=%s", client_uuid_.c_str());
+          break;
+        }
         if (error.Value() != err_code::kErrSuccess) {
           LOG_WARN("[Connect2Broker] request network failure to (%s:%d) : %s",
                    it->GetBrokerHost().c_str(), it->GetBrokerPort(), error.Message().c_str());
@@ -759,8 +786,9 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
+  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
         if (error.Value() != err_code::kErrSuccess) {
           LOG_WARN("[Heartbeat2Broker] request network  to failure (%s), ression is %s",
                    broker_info.GetAddrInfo().c_str(), error.Message().c_str());