You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/28 14:20:41 UTC

[inlong] 01/02: [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed(addendum) (#5721)

This is an automated email from the ASF dual-hosted git repository.

dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git

commit 303a2179543c4570794621097a338d0f1e4271db
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Aug 27 20:56:47 2022 +0800

    [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed(addendum) (#5721)
---
 .../tubemq-client-cpp/src/baseconsumer.cc          | 68 +++++++++++++---------
 1 file changed, 40 insertions(+), 28 deletions(-)

diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index c7152a16e..5fbff8ddc 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -127,27 +127,23 @@ void BaseConsumer::ShutDown() {
   // 3. close all brokers
   closeAllBrokers();
   // 4. check master hb thread status
-  int check_count = 5;
-  while (master_hb_status_.Get() != 0) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(30));
-    if (--check_count <= 0) {
-      LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s",
-               master_hb_status_.Get(), client_uuid_.c_str());  
-      break;
-    }
-  }
-  check_count = 5;
-  while (master_reg_status_.Get() != 0) {
-    std::this_thread::sleep_for(std::chrono::milliseconds(30));
-    if (--check_count <= 0) {
-      LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s",
-               master_reg_status_.Get(), client_uuid_.c_str());  
-      break;
+  int check_count = 0;
+  while (master_hb_status_.Get() != 0 || master_reg_status_.Get() != 0) {
+    std::this_thread::sleep_for(std::chrono::milliseconds(40));
+    if (++check_count % 10 == 0) {
+      if (check_count >= 1000) {
+          LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, exit, client=%s",
+                   master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str());
+        break;
+      } else {
+        LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, continue, client=%s",
+                 master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str());
+      }
     }
   }
   // 5. join hb thread;
-  heart_beat_timer_ = nullptr;
   rebalance_thread_ptr_->join();
+  heart_beat_timer_ = nullptr;
   rebalance_thread_ptr_ = nullptr;
   // 6. remove client stub
   TubeMQService::Instance()->RmvClientObj(shared_from_this());
@@ -539,21 +535,26 @@ void BaseConsumer::heartBeat2Master() {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
-  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+        if (GetClientIndex() == tb_config::kInvalidValue ||
+          !TubeMQService::Instance()->IsRunning() ||
+          !isClientRunning()) {
+          master_hb_status_.CompareAndSet(1, 0);
+          return;
+        }
         if (error.Value() != err_code::kErrSuccess) {
           master_sh_retry_cnt_++;
           LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
                    target_ip.c_str(), target_port, error.Message().c_str(), client_uuid_.c_str());
           if (master_sh_retry_cnt_ >= tb_config::kMaxMasterHBRetryCount) {
-              LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s",
-                       master_sh_retry_cnt_, client_uuid_.c_str());
-              master_sh_retry_cnt_ = 0;
-              is_master_actived_.Set(false);
-              asyncRegister2Master(true);
-              master_hb_status_.CompareAndSet(1, 0);
-              return;
+            LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s",
+                     master_sh_retry_cnt_, client_uuid_.c_str());
+            master_sh_retry_cnt_ = 0;
+            is_master_actived_.Set(false);
+            asyncRegister2Master(true);
+            master_hb_status_.CompareAndSet(1, 0);
+            return;
           }
         } else {
           // process response
@@ -578,7 +579,14 @@ void BaseConsumer::heartBeat2Master() {
             }
           }
         }
+        if (GetClientIndex() == tb_config::kInvalidValue ||
+          !TubeMQService::Instance()->IsRunning() ||
+          !isClientRunning()) {
+          master_hb_status_.CompareAndSet(1, 0);
+          return;
+        }
         heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
+        auto self = shared_from_this();
         heart_beat_timer_->async_wait([self, this](const std::error_code& ec) {
           if (ec) {
             return;
@@ -786,9 +794,13 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
   req_protocol->request_id_ = request->request_id_;
   req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
   // send message to target
-  auto self = shared_from_this();
   AsyncRequest(request, req_protocol)
-      .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
+      .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+        if (GetClientIndex() == tb_config::kInvalidValue ||
+          !TubeMQService::Instance()->IsRunning() ||
+          !isClientRunning()) {
+          return;
+        }
         if (error.Value() != err_code::kErrSuccess) {
           LOG_WARN("[Heartbeat2Broker] request network  to failure (%s), ression is %s",
                    broker_info.GetAddrInfo().c_str(), error.Message().c_str());