You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@inlong.apache.org by do...@apache.org on 2022/08/26 05:03:29 UTC
[inlong] 08/08: [INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)
This is an automated email from the ASF dual-hosted git repository.
dockerzhang pushed a commit to branch release-1.3.0
in repository https://gitbox.apache.org/repos/asf/inlong.git
commit 555d2ddbeeb04759ccbf83e83de2beb66ab12bc6
Author: Goson Zhang <46...@qq.com>
AuthorDate: Fri Aug 26 12:52:52 2022 +0800
[INLONG-5700][TubeMQ] Core file generated while the C++ consumer is closed (#5707)
Co-authored-by: Charley <ch...@apache.org>
---
.../tubemq-client-cpp/src/baseconsumer.cc | 40 ++++++++++++++++++----
1 file changed, 34 insertions(+), 6 deletions(-)
diff --git a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 25001de2b..c7152a16e 100644
--- a/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/inlong-tubemq/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -126,13 +126,32 @@ void BaseConsumer::ShutDown() {
close2Master();
// 3. close all brokers
closeAllBrokers();
- // 4. remove client stub
- TubeMQService::Instance()->RmvClientObj(shared_from_this());
- client_index_ = tb_config::kInvalidValue;
+ // 4. check master hb thread status
+ int check_count = 5;
+ while (master_hb_status_.Get() != 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(30));
+ if (--check_count <= 0) {
+ LOG_INFO("[CONSUMER] Found hb status id not zero[%d], client=%s",
+ master_hb_status_.Get(), client_uuid_.c_str());
+ break;
+ }
+ }
+ check_count = 5;
+ while (master_reg_status_.Get() != 0) {
+ std::this_thread::sleep_for(std::chrono::milliseconds(30));
+ if (--check_count <= 0) {
+ LOG_INFO("[CONSUMER] Found reg status id not zero[%d], client=%s",
+ master_reg_status_.Get(), client_uuid_.c_str());
+ break;
+ }
+ }
// 5. join hb thread;
heart_beat_timer_ = nullptr;
rebalance_thread_ptr_->join();
rebalance_thread_ptr_ = nullptr;
+ // 6. remove client stub
+ TubeMQService::Instance()->RmvClientObj(shared_from_this());
+ client_index_ = tb_config::kInvalidValue;
LOG_INFO("[CONSUMER] ShutDown consumer finished, client=%s", client_uuid_.c_str());
}
@@ -520,8 +539,9 @@ void BaseConsumer::heartBeat2Master() {
req_protocol->request_id_ = request->request_id_;
req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
// send message to target
+ auto self = shared_from_this();
AsyncRequest(request, req_protocol)
- .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+ .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
if (error.Value() != err_code::kErrSuccess) {
master_sh_retry_cnt_++;
LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
@@ -559,7 +579,6 @@ void BaseConsumer::heartBeat2Master() {
}
}
heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
- auto self = shared_from_this();
heart_beat_timer_->async_wait([self, this](const std::error_code& ec) {
if (ec) {
return;
@@ -665,6 +684,10 @@ void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
rmtdata_cache_.FilterPartitions(subscribe_info, subscribed_partitions, unsub_partitions);
if (!unsub_partitions.empty()) {
for (it = unsub_partitions.begin(); it != unsub_partitions.end(); it++) {
+ if (!isClientRunning()) {
+ LOG_TRACE("[processConnect2Broker] client stopped, break pos1, clientid=%s", client_uuid_.c_str());
+ break;
+ }
LOG_TRACE("[processConnect2Broker] connect to %s, clientid=%s",
it->GetPartitionKey().c_str(), client_uuid_.c_str());
auto request = std::make_shared<RequestContext>();
@@ -681,6 +704,10 @@ void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
// send message to target
ResponseContext response_context;
ErrorCode error = SyncRequest(response_context, request, req_protocol);
+ if (!isClientRunning()) {
+ LOG_TRACE("[processConnect2Broker] client stopped, break pos2, clientid=%s", client_uuid_.c_str());
+ break;
+ }
if (error.Value() != err_code::kErrSuccess) {
LOG_WARN("[Connect2Broker] request network failure to (%s:%d) : %s",
it->GetBrokerHost().c_str(), it->GetBrokerPort(), error.Message().c_str());
@@ -759,8 +786,9 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
req_protocol->request_id_ = request->request_id_;
req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
// send message to target
+ auto self = shared_from_this();
AsyncRequest(request, req_protocol)
- .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
+ .AddCallBack([&](ErrorCode error, const ResponseContext& response_context) {
if (error.Value() != err_code::kErrSuccess) {
LOG_WARN("[Heartbeat2Broker] request network to failure (%s), ression is %s",
broker_info.GetAddrInfo().c_str(), error.Message().c_str());