You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/28 14:20:41 UTC
[inlong] 01/02: [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (addendum) (#5721)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 303a2179543c4570794621097a338d0f1e4271db
Author: Goson Zhang <46...@qq.com>
AuthorDate: Sat Aug 27 20:56:47 2022 +0800
[INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (addendum) (#5721)
---
.../tubemq-client-cpp/src/baseconsumer.cc | 68 +++++++++++++---------
1 file changed, 40 insertions(+), 28 deletions(-)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index c7152a16e..5fbff8ddc 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -127,27 +127,23 @@ void BaseConsumer::ShutDown() {
// 3. close all brokers
closeAllBrokers();
// 4. check master hb thread status
- int check_count = 5;
- while (master_hb_status_.Get() != 0) {
- std::this_thread::sleep_for(std::chrono::milliseconds(30));
- if (--check_count <= 0) {
- LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s",
- master_hb_status_.Get(), client_uuid_.c_str());
- break;
- }
- }
- check_count = 5;
- while (master_reg_status_.Get() != 0) {
- std::this_thread::sleep_for(std::chrono::milliseconds(30));
- if (--check_count <= 0) {
- LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s",
- master_reg_status_.Get(), client_uuid_.c_str());
- break;
+ int check_count = 0;
+ while (master_hb_status_.Get() != 0 || master_reg_status_.Get() != 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(40));
+ if (++check_count % 10 == 0) {
+ if (check_count >= 1000) {
+ LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, exit, client=%s",
+ master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str());
+ break;
+ } else {
+ LOG_INFO("[CONSUMER] Found hb[%d] or reg[%d] not zero, count=%d, continue, client=%s",
+ master_hb_status_.Get(), master_reg_status_.Get(), check_count, client_uuid_.c_str());
+ }
}
}
// 5. join hb thread;
- heart_beat_timer_ = nullptr;
rebalance_thread_ptr_->join();
+ heart_beat_timer_ = nullptr;
rebalance_thread_ptr_ = nullptr;
// 6. remove client stub
TubeMQService::Instance()->RmvClientObj(shared_from_this());
@@ -539,21 +535,26 @@ void BaseConsumer::heartBeat2Master() {
req_protocol->request_id_ = request->request_id_;
req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
// send message to target
- auto self = shared_from_this();
AsyncRequest(request, req_protocol)
- .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
+ .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+ if (GetClientIndex() == tb_config::kInvalidValue ||
+ !TubeMQService::Instance()->IsRunning() ||
+ !isClientRunning()) {
+ master_hb_status_.CompareAndSet(1, 0);
+ return;
+ }
if (error.Value() != err_code::kErrSuccess) {
master_sh_retry_cnt_++;
LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
target_ip.c_str(), target_port, error.Message().c_str(), client_uuid_.c_str());
if (master_sh_retry_cnt_ >= tb_config::kMaxMasterHBRetryCount) {
- LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s",
- master_sh_retry_cnt_, client_uuid_.c_str());
- master_sh_retry_cnt_ = 0;
- is_master_actived_.Set(false);
- asyncRegister2Master(true);
- master_hb_status_.CompareAndSet(1, 0);
- return;
+ LOG_WARN("[CONSUMER] heartBeat2Master found over max-hb-retry(%d), client=%s",
+ master_sh_retry_cnt_, client_uuid_.c_str());
+ master_sh_retry_cnt_ = 0;
+ is_master_actived_.Set(false);
+ asyncRegister2Master(true);
+ master_hb_status_.CompareAndSet(1, 0);
+ return;
}
} else {
// process response
@@ -578,7 +579,14 @@ void BaseConsumer::heartBeat2Master() {
}
}
}
+ if (GetClientIndex() == tb_config::kInvalidValue ||
+ !TubeMQService::Instance()->IsRunning() ||
+ !isClientRunning()) {
+ master_hb_status_.CompareAndSet(1, 0);
+ return;
+ }
heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
+ auto self = shared_from_this();
heart_beat_timer_->async_wait([self, this](const std::error_code& ec) {
if (ec) {
return;
@@ -786,9 +794,13 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
req_protocol->request_id_ = request->request_id_;
req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
// send message to target
- auto self = shared_from_this();
AsyncRequest(request, req_protocol)
- .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
+ .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+ if (GetClientIndex() == tb_config::kInvalidValue ||
+ !TubeMQService::Instance()->IsRunning() ||
+ !isClientRunning()) {
+ return;
+ }
if (error.Value() != err_code::kErrSuccess) {
LOG_WARN("[Heartbeat2Broker] request network to failure (%s), ression is %s",
broker_info.GetAddrInfo().c_str(), error.Message().c_str());