You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by go...@apache.org on 2020/09/14 12:25:39 UTC
[incubator-tubemq] branch tubemq-client-cpp updated:
[TUBEMQ-348]C++SDK Client handler detail (#260)
This is an automated email from the ASF dual-hosted git repository.
gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
new dde7066 [TUBEMQ-348]C++SDK Client handler detail (#260)
dde7066 is described below
commit dde70666d956807dc0fe60993cee86a6c49010e0
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Mon Sep 14 20:25:33 2020 +0800
[TUBEMQ-348]C++SDK Client handler detail (#260)
Co-authored-by: charleli <ch...@tencent.com>
---
.../tubemq-client-cpp/src/baseconsumer.cc | 1619 ++++++++++++++++++++
.../tubemq-client-cpp/src/baseconsumer.h | 146 ++
.../tubemq-client-cpp/src/client_service.cc | 196 ++-
.../tubemq-client-cpp/src/client_service.h | 296 ++++
4 files changed, 2224 insertions(+), 33 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
new file mode 100644
index 0000000..215892d
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -0,0 +1,1619 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "baseconsumer.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <sstream>
+
+#include "client_service.h"
+#include "const_config.h"
+#include "const_rpc.h"
+#include "logger.h"
+#include "singleton.h"
+#include "transport.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+#include "tubemq_transport.h"
+#include "utils.h"
+#include "version.h"
+
+namespace tubemq {
+
+using std::lock_guard;
+using std::stringstream;
+
// Constructs a consumer client in the "stopped" state with all master /
// broker bookkeeping reset; real initialization happens in Start().
BaseConsumer::BaseConsumer() : BaseClient(false) {
  status_.Set(0);  // lifecycle: 0 = stopped, 1 = starting, 2 = running (see Start/ShutDown)
  unreport_times_ = 0;
  visit_token_.Set(tb_config::kInvalidValue);
  nextauth_2_master.Set(false);
  nextauth_2_broker.Set(false);
  masters_map_.clear();
  is_master_actived_.Set(false);  // true once registered on the master
  master_reg_status_.Set(0);      // register2Master re-entrancy guard (0 idle, 1 running)
  master_hb_status_.Set(0);       // heartBeat2Master re-entrancy guard (0 idle, 1 running)
  last_master_hbtime_ = 0;
  master_sh_retry_cnt_ = 0;       // consecutive master heartbeat/registration failures
}
+
+BaseConsumer::~BaseConsumer() {
+ //
+}
+
+bool BaseConsumer::Start(string& err_info, const ConsumerConfig& config) {
+ ConsumerConfig tmp_config;
+ if (!status_.CompareAndSet(0, 1)) {
+ err_info = "Ok";
+ return true;
+ }
+ // check configure
+ if (config.GetGroupName().length() == 0 || config.GetMasterAddrInfo().length() == 0) {
+ err_info = "Parameter error: not set master address info or group name!";
+ return false;
+ }
+ //
+ if (!TubeMQService::Instance()->IsRunning()) {
+ err_info = "TubeMQ Service not startted!";
+ return false;
+ }
+ if (!TubeMQService::Instance()->AddClientObj(err_info, this)) {
+ client_index_ = tb_config::kInvalidValue;
+ status_.CompareAndSet(1, 0);
+ return false;
+ }
+ config_ = config;
+ if (!initMasterAddress(err_info, config.GetMasterAddrInfo())) {
+ return false;
+ }
+ client_uuid_ = buildUUID();
+ sub_info_.SetConsumeTarget(config_);
+ rmtdata_cache_.SetConsumerInfo(client_uuid_, config_.GetGroupName());
+ // initial resource
+
+ // register to master
+ int32_t error_code;
+ if (!register2Master(error_code, err_info, false)) {
+ status_.CompareAndSet(1, 0);
+ return false;
+ }
+ status_.CompareAndSet(1, 2);
+ heart_beat_timer_ = TubeMQService::Instance()->CreateTimer();
+ heart_beat_timer_->expires_after(std::chrono::milliseconds(config_.GetHeartbeatPeriodMs() / 2));
+ heart_beat_timer_->async_wait([this](const std::error_code& ec) {
+ if (ec) {
+ return;
+ }
+ heartBeat2Master();
+ });
+ rebalance_thread_ptr_ = std::make_shared<std::thread>([this]() { processRebalanceEvent(); });
+ LOG_INFO("[CONSUMER] start consumer success, client=%s", client_uuid_.c_str());
+ err_info = "Ok";
+ return true;
+}
+
// Stops the consumer. Only the caller that wins the 2 -> 0 status transition
// performs the teardown; concurrent or repeated calls return immediately.
void BaseConsumer::ShutDown() {
  if (!status_.CompareAndSet(2, 0)) {
    return;
  }
  LOG_INFO("[CONSUMER] ShutDown consumer begin, client=%s", client_uuid_.c_str());
  // 1. exit rebalance thread: an empty (all-invalid) event is the shutdown
  //    sentinel recognized by processRebalanceEvent()
  ConsumerEvent empty_event;
  rmtdata_cache_.OfferEvent(empty_event);
  // 2. close to master (fire-and-forget unregister)
  close2Master();
  // 3. close all brokers
  closeAllBrokers();
  // 4. remove client stub from the service registry
  TubeMQService::Instance()->RmvClientObj(this);
  client_index_ = tb_config::kInvalidValue;
  // 5. drop the heartbeat timer and join the rebalance thread
  heart_beat_timer_ = nullptr;
  rebalance_thread_ptr_->join();
  rebalance_thread_ptr_ = nullptr;
  LOG_INFO("[CONSUMER] ShutDown consumer finished, client=%s", client_uuid_.c_str());
}
+
// Fetches one batch of messages from the broker of an idle cached partition.
// On success `result` carries the messages and a confirm context the caller
// must later pass to Confirm(); on failure the partition is released and
// `result` carries the error. Returns result.IsSuccess() / false on failure.
bool BaseConsumer::GetMessage(ConsumerResult& result) {
  int32_t error_code;
  string err_info;
  PartitionExt partition_ext;
  string confirm_context;

  if (!IsConsumeReady(result)) {
    return false;
  }
  // select an idle partition; confirm_context keys the follow-up Confirm()
  if (!rmtdata_cache_.SelectPartition(error_code,
    err_info, partition_ext, confirm_context)) {
    result.SetFailureResult(error_code, err_info);
    return false;
  }
  long curr_offset = tb_config::kInvalidValue;  // offset unknown until the broker replies
  bool filter_consume = sub_info_.IsFilterConsume(partition_ext.GetTopic());
  PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
    partition_ext.GetPartitionKey(), curr_offset);
  auto request = std::make_shared<RequestContext>();
  TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
  // build getmessage request
  buidGetMessageC2B(partition_ext, req_protocol);
  request->codec_ = std::make_shared<TubeMQCodec>();
  request->ip_ = partition_ext.GetBrokerHost();
  request->port_ = partition_ext.GetBrokerPort();
  request->timeout_ = config_.GetRpcReadTimeoutMs();
  request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
  req_protocol->request_id_ = request->request_id_;
  // give the remote side slightly less time than the local RPC timeout
  req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;

  LOG_TRACE("[CONSUMER] GetMessage select partition=%s, client=%s",
    partition_ext.GetPartitionKey().c_str(), client_uuid_.c_str());

  // send message to target (blocking RPC)
  ResponseContext response_context;
  ErrorCode error = SyncRequest(response_context, request, req_protocol);
  LOG_TRACE("[CONSUMER] GetMessage received response, ret_code=%d, client=%s",
    error.Value(), client_uuid_.c_str());
  if (!TubeMQService::Instance()->IsRunning()) {
    result.SetFailureResult(err_code::kErrMQServiceStop, "TubeMQ Service stopped!");
    return false;
  }
  if (!isClientRunning()) {
    result.SetFailureResult(err_code::kErrClientStop, "TubeMQ Client stopped!");
    return false;
  }
  if (error.Value() == err_code::kErrSuccess) {
    // process response
    auto rsp = any_cast<TubeMQCodec::RspProtocolPtr>(response_context.rsp_);
    processGetMessageRspB2C(result, peer_info, filter_consume, partition_ext, confirm_context, rsp);
    return result.IsSuccess();
  } else {
    // RPC failed: release the partition without confirming consumption
    rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
    result.SetFailureResult(error.Value(), error.Message(), partition_ext.GetTopic(), peer_info);
    return false;
  }
}
+
+bool BaseConsumer::IsConsumeReady(ConsumerResult& result) {
+ int32_t ret_code;
+ int64_t start_time = Utils::GetCurrentTimeMillis();
+ while (true) {
+ if (!TubeMQService::Instance()->IsRunning()) {
+ result.SetFailureResult(err_code::kErrMQServiceStop, "TubeMQ Service stopped!");
+ return false;
+ }
+ if (!isClientRunning()) {
+ result.SetFailureResult(err_code::kErrClientStop, "TubeMQ Client stopped!");
+ return false;
+ }
+ ret_code = rmtdata_cache_.GetCurConsumeStatus();
+ if (err_code::kErrSuccess == ret_code) {
+ return true;
+ }
+ if ((config_.GetMaxPartCheckPeriodMs() > 0)
+ && (Utils::GetCurrentTimeMillis() - start_time
+ > config_.GetMaxPartCheckPeriodMs())) {
+ switch (ret_code) {
+ case err_code::kErrNoPartAssigned: {
+ result.SetFailureResult(ret_code,
+ "No partition info in local cache, please retry later!");
+ }
+ break;
+
+ case err_code::kErrAllPartInUse: {
+ result.SetFailureResult(ret_code,
+ "No idle partition to consume, please retry later!");
+ }
+ break;
+
+ case err_code::kErrAllPartWaiting:
+ default: {
+ result.SetFailureResult(ret_code,
+ "All partitions reach max position, please retry later!");
+ }
+ break;
+ }
+ return false;
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(config_.GetPartCheckSliceMs()));
+ }
+ return true;
+}
+
+bool BaseConsumer::GetCurConsumedInfo(map<string, ConsumeOffsetInfo>& consume_info_map) {
+ bool has_data = false;
+ consume_info_map.clear();
+ map<string, int64_t> part_offset_map;
+ map<string, int64_t>::iterator it_part;
+ rmtdata_cache_.GetCurPartitionOffsets(part_offset_map);
+ for (it_part = part_offset_map.begin(); it_part != part_offset_map.end(); ++it_part) {
+ ConsumeOffsetInfo tmp_info(it_part->first, it_part->second);
+ consume_info_map[it_part->first] = tmp_info;
+ has_data = true;
+ }
+ return has_data;
+}
+
+bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
+ ConsumerResult& result) {
+ if (!TubeMQService::Instance()->IsRunning()) {
+ result.SetFailureResult(err_code::kErrMQServiceStop, "TubeMQ Service stopped!");
+ return false;
+ }
+ if (!isClientRunning()) {
+ result.SetFailureResult(err_code::kErrClientStop, "TubeMQ Client stopped!");
+ return false;
+ }
+
+ LOG_TRACE("[CONSUMER] Confirm begin, confirm_context = %s, is_consumed =%d, client=%s",
+ confirm_context.c_str(), is_consumed, client_uuid_.c_str());
+
+ string token1 = delimiter::kDelimiterAt;
+ string token2 = delimiter::kDelimiterColon;
+ string::size_type pos1, pos2;
+ pos1 = confirm_context.find(token1);
+ if (string::npos == pos1) {
+ result.SetFailureResult(
+ err_code::kErrBadRequest,
+ "Illegel confirm_context content: unregular confirm_context value format!");
+ return false;
+ }
+ string part_key = Utils::Trim(confirm_context.substr(0, pos1));
+ string booked_time_str =
+ Utils::Trim(confirm_context.substr(pos1 + token1.size(), confirm_context.size()));
+ long booked_time = atol(booked_time_str.c_str());
+ pos1 = part_key.find(token2);
+ if (string::npos == pos1) {
+ result.SetFailureResult(err_code::kErrBadRequest,
+ "Illegel confirm_context content: unregular index key value format!");
+ return false;
+ }
+ pos1 = pos1 + token1.size();
+ string topic_name = part_key.substr(pos1);
+ pos2 = topic_name.rfind(token2);
+ if (string::npos == pos2) {
+ result.SetFailureResult(
+ err_code::kErrBadRequest,
+ "Illegel confirm_context content: unregular index's topic key value format!");
+ return false;
+ }
+ topic_name = topic_name.substr(0, pos2);
+ if (!rmtdata_cache_.IsPartitionInUse(part_key, booked_time)) {
+ result.SetFailureResult(err_code::kErrConfirmTimeout, "The confirm_context's value invalid!");
+ return false;
+ }
+ PartitionExt partition_ext;
+ bool ret_result = rmtdata_cache_.GetPartitionExt(part_key, partition_ext);
+ if (!ret_result) {
+ result.SetFailureResult(err_code::kErrConfirmTimeout,
+ "Not found the partition by confirm_context!");
+ return false;
+ }
+ long curr_offset = tb_config::kInvalidValue;
+ PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
+ partition_ext.GetPartitionKey(), curr_offset);
+ auto request = std::make_shared<RequestContext>();
+ TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
+ // build CommitC2B request
+ buidCommitC2B(partition_ext, is_consumed, req_protocol);
+ request->codec_ = std::make_shared<TubeMQCodec>();
+ request->ip_ = partition_ext.GetBrokerHost();
+ request->port_ = partition_ext.GetBrokerPort();
+ request->timeout_ = config_.GetRpcReadTimeoutMs();
+ request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
+ req_protocol->request_id_ = request->request_id_;
+ req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
+
+ LOG_TRACE("[CONSUMER] Confirm to %s, client=%s",
+ partition_ext.GetPartitionKey().c_str(), client_uuid_.c_str());
+
+ // send message to target
+ ResponseContext response_context;
+ ErrorCode error = SyncRequest(response_context, request, req_protocol);
+ if (!TubeMQService::Instance()->IsRunning()) {
+ result.SetFailureResult(err_code::kErrMQServiceStop, "TubeMQ Service stopped!");
+ return false;
+ }
+ if (!isClientRunning()) {
+ result.SetFailureResult(err_code::kErrClientStop, "TubeMQ Client stopped!");
+ return false;
+ }
+
+ LOG_TRACE("[CONSUMER] Confirm response result=%d, client=%s",
+ error.Value(), client_uuid_.c_str());
+
+ if (error.Value() == err_code::kErrSuccess) {
+ // process response
+ auto rsp = any_cast<TubeMQCodec::RspProtocolPtr>(response_context.rsp_);
+ if (rsp->success_) {
+ CommitOffsetResponseB2C rsp_b2c;
+ ret_result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
+ (int)(rsp->rsp_body_.data().length()));
+ if (ret_result) {
+ if (rsp_b2c.success()) {
+ curr_offset = rsp_b2c.curroffset();
+ peer_info.SetCurrOffset(curr_offset);
+ result.SetSuccessResult(err_code::kErrSuccess, topic_name, peer_info);
+ } else {
+ result.SetFailureResult(rsp_b2c.errcode(), rsp_b2c.errmsg(), topic_name, peer_info);
+ }
+ } else {
+ result.SetFailureResult(err_code::kErrParseFailure,
+ "Parse CommitOffsetResponseB2C response failure!", topic_name,
+ peer_info);
+ }
+ } else {
+ result.SetFailureResult(rsp->code_, rsp->error_msg_, topic_name, peer_info);
+ }
+ } else {
+ result.SetFailureResult(error.Value(), error.Message(), topic_name, peer_info);
+ }
+ string err_info;
+ rmtdata_cache_.BookedPartionInfo(part_key, curr_offset);
+ rmtdata_cache_.RelPartition(err_info, sub_info_.IsFilterConsume(topic_name), confirm_context,
+ is_consumed);
+ LOG_TRACE("[CONSUMER] Confirm response finished, result=%d, client=%s",
+ result.IsSuccess(), client_uuid_.c_str());
+ return result.IsSuccess();
+}
+
// Registers this consumer on a master node, retrying across the configured
// master address list (need_change selects whether to start from the next
// address rather than the current one). Guarded by master_reg_status_ so
// only one registration runs at a time. On success sets is_master_actived_
// and records the heartbeat baseline time.
// Returns true on success; otherwise error_code/err_info carry the reason.
bool BaseConsumer::register2Master(int32_t& error_code, string& err_info, bool need_change) {
  string target_ip;
  int target_port;

  // set regist process status to begin (0 -> 1); bail out if already running
  if (!master_reg_status_.CompareAndSet(0, 1)) {
    err_info = "register2Master process has began!";
    return false;
  }

  LOG_INFO("register2Master process begin: ");
  // check client status
  if (status_.Get() == 0) {
    master_reg_status_.CompareAndSet(1, 0);
    err_info = "Consumer not startted!";
    return false;
  }
  LOG_DEBUG("[CONSUMER], initial register2master request, clientId=%s", client_uuid_.c_str());
  // get master address and port
  if (need_change) {
    getNextMasterAddr(target_ip, target_port);
  } else {
    getCurrentMasterAddr(target_ip, target_port);
  }
  bool result = false;
  int retry_count = 0;
  // at most one attempt per configured master address
  int maxRetrycount = masters_map_.size();
  err_info = "Master register failure, no online master service!";
  while (retry_count < maxRetrycount) {
    if (!TubeMQService::Instance()->IsRunning()) {
      err_info = "TubeMQ Service is stopped!";
      master_reg_status_.CompareAndSet(1, 0);
      return false;
    }
    auto request = std::make_shared<RequestContext>();
    TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
    // build register request
    buidRegisterRequestC2M(req_protocol);
    // set parameters
    request->codec_ = std::make_shared<TubeMQCodec>();
    request->ip_ = target_ip;
    request->port_ = target_port;
    request->timeout_ = config_.GetRpcReadTimeoutMs();
    request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
    req_protocol->request_id_ = request->request_id_;
    req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
    // send message to target (blocking RPC)
    ResponseContext response_context;
    ErrorCode error = SyncRequest(response_context, request, req_protocol);
    LOG_INFO("register2Master response come, error.value is %d", error.Value());
    if (error.Value() == err_code::kErrSuccess) {
      // process response
      auto rsp = any_cast<TubeMQCodec::RspProtocolPtr>(response_context.rsp_);
      result = processRegisterResponseM2C(error_code, err_info, rsp);
      if (result) {
        err_info = "Ok";
        is_master_actived_.Set(true);
        last_master_hbtime_ = Utils::GetCurrentTimeMillis();
        break;
      } else {
        is_master_actived_.Set(false);
      }
    } else {
      error_code = error.Value();
      err_info = error.Message();
    }
    // a forbidden group/content is fatal: stop instead of trying other masters
    if (error_code == err_code::kErrConsumeGroupForbidden
      || error_code == err_code::kErrConsumeContentForbidden) {
      // set regist process status to existed
      master_reg_status_.CompareAndSet(1, 0);
      LOG_WARN("[CONSUMER] register2master(%s:%d) failure exist register, client=%s,reason:%s",
        target_ip.c_str(), target_port, client_uuid_.c_str(), err_info.c_str());
      return false;
    } else {
      LOG_WARN(
        "[CONSUMER] register2master(%s:%d) failure, client=%s, retrycount=(%d-%d), reason:%s",
        target_ip.c_str(), target_port, client_uuid_.c_str(), maxRetrycount, retry_count + 1,
        err_info.c_str());
    }
    retry_count++;
    getNextMasterAddr(target_ip, target_port);  // rotate to the next master address
  }
  // set regist process status to existed
  master_reg_status_.CompareAndSet(1, 0);
  LOG_INFO("[CONSUMER] register2Master finished, client=%s, result:%d, err_info:%s",
    client_uuid_.c_str(), result, err_info.c_str());
  return result;
}
+
// Posts a task to the service thread-pool that re-registers this consumer
// with the master (skipped when the master link is already active) and then
// re-arms the master heartbeat timer with nextHeartBeatPeriodms().
// NOTE(review): the posted task dereferences heart_beat_timer_, which
// ShutDown() resets to nullptr — presumably the shutdown ordering prevents
// the race; confirm.
void BaseConsumer::asyncRegister2Master(bool need_change) {
  TubeMQService::Instance()->Post([this, need_change]() {
    int32_t error_code;
    string error_info;
    if (!is_master_actived_.Get()) {
      auto ret_result = register2Master(error_code, error_info, need_change);
      LOG_INFO("[CONSUMER] asyncRegister2Master ret_result:%d, master_sh_retry_cnt_:%d", ret_result,
        master_sh_retry_cnt_);
      if (ret_result) {
        is_master_actived_.Set(true);
        master_sh_retry_cnt_ = 0;  // success resets the failure streak
      } else {
        master_sh_retry_cnt_++;
      }
    }
    // re-arm the heartbeat timer regardless of the registration outcome
    heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
    heart_beat_timer_->async_wait([this](const std::error_code& ec) {
      if (ec) {
        return;
      }
      heartBeat2Master();
    });
  });
}
+
// Periodic master heartbeat (timer callback). When the master link is not
// active it triggers re-registration instead; otherwise it sends an async
// heartbeat, processes the response, and re-arms heart_beat_timer_. Guarded
// by master_hb_status_ so only one heartbeat round is in flight at a time.
void BaseConsumer::heartBeat2Master() {
  // timer task
  // 1. check if need re-register, if true, first call register
  // 2. call heartbeat to master
  // 3. process response
  // 4. call timer again
  string target_ip;
  int target_port;

  // set heartbeat process status to begin (0 -> 1)
  if (!master_hb_status_.CompareAndSet(0, 1)) {
    LOG_INFO("check hb process status, heartBeat2Master process has began!");
    return;
  }

  LOG_TRACE("heartBeat2Master process begin:");

  if (!TubeMQService::Instance()->IsRunning()) {
    master_hb_status_.CompareAndSet(1, 0);
    LOG_INFO("[CONSUMER] heartBeat2Master failure: TubeMQ Service is stopped! client=%s",
      client_uuid_.c_str());
    return;
  }

  if (!isClientRunning()) {
    master_hb_status_.CompareAndSet(1, 0);
    LOG_INFO("[CONSUMER] heartBeat2Master failure: TubeMQ Client stopped! client=%s",
      client_uuid_.c_str());
    return;
  }

  // check status in master
  // if not actived first register, or send heartbeat
  if (!is_master_actived_.Get()) {
    LOG_INFO("[CONSUMER] heartBeat2Master found master not active, re-register first! client=%s",
      client_uuid_.c_str());
    asyncRegister2Master(false);  // re-arms the timer itself
    master_hb_status_.CompareAndSet(1, 0);
    return;
  }
  // check partition status: reclaim expired confirm leases when no heartbeat
  // succeeded within the last 30 seconds
  if (Utils::GetCurrentTimeMillis() - last_master_hbtime_ > 30000) {
    rmtdata_cache_.handleExpiredPartitions(
      config_.GetMaxConfirmWaitPeriodMs());
  }
  // select current master
  getCurrentMasterAddr(target_ip, target_port);
  auto request = std::make_shared<RequestContext>();
  TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
  // build heartbeat 2 master request
  buidHeartRequestC2M(req_protocol);
  request->codec_ = std::make_shared<TubeMQCodec>();
  request->ip_ = target_ip;
  request->port_ = target_port;
  request->timeout_ = config_.GetRpcReadTimeoutMs();
  request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
  req_protocol->request_id_ = request->request_id_;
  req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
  // send message to target; the callback re-arms the timer and releases the
  // master_hb_status_ guard on every path
  AsyncRequest(request, req_protocol)
    .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
      if (error.Value() != err_code::kErrSuccess) {
        master_sh_retry_cnt_++;
        LOG_WARN("[CONSUMER] heartBeat2Master failue to (%s:%d) : %s, client=%s",
          target_ip.c_str(), target_port, error.Message().c_str(), client_uuid_.c_str());
      } else {
        // process response
        auto rsp = any_cast<TubeMQCodec::RspProtocolPtr>(response_context.rsp_);
        int32_t error_code = 0;
        std::string error_info;
        auto ret_result = processHBResponseM2C(error_code, error_info, rsp);
        LOG_TRACE("[CONSUMER] processHBResponseM2C return result = %d! client=%s",
          ret_result, client_uuid_.c_str());
        if (ret_result) {
          is_master_actived_.Set(true);
          master_sh_retry_cnt_ = 0;
        } else {
          master_sh_retry_cnt_++;
          // master has no record of this node, or is a standby: drop the
          // active flag and re-register (switching master address only in
          // the standby case)
          if (error_code == err_code::kErrHbNoNode ||
            error_info.find("StandbyException") != string::npos) {
            is_master_actived_.Set(false);
            LOG_WARN("[CONSUMER] hb2master found no-node or standby, re-register, client=%s",
              client_uuid_.c_str());
            asyncRegister2Master(!(error_code == err_code::kErrHbNoNode));
            master_hb_status_.CompareAndSet(1, 0);
            return;
          }
        }
      }
      heart_beat_timer_->expires_after(std::chrono::milliseconds(nextHeartBeatPeriodms()));
      heart_beat_timer_->async_wait([this](const std::error_code& ec) {
        if (ec) {
          return;
        }
        heartBeat2Master();
      });
      master_hb_status_.CompareAndSet(1, 0);
      LOG_TRACE("[CONSUMER] processHBResponseM2C process result finished, client=%s",
        client_uuid_.c_str());
    });
  return;
}
+
+int32_t BaseConsumer::nextHeartBeatPeriodms() {
+ int32_t next_hb_periodms = config_.GetHeartbeatPeriodMs();
+ if (master_sh_retry_cnt_ >= config_.GetMaxHeartBeatRetryTimes()) {
+ next_hb_periodms = config_.GetHeartbeatPeriodAftFailMs();
+ }
+ return next_hb_periodms;
+}
+
// Rebalance worker-thread loop: blocks on the event queue and applies the
// master's connect/disconnect instructions until the service or the client
// stops, or the shutdown sentinel event arrives.
void BaseConsumer::processRebalanceEvent() {
  // thread wait until event come
  LOG_INFO("[CONSUMER] rebalance event Handler startted!");
  while (true) {
    if (!TubeMQService::Instance()->IsRunning()) {
      LOG_INFO("[CONSUMER] Rebalance found Service stopped, existed, client=%s",
        client_uuid_.c_str());
      break;
    }
    if (!isClientRunning()) {
      LOG_INFO("[CONSUMER] Rebalance found Client stopped, existed, client=%s",
        client_uuid_.c_str());
      break;
    }
    ConsumerEvent event;
    rmtdata_cache_.TakeEvent(event);  // blocks until an event is available
    // an all-invalid event is the shutdown sentinel offered by ShutDown()
    if (event.GetEventStatus() == tb_config::kInvalidValue &&
      event.GetRebalanceId() == tb_config::kInvalidValue) {
      LOG_INFO("[CONSUMER] Rebalance found Shutdown notify, existed, client=%s",
        client_uuid_.c_str());
      break;
    }
    rmtdata_cache_.ClearEvent();
    switch (event.GetEventType()) {
      // disconnect-type events — TODO confirm the 2/20 enum meanings
      case 2:
      case 20: {
        processDisConnect2Broker(event);
        rmtdata_cache_.OfferEventResult(event);
      } break;

      // connect-type events — TODO confirm the 1/10 enum meanings
      case 1:
      case 10: {
        processConnect2Broker(event);
        rmtdata_cache_.OfferEventResult(event);
      } break;

      default: {
        // unknown event types are ignored
      } break;
    }
  }

  LOG_INFO("[CONSUMER] rebalance event Handler stopped!");
  return;
}
+
+void BaseConsumer::close2Master() {
+ string target_ip;
+ int target_port;
+
+ LOG_INFO("[CONSUMER] close2Master begin, clientid=%s", client_uuid_.c_str());
+ getCurrentMasterAddr(target_ip, target_port);
+ auto request = std::make_shared<RequestContext>();
+ TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
+ // build close2master request
+ buidCloseRequestC2M(req_protocol);
+ request->codec_ = std::make_shared<TubeMQCodec>();
+ request->ip_ = target_ip;
+ request->port_ = target_port;
+ request->timeout_ = config_.GetRpcReadTimeoutMs();
+ request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
+ req_protocol->request_id_ = request->request_id_;
+ req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
+ // send message to target
+ AsyncRequest(request, req_protocol);
+ LOG_INFO("[CONSUMER] close2Master finished, clientid=%s", client_uuid_.c_str());
+ // not need wait response
+ return;
+}
+
// Handles a "connect" rebalance instruction: registers this consumer on the
// broker of every newly-assigned partition, adds partitions that registered
// successfully to the local cache, and starts a heartbeat timer per broker.
void BaseConsumer::processConnect2Broker(ConsumerEvent& event) {
  LOG_TRACE("[processConnect2Broker] begin to process connect event, clientid=%s",
    client_uuid_.c_str());
  if (!isClientRunning()) {
    return;
  }
  bool ret_result;
  int32_t error_code;
  string error_info;
  list<PartitionExt> subscribed_partitions;
  list<PartitionExt> unsub_partitions;
  list<PartitionExt>::iterator it;
  list<SubscribeInfo> subscribe_info = event.GetSubscribeInfoList();
  if (!subscribe_info.empty()) {
    // split into already-subscribed and not-yet-subscribed partitions
    rmtdata_cache_.FilterPartitions(subscribe_info, subscribed_partitions, unsub_partitions);
    if (!unsub_partitions.empty()) {
      for (it = unsub_partitions.begin(); it != unsub_partitions.end(); it++) {
        LOG_TRACE("[processConnect2Broker] connect to %s, clientid=%s",
          it->GetPartitionKey().c_str(), client_uuid_.c_str());
        auto request = std::make_shared<RequestContext>();
        TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
        // build register2broker request
        buidRegisterRequestC2B(*it, req_protocol);
        request->codec_ = std::make_shared<TubeMQCodec>();
        request->ip_ = it->GetBrokerHost();
        request->port_ = it->GetBrokerPort();
        request->timeout_ = config_.GetRpcReadTimeoutMs();
        request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
        req_protocol->request_id_ = request->request_id_;
        req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
        // send message to target (blocking RPC, one partition at a time)
        ResponseContext response_context;
        ErrorCode error = SyncRequest(response_context, request, req_protocol);
        if (error.Value() != err_code::kErrSuccess) {
          LOG_WARN("[Connect2Broker] request network failure to (%s:%d) : %s",
            it->GetBrokerHost().c_str(), it->GetBrokerPort(), error.Message().c_str());
        } else {
          // process response; only cache the partition if registration succeeded
          auto rsp = any_cast<TubeMQCodec::RspProtocolPtr>(response_context.rsp_);
          ret_result = processRegResponseB2C(error_code, error_info, rsp);
          if (ret_result) {
            LOG_TRACE("[processConnect2Broker] add broker hb timer for %s, clientid=%s",
              it->GetBrokerInfo().GetAddrInfo().c_str(), client_uuid_.c_str());
            rmtdata_cache_.AddNewPartition(*it);
            addBrokerHBTimer(it->GetBrokerInfo());
          }
        }
      }
    }
  }
  sub_info_.BookFstRegistered();  // record that the first registration round happened
  event.SetEventStatus(2);  // presumably 2 = "processed"; verify against event-status constants
  LOG_TRACE("[processConnect2Broker] out connect event process, clientid=%s",
    client_uuid_.c_str());
}
+
// Handles a "disconnect" rebalance instruction: removes the listed
// partitions from the local cache and unregisters them from their brokers.
void BaseConsumer::processDisConnect2Broker(ConsumerEvent& event) {
  LOG_TRACE("[processDisConnect2Broker] begin to process disConnect event, clientid=%s",
    client_uuid_.c_str());
  if (!isClientRunning()) {
    return;
  }
  list<SubscribeInfo> subscribe_info = event.GetSubscribeInfoList();
  if (!subscribe_info.empty()) {
    map<NodeInfo, list<PartitionExt> > rmv_partitions;
    // detach the partitions from the cache, grouped by owning broker
    rmtdata_cache_.RemoveAndGetPartition(subscribe_info,
      config_.IsRollbackIfConfirmTimeout(), rmv_partitions);
    if (!rmv_partitions.empty()) {
      LOG_TRACE("[processDisConnect2Broker] unregister 2 broker process, clientid=%s",
        client_uuid_.c_str());
      unregister2Brokers(rmv_partitions, true);
    }
  }
  event.SetEventStatus(2);  // presumably 2 = "processed"; verify against event-status constants
  LOG_TRACE("[processDisConnect2Broker] out disConnect event process, clientid=%s",
    client_uuid_.c_str());
  return;
}
+
+void BaseConsumer::closeAllBrokers() {
+ map<NodeInfo, list<PartitionExt> > broker_parts;
+ LOG_INFO("[CONSUMER] closeAllBrokers begin, clientid=%s", client_uuid_.c_str());
+ rmtdata_cache_.GetAllClosedBrokerParts(broker_parts);
+ if (!broker_parts.empty()) {
+ unregister2Brokers(broker_parts, false);
+ }
+ LOG_INFO("[CONSUMER] closeAllBrokers end, clientid=%s", client_uuid_.c_str());
+}
+
+
// Per-broker heartbeat (timer callback): reports this consumer's live
// partitions on `broker_info`, removes partitions the broker reports as
// failed (or all reported partitions on certificate failure), and always
// re-arms the broker's heartbeat timer afterwards.
void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
  LOG_TRACE("[Heartbeat2Broker] process hb to broker(%s) startted!",
    broker_info.GetAddrInfo().c_str());
  if (!isClientRunning()) {
    return;
  }
  list<PartitionExt> partition_list;
  list<PartitionExt>::iterator it;
  rmtdata_cache_.GetPartitionByBroker(broker_info, partition_list);
  if (partition_list.empty()) {
    // nothing to report: just reschedule the timer
    reSetBrokerHBTimer(broker_info);
    LOG_TRACE("[Heartbeat2Broker] no alive partitions, for broker(%s), out!",
      broker_info.GetAddrInfo().c_str());
    return;
  }

  // keys of every partition reported in this heartbeat round
  set<string> req_part_keys;
  for (it = partition_list.begin(); it != partition_list.end(); ++it) {
    req_part_keys.insert(it->GetPartitionKey());
  }
  auto request = std::make_shared<RequestContext>();
  TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
  // build heartbeat2broker request
  buidHeartBeatC2B(partition_list, req_protocol);
  request->codec_ = std::make_shared<TubeMQCodec>();
  request->ip_ = broker_info.GetHost();
  request->port_ = broker_info.GetPort();
  request->timeout_ = config_.GetRpcReadTimeoutMs();
  request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
  req_protocol->request_id_ = request->request_id_;
  req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;

  LOG_TRACE("[Heartbeat2Broker] send hb request to (%s)!", broker_info.GetAddrInfo().c_str());

  // send message to target
  AsyncRequest(request, req_protocol)
    .AddCallBack([=](ErrorCode error, const ResponseContext& response_context) {
      if (error.Value() != err_code::kErrSuccess) {
        LOG_WARN("[Heartbeat2Broker] request network to failure (%s), ression is %s",
          broker_info.GetAddrInfo().c_str(), error.Message().c_str());
      } else {
        // process response
        auto rsp = any_cast<TubeMQCodec::RspProtocolPtr>(response_context.rsp_);
        LOG_TRACE("[Heartbeat2Broker] receive hb response!");
        if (rsp->success_) {
          HeartBeatResponseB2C rsp_b2c;
          bool result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
            (int)(rsp->rsp_body_.data().length()));
          if (result) {
            set<string> partition_keys;
            if (rsp_b2c.success()) {
              // drop only the partitions the broker flags as failed; each
              // failureinfo entry is "<errcode>:<partition-string>"
              if (rsp_b2c.has_haspartfailure() && rsp_b2c.haspartfailure()) {
                for (int tmp_i = 0; tmp_i < rsp_b2c.failureinfo_size(); tmp_i++) {
                  string token_key = delimiter::kDelimiterColon;
                  string fullpart_str = rsp_b2c.failureinfo(tmp_i);
                  string::size_type pos1 = fullpart_str.find(token_key);
                  if (pos1 == string::npos) {
                    continue;
                  }
                  // int error_code = atoi(fullpart_str.substr(0, pos1).c_str());
                  string part_str =
                    fullpart_str.substr(pos1 + token_key.size(), fullpart_str.size());
                  Partition part(part_str);
                  partition_keys.insert(part.GetPartitionKey());
                  LOG_TRACE("[Heartbeat2Broker] found partiton(%s) hb failure!",
                    part.GetPartitionKey().c_str());
                }
              }
              rmtdata_cache_.RemovePartition(partition_keys);
            } else {
              // certificate failure invalidates all partitions on this broker
              if (rsp_b2c.errcode() == err_code::kErrCertificateFailure) {
                rmtdata_cache_.RemovePartition(req_part_keys);
                LOG_WARN("[Heartbeat2Broker] request (%s) CertificateFailure",
                  broker_info.GetAddrInfo().c_str());
              }
            }
          }
        }
      }
      LOG_TRACE("[Heartbeat2Broker] out hb response process, add broker(%s) timer!",
        broker_info.GetAddrInfo().c_str());
      reSetBrokerHBTimer(broker_info);
    });
}
+
+// Build a consumer register request to the Master (C2M): client identity,
+// subscribed topics/conditions, current flow-control check ids, bound-consume
+// settings and (when required) authenticate info, serialized into req_protocol.
+void BaseConsumer::buidRegisterRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string reg_msg;
+ RegisterRequestC2M c2m_request;
+ list<string>::iterator it_topics;
+ list<SubscribeInfo>::iterator it_sub;
+ c2m_request.set_clientid(client_uuid_);
+ c2m_request.set_hostname(TubeMQService::Instance()->GetLocalHost());
+ c2m_request.set_requirebound(sub_info_.IsBoundConsume());
+ c2m_request.set_groupname(config_.GetGroupName());
+ c2m_request.set_sessiontime(sub_info_.GetSubscribedTime());
+ // subscribed topic list
+ list<string> sub_topics = sub_info_.GetSubTopics();
+ for (it_topics = sub_topics.begin(); it_topics != sub_topics.end(); ++it_topics) {
+ c2m_request.add_topiclist(*it_topics);
+ }
+ c2m_request.set_defflowcheckid(rmtdata_cache_.GetDefFlowCtrlId());
+ c2m_request.set_groupflowcheckid(rmtdata_cache_.GetGroupFlowCtrlId());
+ c2m_request.set_qrypriorityid(rmtdata_cache_.GetGroupQryPriorityId());
+ // reported subscribed info
+ list<SubscribeInfo> subscribe_lst;
+ rmtdata_cache_.GetSubscribedInfo(subscribe_lst);
+ for (it_sub = subscribe_lst.begin(); it_sub != subscribe_lst.end(); ++it_sub) {
+ c2m_request.add_subscribeinfo(it_sub->ToString());
+ }
+ // get topic conditions
+ list<string> topic_conds = sub_info_.GetTopicConds();
+ for (it_topics = topic_conds.begin(); it_topics != topic_conds.end(); ++it_topics) {
+ c2m_request.add_topiccondition(*it_topics);
+ }
+ // add bound consume info
+ if (sub_info_.IsBoundConsume()) {
+ c2m_request.set_sessionkey(sub_info_.GetSessionKey());
+ c2m_request.set_selectbig(sub_info_.SelectBig());
+ c2m_request.set_totalcount(sub_info_.GetSourceCnt());
+ c2m_request.set_requiredpartition(sub_info_.GetBoundPartInfo());
+ c2m_request.set_notallocated(sub_info_.IsNotAllocated());
+ }
+ // authenticate info
+ if (needGenMasterCertificateInfo(true)) {
+ MasterCertificateInfo* pmst_certinfo = c2m_request.mutable_authinfo();
+ AuthenticateInfo* pauthinfo = pmst_certinfo->mutable_authinfo();
+ genMasterAuthenticateToken(pauthinfo, config_.GetUsrName(), config_.GetUsrPassWord());
+ }
+ // FIX: was "SerializeToString(®_msg)" - the '&' of "&reg_msg" had been
+ // consumed by HTML entity rendering ("&reg" -> the (R) sign); restore it.
+ c2m_request.SerializeToString(&reg_msg);
+ req_protocol->method_id_ = rpc_config::kMasterMethoddConsumerRegister;
+ req_protocol->prot_msg_ = reg_msg;
+}
+
+// Build a consumer heartbeat request to the Master (C2M).
+// Always reports the current flow-control check ids; subscribe info (and the
+// result of any pending rebalance event) is attached only when an event was
+// polled or the configured max unreported interval has elapsed.
+void BaseConsumer::buidHeartRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string hb_msg;
+ HeartRequestC2M c2m_request;
+ list<string>::iterator it_topics;
+ list<SubscribeInfo>::iterator it_sub;
+ c2m_request.set_clientid(client_uuid_);
+ c2m_request.set_groupname(config_.GetGroupName());
+ c2m_request.set_defflowcheckid(rmtdata_cache_.GetDefFlowCtrlId());
+ c2m_request.set_groupflowcheckid(rmtdata_cache_.GetGroupFlowCtrlId());
+ c2m_request.set_qrypriorityid(rmtdata_cache_.GetGroupQryPriorityId());
+ c2m_request.set_reportsubscribeinfo(false);
+ ConsumerEvent event;
+ list<SubscribeInfo>::iterator it;
+ list<SubscribeInfo> subscribe_info_lst;
+ bool has_event = rmtdata_cache_.PollEventResult(event);
+ // judge if report subscribe info
+ if ((has_event) || (++unreport_times_ > config_.GetMaxSubinfoReportIntvl())) {
+ unreport_times_ = 0;
+ c2m_request.set_reportsubscribeinfo(true);
+ rmtdata_cache_.GetSubscribedInfo(subscribe_info_lst);
+ if (has_event) {
+ // echo the processed rebalance event (id, type, status, partitions)
+ // back to the master so it can advance the rebalance state machine
+ EventProto* event_proto = c2m_request.mutable_event();
+ event_proto->set_rebalanceid(event.GetRebalanceId());
+ event_proto->set_optype(event.GetEventType());
+ event_proto->set_status(event.GetEventStatus());
+ list<SubscribeInfo> event_sub = event.GetSubscribeInfoList();
+ for (it = event_sub.begin(); it != event_sub.end(); it++) {
+ event_proto->add_subscribeinfo(it->ToString());
+ }
+ }
+ if (!subscribe_info_lst.empty()) {
+ for (it = subscribe_info_lst.begin(); it != subscribe_info_lst.end(); it++) {
+ c2m_request.add_subscribeinfo(it->ToString());
+ }
+ }
+ }
+ // attach authenticate info when authentication is enabled
+ if (needGenMasterCertificateInfo(true)) {
+ MasterCertificateInfo* pmst_certinfo = c2m_request.mutable_authinfo();
+ AuthenticateInfo* pauthinfo = pmst_certinfo->mutable_authinfo();
+ genMasterAuthenticateToken(pauthinfo, config_.GetUsrName(), config_.GetUsrPassWord());
+ }
+ c2m_request.SerializeToString(&hb_msg);
+ req_protocol->method_id_ = rpc_config::kMasterMethoddConsumerHeatbeat;
+ req_protocol->prot_msg_ = hb_msg;
+}
+
+// Build a consumer close request to the Master (C2M) and serialize it into
+// req_protocol; authenticate info is force-regenerated for the close call.
+void BaseConsumer::buidCloseRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string close_msg;
+ CloseRequestC2M c2m_request;
+ c2m_request.set_clientid(client_uuid_);
+ c2m_request.set_groupname(config_.GetGroupName());
+ if (needGenMasterCertificateInfo(true)) {
+ MasterCertificateInfo* pmst_certinfo = c2m_request.mutable_authinfo();
+ AuthenticateInfo* pauthinfo = pmst_certinfo->mutable_authinfo();
+ genMasterAuthenticateToken(pauthinfo, config_.GetUsrName(), config_.GetUsrPassWord());
+ }
+ c2m_request.SerializeToString(&close_msg);
+ req_protocol->method_id_ = rpc_config::kMasterMethoddConsumerClose;
+ req_protocol->prot_msg_ = close_msg;
+}
+
+// Build a consumer register request to a Broker (C2B) for one partition:
+// identity, read status (first registration may reset to max offset), filter
+// conditions for filter-consume topics, and any pre-assigned bound offset.
+void BaseConsumer::buidRegisterRequestC2B(const PartitionExt& partition,
+ TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string register_msg;
+ set<string> filter_cond_set;
+ map<string, set<string> > filter_map;
+ RegisterRequestC2B c2b_request;
+ c2b_request.set_clientid(client_uuid_);
+ c2b_request.set_groupname(config_.GetGroupName());
+ c2b_request.set_optype(rpc_config::kRegOpTypeRegister);
+ c2b_request.set_topicname(partition.GetTopic());
+ c2b_request.set_partitionid(partition.GetPartitionId());
+ c2b_request.set_qrypriorityid(rmtdata_cache_.GetGroupQryPriorityId());
+ bool is_first_reg = rmtdata_cache_.IsPartitionFirstReg(partition.GetPartitionKey());
+ c2b_request.set_readstatus(getConsumeReadStatus(is_first_reg));
+ // forward the topic's filter conditions for filter-consume subscriptions
+ if (sub_info_.IsFilterConsume(partition.GetTopic())) {
+ filter_map = sub_info_.GetTopicFilterMap();
+ if (filter_map.find(partition.GetTopic()) != filter_map.end()) {
+ filter_cond_set = filter_map[partition.GetTopic()];
+ for (set<string>::iterator it_cond = filter_cond_set.begin();
+ it_cond != filter_cond_set.end(); it_cond++) {
+ c2b_request.add_filtercondstr(*it_cond);
+ }
+ }
+ }
+ // bound consume may pin a start offset for this partition
+ int64_t part_offset = tb_config::kInvalidValue;
+ sub_info_.GetAssignedPartOffset(partition.GetPartitionKey(), part_offset);
+ if (part_offset != tb_config::kInvalidValue) {
+ c2b_request.set_curroffset(part_offset);
+ }
+ AuthorizedInfo* p_authInfo = c2b_request.mutable_authinfo();
+ genBrokerAuthenticInfo(p_authInfo, true);
+ // FIX: was "SerializeToString(®ister_msg)" - the '&' of "&register_msg"
+ // had been consumed by HTML entity rendering ("&reg" -> the (R) sign).
+ c2b_request.SerializeToString(&register_msg);
+ req_protocol->method_id_ = rpc_config::kBrokerMethoddConsumerRegister;
+ req_protocol->prot_msg_ = register_msg;
+}
+
+// Build a consumer unregister request to a Broker (C2B) for one partition.
+// Reuses RegisterRequestC2B with optype = kRegOpTypeUnReg.
+void BaseConsumer::buidUnRegRequestC2B(const PartitionExt& partition,
+ TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string unreg_msg;
+ RegisterRequestC2B c2b_request;
+ c2b_request.set_clientid(client_uuid_);
+ c2b_request.set_groupname(config_.GetGroupName());
+ c2b_request.set_optype(rpc_config::kRegOpTypeUnReg);
+ c2b_request.set_topicname(partition.GetTopic());
+ c2b_request.set_partitionid(partition.GetPartitionId());
+ // NOTE(review): readstatus is hard-coded to 1 here instead of using a
+ // rpc_config constant - confirm 1 is the intended value for unregister.
+ c2b_request.set_readstatus(1);
+ AuthorizedInfo* p_authInfo = c2b_request.mutable_authinfo();
+ genBrokerAuthenticInfo(p_authInfo, true);
+ c2b_request.SerializeToString(&unreg_msg);
+ req_protocol->method_id_ = rpc_config::kBrokerMethoddConsumerRegister;
+ req_protocol->prot_msg_ = unreg_msg;
+}
+
+// Build a consumer heartbeat request to a Broker (C2B) covering all of the
+// broker's partitions currently held by this client.
+void BaseConsumer::buidHeartBeatC2B(const list<PartitionExt>& partitions,
+ TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string hb_msg;
+ HeartBeatRequestC2B c2b_request;
+ list<PartitionExt>::const_iterator it_part;
+ c2b_request.set_clientid(client_uuid_);
+ c2b_request.set_groupname(config_.GetGroupName());
+ c2b_request.set_readstatus(getConsumeReadStatus(false));
+ c2b_request.set_qrypriorityid(rmtdata_cache_.GetGroupQryPriorityId());
+ // report every held partition of this broker
+ for (it_part = partitions.begin(); it_part != partitions.end(); ++it_part) {
+ c2b_request.add_partitioninfo(it_part->ToString());
+ }
+ AuthorizedInfo* p_authInfo = c2b_request.mutable_authinfo();
+ genBrokerAuthenticInfo(p_authInfo, true);
+ c2b_request.SerializeToString(&hb_msg);
+ req_protocol->method_id_ = rpc_config::kBrokerMethoddConsumerHeatbeat;
+ req_protocol->prot_msg_ = hb_msg;
+}
+
+// Build a get-message request to a Broker (C2B) for one partition, carrying
+// the group flow-control escape flag and the last-pack-consumed marker.
+void BaseConsumer::buidGetMessageC2B(const PartitionExt& partition,
+ TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string get_msg;
+ GetMessageRequestC2B c2b_request;
+ c2b_request.set_clientid(client_uuid_);
+ c2b_request.set_groupname(config_.GetGroupName());
+ c2b_request.set_topicname(partition.GetTopic());
+ c2b_request.set_escflowctrl(rmtdata_cache_.IsUnderGroupCtrl());
+ c2b_request.set_partitionid(partition.GetPartitionId());
+ c2b_request.set_lastpackconsumed(partition.IsLastConsumed());
+ // offsets are committed via the confirm/commit path, not on fetch
+ c2b_request.set_manualcommitoffset(false);
+ c2b_request.SerializeToString(&get_msg);
+ req_protocol->method_id_ = rpc_config::kBrokerMethoddConsumerGetMsg;
+ req_protocol->prot_msg_ = get_msg;
+}
+
+// Build an offset-commit request to a Broker (C2B) for one partition;
+// is_last_consumed tells the broker whether the last pack was fully consumed.
+void BaseConsumer::buidCommitC2B(const PartitionExt& partition, bool is_last_consumed,
+ TubeMQCodec::ReqProtocolPtr& req_protocol) {
+ string commit_msg;
+ CommitOffsetRequestC2B c2b_request;
+ c2b_request.set_clientid(client_uuid_);
+ c2b_request.set_groupname(config_.GetGroupName());
+ c2b_request.set_topicname(partition.GetTopic());
+ c2b_request.set_partitionid(partition.GetPartitionId());
+ c2b_request.set_lastpackconsumed(is_last_consumed);
+ c2b_request.SerializeToString(&commit_msg);
+ req_protocol->method_id_ = rpc_config::kBrokerMethoddConsumerCommit;
+ req_protocol->prot_msg_ = commit_msg;
+}
+
+// Parse and apply the Master's register response (M2C).
+// On success updates the not-allocated state, flow-control rules and the
+// authorized token; on any failure fills error_code/err_info and returns false.
+bool BaseConsumer::processRegisterResponseM2C(int32_t& error_code, string& err_info,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol) {
+ LOG_TRACE("processRegisterResponseM2C, process message begin");
+ // transport-level failure reported by the RPC layer
+ if (!rsp_protocol->success_) {
+ error_code = rsp_protocol->code_;
+ err_info = rsp_protocol->error_msg_;
+ LOG_TRACE("processRegisterResponseM2C, rsp_protocol->success_ not true, out");
+ return false;
+ }
+ RegisterResponseM2C rsp_m2c;
+ bool result = rsp_m2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
+ (int)(rsp_protocol->rsp_body_.data().length()));
+ if (!result) {
+ error_code = err_code::kErrParseFailure;
+ err_info = "Parse RegisterResponseM2C response failure!";
+
+ LOG_TRACE("processRegisterResponseM2C come, parse message failure!");
+ return false;
+ }
+ // application-level failure returned by the master
+ if (!rsp_m2c.success()) {
+ error_code = rsp_m2c.errcode();
+ err_info = rsp_m2c.errmsg();
+
+ LOG_TRACE("processRegisterResponseM2C come, return failure, errorcode = %d!", error_code);
+ return false;
+ }
+ // update policy
+ if (rsp_m2c.has_notallocated() && !rsp_m2c.notallocated()) {
+ sub_info_.CompAndSetNotAllocated(true, false);
+ }
+ if (rsp_m2c.has_defflowcheckid() || rsp_m2c.has_groupflowcheckid()) {
+ if (rsp_m2c.has_defflowcheckid()) {
+ rmtdata_cache_.UpdateDefFlowCtrlInfo(rsp_m2c.defflowcheckid(), rsp_m2c.defflowcontrolinfo());
+ }
+ // keep the cached priority id when the response does not carry one
+ int qryPriorityId = rsp_m2c.has_qrypriorityid() ? rsp_m2c.qrypriorityid()
+ : rmtdata_cache_.GetGroupQryPriorityId();
+ rmtdata_cache_.UpdateGroupFlowCtrlInfo(qryPriorityId, rsp_m2c.groupflowcheckid(),
+ rsp_m2c.groupflowcontrolinfo());
+ }
+ if (rsp_m2c.has_authorizedinfo()) {
+ processAuthorizedToken(rsp_m2c.authorizedinfo());
+ }
+ error_code = err_code::kErrSuccess;
+ err_info = "Ok";
+ LOG_TRACE("processRegisterResponseM2C, process finished, out");
+
+ return true;
+}
+
+// Parse and apply the Master's heartbeat response (M2C): refresh allocation /
+// flow-control / auth state, honor a requireauth flag for the next call, and
+// enqueue any rebalance event the master piggybacked on the heartbeat.
+bool BaseConsumer::processHBResponseM2C(int32_t& error_code, string& err_info,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol) {
+ LOG_TRACE("processHBResponseM2C, process message begin");
+ // transport-level failure reported by the RPC layer
+ if (!rsp_protocol->success_) {
+ error_code = rsp_protocol->code_;
+ err_info = rsp_protocol->error_msg_;
+ LOG_TRACE("processHBResponseM2C success_ is false, errcode=%d, errinfo=%s",
+ error_code, err_info.c_str());
+ return false;
+ }
+ HeartResponseM2C rsp_m2c;
+ bool result = rsp_m2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
+ (int32_t)(rsp_protocol->rsp_body_.data().length()));
+ if (!result) {
+ error_code = err_code::kErrParseFailure;
+ err_info = "Parse HeartResponseM2C response failure!";
+
+ LOG_TRACE("processHBResponseM2C Parse result failure, out");
+ return false;
+ }
+ // application-level failure returned by the master
+ if (!rsp_m2c.success()) {
+ error_code = rsp_m2c.errcode();
+ err_info = rsp_m2c.errmsg();
+ LOG_TRACE("processHBResponseM2C response is false, errorInfo=%s, out", err_info.c_str());
+ return false;
+ }
+ // update policy
+ if (rsp_m2c.has_notallocated() && !rsp_m2c.notallocated()) {
+ sub_info_.CompAndSetNotAllocated(true, false);
+ }
+ if (rsp_m2c.has_defflowcheckid() || rsp_m2c.has_groupflowcheckid()) {
+ if (rsp_m2c.has_defflowcheckid()) {
+ rmtdata_cache_.UpdateDefFlowCtrlInfo(rsp_m2c.defflowcheckid(), rsp_m2c.defflowcontrolinfo());
+ }
+ // keep the cached priority id when the response does not carry one
+ int qryPriorityId = rsp_m2c.has_qrypriorityid() ? rsp_m2c.qrypriorityid()
+ : rmtdata_cache_.GetGroupQryPriorityId();
+ rmtdata_cache_.UpdateGroupFlowCtrlInfo(qryPriorityId, rsp_m2c.groupflowcheckid(),
+ rsp_m2c.groupflowcontrolinfo());
+ }
+ if (rsp_m2c.has_authorizedinfo()) {
+ processAuthorizedToken(rsp_m2c.authorizedinfo());
+ }
+ // master may demand authentication on the next request
+ if (rsp_m2c.has_requireauth()) {
+ nextauth_2_master.Set(rsp_m2c.requireauth());
+ }
+ // Get the latest rebalance task
+ if (rsp_m2c.has_event()) {
+ EventProto eventProto = rsp_m2c.event();
+ if (eventProto.rebalanceid() > 0) {
+ list<SubscribeInfo> subcribe_infos;
+ for (int i = 0; i < eventProto.subscribeinfo_size(); i++) {
+ SubscribeInfo sub_info(eventProto.subscribeinfo(i));
+ subcribe_infos.push_back(sub_info);
+ }
+ // queue the event; the rebalance thread consumes it asynchronously
+ ConsumerEvent new_event(eventProto.rebalanceid(), eventProto.optype(), subcribe_infos, 0);
+ rmtdata_cache_.OfferEvent(new_event);
+ }
+ }
+ last_master_hbtime_ = Utils::GetCurrentTimeMillis();
+ error_code = err_code::kErrSuccess;
+ err_info = "Ok";
+ LOG_TRACE("processHBResponseM2C, process finished, out");
+
+ return true;
+}
+
+// Send unregister requests for every partition in unreg_partitions, grouped by
+// broker. When wait_rsp is true each request is sent synchronously; otherwise
+// fire-and-forget. Responses are intentionally not inspected.
+void BaseConsumer::unregister2Brokers(map<NodeInfo, list<PartitionExt> >& unreg_partitions,
+ bool wait_rsp) {
+ string err_info;
+ map<NodeInfo, list<PartitionExt> >::iterator it;
+ list<PartitionExt>::iterator it_part;
+
+ LOG_TRACE("unregister2Brokers, process begin");
+
+ if (unreg_partitions.empty()) {
+ LOG_TRACE("unregister2Brokers, unreg partitions empty, out");
+ return;
+ }
+ for (it = unreg_partitions.begin(); it != unreg_partitions.end(); ++it) {
+ list<PartitionExt> part_list = it->second;
+ for (it_part = part_list.begin(); it_part != part_list.end(); ++it_part) {
+ LOG_TRACE("unregister2Brokers, partitionkey=%s",
+ it_part->GetPartitionKey().c_str());
+ auto request = std::make_shared<RequestContext>();
+ TubeMQCodec::ReqProtocolPtr req_protocol = TubeMQCodec::GetReqProtocol();
+ // build unregister 2 broker request
+ buidUnRegRequestC2B(*it_part, req_protocol);
+ request->codec_ = std::make_shared<TubeMQCodec>();
+ request->ip_ = it_part->GetBrokerHost();
+ request->port_ = it_part->GetBrokerPort();
+ request->timeout_ = config_.GetRpcReadTimeoutMs();
+ request->request_id_ = Singleton<UniqueSeqId>::Instance().Next();
+ req_protocol->request_id_ = request->request_id_;
+ // shave 500ms off the read timeout so it expires before the request timeout
+ req_protocol->rpc_read_timeout_ = config_.GetRpcReadTimeoutMs() - 500;
+ // send message to target
+ // not need process
+ ResponseContext response_context;
+ if (wait_rsp) {
+ SyncRequest(response_context, request, req_protocol);
+ } else {
+ AsyncRequest(request, req_protocol);
+ }
+ LOG_TRACE("unregister2Brokers, partitionkey=%s return come",
+ it_part->GetPartitionKey().c_str());
+ }
+ }
+}
+
+// Parse a Broker register response (B2C). Only checks transport success,
+// protobuf parse success and the broker's success flag; fills error_code /
+// err_info accordingly and returns whether registration succeeded.
+bool BaseConsumer::processRegResponseB2C(int32_t& error_code, string& err_info,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol) {
+ LOG_TRACE("processRegResponseB2C, process begin");
+ if (!rsp_protocol->success_) {
+ error_code = rsp_protocol->code_;
+ err_info = rsp_protocol->error_msg_;
+ LOG_TRACE("processRegResponseB2C, not success, out");
+ return false;
+ }
+ RegisterResponseB2C rsp_b2c;
+ bool result = rsp_b2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
+ (int)(rsp_protocol->rsp_body_.data().length()));
+ if (!result) {
+ error_code = err_code::kErrParseFailure;
+ err_info = "Parse RegisterResponseB2C response failure!";
+ LOG_TRACE("processRegResponseB2C, parse body failure, out");
+ return false;
+ }
+ if (!rsp_b2c.success()) {
+ error_code = rsp_b2c.errcode();
+ err_info = rsp_b2c.errmsg();
+ LOG_TRACE("processRegResponseB2C, return failure, error is %s, out", err_info.c_str());
+ return false;
+ }
+ error_code = err_code::kErrSuccess;
+ err_info = "Ok";
+ LOG_TRACE("processRegResponseB2C, success, finished");
+ return true;
+}
+
+// Convert the broker's TransferedMessage payloads into client Message objects.
+// Messages failing the CRC32 check, malformed property headers, or (for
+// filter-consume topics) a non-matching filter key are silently skipped.
+// msg_size accumulates the total body bytes of the accepted messages.
+void BaseConsumer::convertMessages(int32_t& msg_size, list<Message>& message_list,
+ bool filter_consume, const string& topic_name,
+ GetMessageResponseB2C& rsp_b2c) {
+ // #lizard forgives
+ msg_size = 0;
+ message_list.clear();
+ if (rsp_b2c.messages_size() == 0) {
+ return;
+ }
+ for (int i = 0; i < rsp_b2c.messages_size(); i++) {
+ // const reference: avoid copying each protobuf message
+ const TransferedMessage& tsfMsg = rsp_b2c.messages(i);
+ int32_t flag = tsfMsg.flag();
+ int64_t message_id = tsfMsg.messageid();
+ int32_t in_check_sum = tsfMsg.checksum();
+ int32_t payload_length = tsfMsg.payloaddata().length();
+ int32_t calc_checksum = Utils::Crc32(tsfMsg.payloaddata());
+ // drop corrupted payloads
+ if (in_check_sum != calc_checksum) {
+
+ LOG_TRACE("[CONSUMER] convertMessages [%d], Crc32 failure, in=%d, calc=%d, client=%s",
+ i, in_check_sum, calc_checksum, client_uuid_.c_str());
+
+ continue;
+ }
+ int read_pos = 0;
+ int data_len = payload_length;
+ map<string, string> properties;
+ std::unique_ptr<char[]> payload_data(new char[payload_length]);
+ memcpy(&payload_data[0], tsfMsg.payloaddata().c_str(), payload_length);
+ // FIX: test the flag bit with != 0 instead of == 1; the old comparison only
+ // worked while kMsgFlagIncProperties happened to be bit 0.
+ if ((flag & tb_config::kMsgFlagIncProperties) != 0) {
+ // properties header: 4-byte big-endian attribute length + attribute string
+ if (payload_length < 4) {
+
+ LOG_TRACE("[CONSUMER] convertMessages [%d], payload_length(%d) < 4, client=%s",
+ i, payload_length, client_uuid_.c_str());
+
+ continue;
+ }
+ int32_t attr_len = ntohl(*(int*)(&payload_data[0]));
+ read_pos += 4;
+ data_len -= 4;
+ if (attr_len > data_len) {
+ LOG_TRACE("[CONSUMER] convertMessages [%d], attr_len(%d) > data_len(%d), client=%s",
+ i, attr_len, data_len, client_uuid_.c_str());
+ continue;
+ }
+ string attribute(&payload_data[0] + read_pos, attr_len);
+ read_pos += attr_len;
+ data_len -= attr_len;
+ // attributes are "k=v,k=v" pairs
+ Utils::Split(attribute, properties, delimiter::kDelimiterComma, delimiter::kDelimiterEqual);
+ if (filter_consume) {
+ // skip messages whose filter key is not subscribed for this topic
+ map<string, set<string> > topic_filter_map = sub_info_.GetTopicFilterMap();
+ map<string, set<string> >::const_iterator it = topic_filter_map.find(topic_name);
+ if (properties.find(tb_config::kRsvPropKeyFilterItem) != properties.end()) {
+ string msg_key = properties[tb_config::kRsvPropKeyFilterItem];
+ if (it != topic_filter_map.end()) {
+ set<string> filters = it->second;
+ if (filters.find(msg_key) == filters.end()) {
+ LOG_TRACE("[CONSUMER] convertMessages [%d], filter consume, not matched, client=%s",
+ i, client_uuid_.c_str());
+ continue;
+ }
+ }
+ }
+ }
+ }
+ Message message(topic_name, flag, message_id, &payload_data[0] + read_pos, data_len,
+ properties);
+ message_list.push_back(message);
+ msg_size += data_len;
+ }
+
+ // FIX: size() is size_t; cast so it matches the %ld format specifier on all
+ // platforms instead of relying on sizeof(size_t) == sizeof(long).
+ LOG_TRACE("[CONSUMER] convertMessages finished, count=%ld, client=%s",
+ static_cast<long>(message_list.size()), client_uuid_.c_str());
+
+ return;
+}
+
+// Process a Broker get-message response (B2C). On success converts messages,
+// books the partition consume state and fills a success result; on errors it
+// releases or removes the partition and schedules the appropriate retry delay
+// based on the broker's error code. Returns true only on kErrSuccess.
+bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& peer_info,
+ bool filter_consume, const PartitionExt& partition_ext,
+ const string& confirm_context,
+ const TubeMQCodec::RspProtocolPtr& rsp) {
+ string err_info;
+
+ // transport-level failure: release the partition back for retry
+ if (!rsp->success_) {
+ rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
+ result.SetFailureResult(rsp->code_, rsp->error_msg_, partition_ext.GetTopic(), peer_info);
+
+ LOG_TRACE("[CONSUMER] processGetMessageRspB2C failure, code_=%d, error_msg_=%s, client=%s",
+ rsp->code_, rsp->error_msg_.c_str(), client_uuid_.c_str());
+
+ return false;
+ }
+ GetMessageResponseB2C rsp_b2c;
+ bool ret_result =
+ rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(), (int)(rsp->rsp_body_.data().length()));
+ if (!ret_result) {
+ rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
+ result.SetFailureResult(err_code::kErrServerError,
+ "Parse GetMessageResponseB2C response failure!",
+ partition_ext.GetTopic(), peer_info);
+
+ LOG_TRACE("[CONSUMER] processGetMessageRspB2C parse failure, client=%s", client_uuid_.c_str());
+
+ return false;
+ }
+
+ switch (rsp_b2c.errcode()) {
+ case err_code::kErrSuccess: {
+ // messages received: convert, record consume bookkeeping, report success
+ bool esc_limit = (rsp_b2c.has_escflowctrl() && rsp_b2c.escflowctrl());
+ long data_dltval =
+ rsp_b2c.has_currdatadlt() ? rsp_b2c.currdatadlt() : tb_config::kInvalidValue;
+ long curr_offset = rsp_b2c.has_curroffset() ? rsp_b2c.curroffset() : tb_config::kInvalidValue;
+ bool req_slow = rsp_b2c.has_requireslow() ? rsp_b2c.requireslow() : false;
+ int msg_size = 0;
+ list<Message> message_list;
+ convertMessages(msg_size, message_list, filter_consume, partition_ext.GetTopic(), rsp_b2c);
+ rmtdata_cache_.BookedPartionInfo(partition_ext.GetPartitionKey(), curr_offset,
+ err_code::kErrSuccess, esc_limit, msg_size, 0, data_dltval,
+ req_slow);
+ peer_info.SetCurrOffset(curr_offset);
+ result.SetSuccessResult(err_code::kErrSuccess, partition_ext.GetTopic(), peer_info,
+ confirm_context, message_list);
+ return true;
+ }
+
+ // unrecoverable for this partition: drop it from the local cache
+ case err_code::kErrHbNoNode:
+ case err_code::kErrCertificateFailure:
+ case err_code::kErrDuplicatePartition: {
+ rmtdata_cache_.RemovePartition(err_info, confirm_context);
+ result.SetFailureResult(rsp_b2c.errcode(), rsp_b2c.errmsg(), partition_ext.GetTopic(),
+ peer_info);
+ return false;
+ }
+
+ case err_code::kErrConsumeSpeedLimit: {
+ // Process with server side speed limit
+ long def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
+ : config_.GetMsgNotFoundWaitPeriodMs();
+ rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false,
+ tb_config::kInvalidValue, rsp_b2c.errcode(), false, 0,
+ def_dlttime, tb_config::kInvalidValue);
+ result.SetFailureResult(rsp_b2c.errcode(), rsp_b2c.errmsg(), partition_ext.GetTopic(),
+ peer_info);
+ return false;
+ }
+
+ case err_code::kErrNotFound:
+ case err_code::kErrForbidden:
+ case err_code::kErrMoved:
+ case err_code::kErrServiceUnavilable:
+ default: {
+ // Slow down the request based on the limitation configuration when meet these errors
+ long limit_dlt = 300;
+ switch (rsp_b2c.errcode()) {
+ case err_code::kErrForbidden: {
+ limit_dlt = 2000;
+ break;
+ }
+ case err_code::kErrServiceUnavilable: {
+ limit_dlt = 300;
+ break;
+ }
+ case err_code::kErrMoved: {
+ limit_dlt = 200;
+ break;
+ }
+ case err_code::kErrNotFound: {
+ limit_dlt = config_.GetMsgNotFoundWaitPeriodMs();
+ break;
+ }
+ default: {
+ //
+ }
+ }
+ rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false,
+ tb_config::kInvalidValue, rsp_b2c.errcode(), false, 0, limit_dlt,
+ tb_config::kInvalidValue);
+ result.SetFailureResult(rsp_b2c.errcode(), rsp_b2c.errmsg(), partition_ext.GetTopic(),
+ peer_info);
+ return false;
+ }
+ }
+ // unreachable: every switch branch above returns
+ return true;
+}
+
+// The client counts as running only in status 2 (fully started).
+bool BaseConsumer::isClientRunning() { return (status_.Get() == 2); }
+
+// Generate this client's unique id in the form:
+//   <group>_<host>-<pid>-<timestamp_ms>-<client_index>-<client_version>
+string BaseConsumer::buildUUID() {
+ stringstream uuid_builder;
+ uuid_builder << config_.GetGroupName() << "_"
+ << TubeMQService::Instance()->GetLocalHost() << "-"
+ << getpid() << "-"
+ << Utils::GetCurrentTimeMillis() << "-"
+ << GetClientIndex() << "-"
+ << kTubeMQClientVersion;
+ return uuid_builder.str();
+}
+
+// Map the configured consume position to the broker read status for a
+// registration. Only the first registration of a partition may switch to a
+// from-max position; later registrations always use the normal status.
+int32_t BaseConsumer::getConsumeReadStatus(bool is_first_reg) {
+ int32_t readStatus = rpc_config::kConsumeStatusNormal;
+ if (is_first_reg) {
+ if (config_.GetConsumePosition() == 0) {
+ readStatus = rpc_config::kConsumeStatusFromMax;
+ LOG_INFO("[Consumer From Max Offset], clientId=%s", client_uuid_.c_str());
+ } else if (config_.GetConsumePosition() > 0) {
+ readStatus = rpc_config::kConsumeStatusFromMaxAlways;
+ LOG_INFO("[Consumer From Max Offset Always], clientId=%s", client_uuid_.c_str());
+ }
+ }
+ LOG_INFO("[getConsumeReadStatus], readStatus=%d, is_first_reg=%d, config_.GetConsumePosition()=%d",
+ readStatus, is_first_reg, config_.GetConsumePosition());
+ return readStatus;
+}
+
+// Parse the "ip:port,ip:port,..." master address list into masters_map_,
+// pick the first entry as the current master, and register the list for DNS
+// translation when any address needs it. Returns false on a blank list.
+bool BaseConsumer::initMasterAddress(string& err_info, const string& master_info) {
+ masters_map_.clear();
+ Utils::Split(master_info, masters_map_, delimiter::kDelimiterComma, delimiter::kDelimiterColon);
+ if (masters_map_.empty()) {
+ err_info = "Illegal parameter: master_info is blank!";
+ return false;
+ }
+ // any hostname (vs raw IP) entry requires DNS translation support
+ bool needXfs = false;
+ map<string, int32_t>::iterator it;
+ for (it = masters_map_.begin(); it != masters_map_.end(); it++) {
+ if (Utils::NeedDnsXfs(it->first)) {
+ needXfs = true;
+ break;
+ }
+ }
+ it = masters_map_.begin();
+ curr_master_addr_ = it->first;
+ if (needXfs) {
+ TubeMQService::Instance()->AddMasterAddress(err_info, master_info);
+ }
+ err_info = "Ok";
+ return true;
+}
+
+// Advance to the next master address in round-robin order (wrapping to the
+// beginning of the map) and return it, translating hostnames via DNS when
+// needed. Also updates curr_master_addr_.
+void BaseConsumer::getNextMasterAddr(string& ipaddr, int32_t& port) {
+ map<string, int32_t>::iterator it;
+ it = masters_map_.find(curr_master_addr_);
+ if (it != masters_map_.end()) {
+ it++;
+ if (it == masters_map_.end()) {
+ it = masters_map_.begin();
+ }
+ } else {
+ // current address no longer in the map: restart from the first entry
+ it = masters_map_.begin();
+ }
+ ipaddr = it->first;
+ port = it->second;
+ curr_master_addr_ = it->first;
+ if (Utils::NeedDnsXfs(ipaddr)) {
+ TubeMQService::Instance()->GetXfsMasterAddress(curr_master_addr_, ipaddr);
+ }
+ LOG_TRACE("getNextMasterAddr address is %s:%d", ipaddr.c_str(), port);
+}
+
+// Return the currently selected master address, DNS-translated when needed.
+// NOTE(review): operator[] default-inserts port 0 if curr_master_addr_ were
+// ever absent from masters_map_ - confirm callers only run after
+// initMasterAddress succeeded.
+void BaseConsumer::getCurrentMasterAddr(string& ipaddr, int32_t& port) {
+ ipaddr = curr_master_addr_;
+ port = masters_map_[curr_master_addr_];
+ if (Utils::NeedDnsXfs(ipaddr)) {
+ TubeMQService::Instance()->GetXfsMasterAddress(curr_master_addr_, ipaddr);
+ }
+ LOG_TRACE("getCurrentMasterAddr address is %s:%d", ipaddr.c_str(), port);
+}
+
+// Decide whether master certificate info must be attached to the next request.
+// With authentication disabled the answer is always no. With force set, the
+// pending flag is cleared and the info is always attached; otherwise it is
+// attached exactly once after the master flagged that auth is required.
+bool BaseConsumer::needGenMasterCertificateInfo(bool force) {
+ if (!config_.IsAuthenticEnabled()) {
+ return false;
+ }
+ if (force) {
+ nextauth_2_master.Set(false);
+ return true;
+ }
+ // consume the pending-auth flag atomically so only one request carries it
+ return nextauth_2_master.Get() && nextauth_2_master.CompareAndSet(true, false);
+}
+
+// Fill broker authorized info: always sets the visit token; when
+// authentication is enabled adds an authenticate token either on force or
+// once after the broker flagged that the next call requires auth.
+void BaseConsumer::genBrokerAuthenticInfo(AuthorizedInfo* p_authInfo, bool force) {
+ bool needAdd = false;
+ p_authInfo->set_visitauthorizedtoken(visit_token_.Get());
+ if (config_.IsAuthenticEnabled()) {
+ if (force) {
+ needAdd = true;
+ nextauth_2_broker.Set(false);
+ } else if (nextauth_2_broker.Get()) {
+ // consume the pending-auth flag so only one request carries the token
+ if (nextauth_2_broker.CompareAndSet(true, false)) {
+ needAdd = true;
+ }
+ }
+ if (needAdd) {
+ string auth_token =
+ Utils::GenBrokerAuthenticateToken(config_.GetUsrName(), config_.GetUsrPassWord());
+ p_authInfo->set_authauthorizedtoken(auth_token);
+ }
+ }
+}
+
+// Placeholder: master authenticate token generation is not implemented yet;
+// the function intentionally leaves pauthinfo untouched.
+void BaseConsumer::genMasterAuthenticateToken(AuthenticateInfo* pauthinfo, const string& username,
+ const string usrpassword) {
+ //
+}
+
+// Apply the authorized token info returned by the master: refresh the visit
+// token, and update the cached authorized info (under auth_lock_) when it
+// changed.
+void BaseConsumer::processAuthorizedToken(const MasterAuthorizedInfo& authorized_token_info) {
+ visit_token_.Set(authorized_token_info.visitauthorizedtoken());
+ if (authorized_token_info.has_authauthorizedtoken()) {
+ lock_guard<mutex> lck(auth_lock_);
+
+ if (authorized_info_ != authorized_token_info.authauthorizedtoken()) {
+ authorized_info_ = authorized_token_info.authauthorizedtoken();
+ }
+ }
+}
+
+// Create a heartbeat timer for the broker if one does not exist yet.
+// The first heartbeat fires after half the configured period; later ticks are
+// rescheduled by reSetBrokerHBTimer from processHeartBeat2Broker.
+void BaseConsumer::addBrokerHBTimer(const NodeInfo broker) {
+ SteadyTimerPtr timer;
+ int32_t hb_periodms = config_.GetHeartbeatPeriodMs();
+ LOG_TRACE("[addBrokerHBTimer] add hb timer for broker(%s), in!",
+ broker.GetAddrInfo().c_str());
+ lock_guard<mutex> lck(broker_timer_lock_);
+ if (broker_timer_map_.find(broker) == broker_timer_map_.end()) {
+ LOG_TRACE("[addBrokerHBTimer] found no hb timer for broker(%s), add!",
+ broker.GetAddrInfo().c_str());
+ timer = TubeMQService::Instance()->CreateTimer();
+ broker_timer_map_[broker] = timer;
+ timer->expires_after(std::chrono::milliseconds(hb_periodms / 2));
+ timer->async_wait([this, broker](const std::error_code& ec) {
+ // ec is set when the timer was cancelled or rescheduled; skip the tick
+ if (ec) {
+ return;
+ }
+ processHeartBeat2Broker(broker);
+ });
+ } else {
+ LOG_TRACE("[addBrokerHBTimer] found have hb timer for broker(%s), not add!",
+ broker.GetAddrInfo().c_str());
+ }
+}
+
+// Re-arm the broker heartbeat timer for the next full period, or drop the
+// timer entirely when this client no longer holds any partition of the broker.
+void BaseConsumer::reSetBrokerHBTimer(const NodeInfo broker) {
+ SteadyTimerPtr timer;
+ list<PartitionExt> partition_list;
+ int32_t hb_periodms = config_.GetHeartbeatPeriodMs();
+ LOG_TRACE("[reSetBrokerHBTimer] reset hb timer for broker(%s), in!",
+ broker.GetAddrInfo().c_str());
+ lock_guard<mutex> lck(broker_timer_lock_);
+ rmtdata_cache_.GetPartitionByBroker(broker, partition_list);
+ if (partition_list.empty()) {
+ // nothing left to heartbeat for: stop tracking this broker
+ broker_timer_map_.erase(broker);
+ LOG_TRACE("[reSetBrokerHBTimer] no alive partitions for broker(%s), clear timer!",
+ broker.GetAddrInfo().c_str());
+ } else {
+ if (broker_timer_map_.find(broker) != broker_timer_map_.end()) {
+ timer = broker_timer_map_[broker];
+ timer->expires_after(std::chrono::milliseconds(hb_periodms));
+ timer->async_wait([this, broker](const std::error_code& ec) {
+ // ec is set when the timer was cancelled or rescheduled; skip the tick
+ if (ec) {
+ return;
+ }
+ processHeartBeat2Broker(broker);
+ });
+ LOG_TRACE("[reSetBrokerHBTimer] have alive partitions for broker(%s), reset timer!",
+ broker.GetAddrInfo().c_str());
+ }
+ }
+}
+} // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.h b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.h
new file mode 100644
index 0000000..287532d
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.h
@@ -0,0 +1,146 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_IMP_CONSUMER_API_H_
+#define TUBEMQ_CLIENT_IMP_CONSUMER_API_H_
+
+#include <stdlib.h>
+
+#include <list>
+#include <mutex>
+#include <string>
+
+#include "BrokerService.pb.h"
+#include "MasterService.pb.h"
+#include "RPC.pb.h"
+#include "client_service.h"
+#include "client_subinfo.h"
+#include "meta_info.h"
+#include "rmt_data_cache.h"
+#include "tubemq_codec.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+
+namespace tubemq {
+
+using std::mutex;
+using std::string;
+
+
+// Core consumer implementation. Owns the full client lifecycle against the
+// TubeMQ Master (register / heartbeat / rebalance events / close) and against
+// Brokers (partition register, heartbeat, message pull, offset commit).
+// NOTE(review): ShutDown() is virtual but ~BaseConsumer() is not declared
+// virtual here - confirm BaseClient provides a virtual destructor.
+class BaseConsumer : public BaseClient, public std::enable_shared_from_this<BaseConsumer> {
+ public:
+ BaseConsumer();
+ ~BaseConsumer();
+ // Public consume API used by the consumer facades.
+ bool Start(string& err_info, const ConsumerConfig& config);
+ virtual void ShutDown();
+ bool GetMessage(ConsumerResult& result);
+ bool Confirm(const string& confirm_context, bool is_consumed, ConsumerResult& result);
+ bool GetCurConsumedInfo(map<string, ConsumeOffsetInfo>& consume_info_map);
+
+ private:
+ // Master lifecycle: register, periodic heartbeat, rebalance loop, close.
+ bool register2Master(int32_t& error_code, string& err_info, bool need_change);
+ void heartBeat2Master();
+ void processRebalanceEvent();
+ void close2Master();
+ void closeAllBrokers();
+
+ private:
+ // Internal helpers: identity, master address selection, authentication.
+ string buildUUID();
+ bool isClientRunning();
+ bool IsConsumeReady(ConsumerResult& result);
+ int32_t getConsumeReadStatus(bool is_first_reg);
+ bool initMasterAddress(string& err_info, const string& master_info);
+ void getNextMasterAddr(string& ipaddr, int32_t& port);
+ void getCurrentMasterAddr(string& ipaddr, int32_t& port);
+ bool needGenMasterCertificateInfo(bool force);
+ void genBrokerAuthenticInfo(AuthorizedInfo* p_authInfo, bool force);
+ void processAuthorizedToken(const MasterAuthorizedInfo& authorized_token_info);
+ void addBrokerHBTimer(const NodeInfo broker);
+ void reSetBrokerHBTimer(const NodeInfo broker);
+
+ private:
+ // Request builders ("buid" spelling kept for interface compatibility) and
+ // response processors for the C2M (master) and C2B (broker) protocols.
+ void buidRegisterRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidHeartRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidCloseRequestC2M(TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidRegisterRequestC2B(const PartitionExt& partition,
+ TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidUnRegRequestC2B(const PartitionExt& partition,
+ TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidHeartBeatC2B(const list<PartitionExt>& partitions,
+ TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidGetMessageC2B(const PartitionExt& partition,
+ TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void buidCommitC2B(const PartitionExt& partition, bool is_last_consumed,
+ TubeMQCodec::ReqProtocolPtr& req_protocol);
+ void genMasterAuthenticateToken(AuthenticateInfo* pauthinfo,
+ const string& username, const string usrpassword);
+ bool processRegisterResponseM2C(int32_t& error_code, string& err_info,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol);
+ bool processHBResponseM2C(int32_t& error_code, string& err_info,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol);
+ void processDisConnect2Broker(ConsumerEvent& event);
+ void processConnect2Broker(ConsumerEvent& event);
+ void unregister2Brokers(map<NodeInfo, list<PartitionExt> >& unreg_partitions, bool wait_rsp);
+ bool processRegResponseB2C(int32_t& error_code, string& err_info,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol);
+ void processHeartBeat2Broker(NodeInfo broker_info);
+ bool processGetMessageRspB2C(ConsumerResult& result, PeerInfo& peer_info,
+ bool filter_consume, const PartitionExt& partition_ext, const string& confirm_context,
+ const TubeMQCodec::RspProtocolPtr& rsp_protocol);
+ void convertMessages(int32_t& msg_size, list<Message>& message_list,
+ bool filter_consume, const string& topic_name, GetMessageResponseB2C& rsp_b2c);
+ inline int32_t nextHeartBeatPeriodms();
+ void asyncRegister2Master(bool need_change);
+
+ private:
+ int32_t client_indexid_;
+ string client_uuid_;
+ // client lifecycle status (2 == running, see isClientRunning())
+ AtomicInteger status_;
+ ConsumerConfig config_;
+ ClientSubInfo sub_info_;
+ RmtDataCacheCsm rmtdata_cache_;
+ // authentication state shared between master and broker request builders
+ AtomicLong visit_token_;
+ mutable mutex auth_lock_;
+ // guarded by auth_lock_
+ string authorized_info_;
+ AtomicBoolean nextauth_2_master;
+ AtomicBoolean nextauth_2_broker;
+ // master address book and round-robin cursor
+ string curr_master_addr_;
+ map<string, int32_t> masters_map_;
+ AtomicBoolean is_master_actived_;
+ AtomicInteger master_reg_status_;
+ int32_t master_sh_retry_cnt_;
+ int64_t last_master_hbtime_;
+ // heartbeats sent since subscribe info was last reported to the master
+ int32_t unreport_times_;
+ // master heartbeat timer
+ SteadyTimerPtr heart_beat_timer_;
+ AtomicInteger master_hb_status_;
+ std::shared_ptr<std::thread> rebalance_thread_ptr_;
+ // broker heartbeat timer
+ mutable mutex broker_timer_lock_;
+ map<NodeInfo, SteadyTimerPtr> broker_timer_map_;
+};
+
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_IMP_CONSUMER_API_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
index 8e001a7..c6e0aaa 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.cc
@@ -17,47 +17,60 @@
* under the License.
*/
-#include "tubemq/client_service.h"
+#include "client_service.h"
#include <sstream>
-#include "tubemq/const_config.h"
-#include "tubemq/logger.h"
-#include "tubemq/utils.h"
-
-
+#include "const_config.h"
+#include "logger.h"
+#include "utils.h"
namespace tubemq {
using std::lock_guard;
using std::stringstream;
-
BaseClient::BaseClient(bool is_producer) {
- this->is_producer_ = is_producer;
+ is_producer_ = is_producer;
+ client_index_ = tb_config::kInvalidValue;
}
BaseClient::~BaseClient() {
// no code
}
-/*
-TubeMQService::TubeMQService() {
+TubeMQService* TubeMQService::_instance = NULL;
+
+static mutex tubemq_mutex_service_;
+
+TubeMQService* TubeMQService::Instance() {
+ if (NULL == _instance) {
+ lock_guard<mutex> lck(tubemq_mutex_service_);
+ if (NULL == _instance) {
+ _instance = new TubeMQService;
+ }
+ }
+ return _instance;
+}
+
+TubeMQService::TubeMQService()
+ : timer_executor_(std::make_shared<ExecutorPool>(2)),
+ network_executor_(std::make_shared<ExecutorPool>(4)) {
service_status_.Set(0);
client_index_base_.Set(0);
+ last_check_time_ = 0;
}
TubeMQService::~TubeMQService() {
string err_info;
Stop(err_info);
}
-*/
bool TubeMQService::Start(string& err_info, string conf_file) {
// check configure file
bool result = false;
Fileini fileini;
- string sector = "TubeMQ";
+ string sector = "TubeMQ";
result = Utils::ValidConfigFile(err_info, conf_file);
if (!result) {
@@ -71,60 +84,92 @@ bool TubeMQService::Start(string& err_info, string conf_file) {
if (!result) {
return result;
}
- if (!service_status_.CompareAndSet(0,1)) {
+ if (!service_status_.CompareAndSet(0, 1)) {
err_info = "TubeMQ Service has startted or Stopped!";
return false;
}
iniLogger(fileini, sector);
+ iniPoolThreads(fileini, sector);
+ iniXfsThread(fileini, sector);
service_status_.Set(2);
err_info = "Ok!";
+ LOG_INFO("[TubeMQService] TubeMQ service startted!");
+
return true;
}
bool TubeMQService::Stop(string& err_info) {
if (service_status_.CompareAndSet(2, -1)) {
+ LOG_INFO("[TubeMQService] TubeMQ service begin to stop!");
+ if (dns_xfs_thread_.joinable()) {
+ dns_xfs_thread_.join();
+ }
shutDownClinets();
- timer_executor_.Close();
- network_executor_.Close();
+ timer_executor_->Close();
+ network_executor_->Close();
+ connection_pool_ = nullptr;
+ thread_pool_ = nullptr;
+ LOG_INFO("[TubeMQService] TubeMQ service stopped!");
}
err_info = "OK!";
return true;
}
-bool TubeMQService::IsRunning() {
- return (service_status_.Get() == 2);
-}
+bool TubeMQService::IsRunning() { return (service_status_.Get() == 2); }
void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
string err_info;
int32_t log_num = 10;
int32_t log_size = 10;
int32_t log_level = 4;
- string log_path = "../log/";
+ string log_path = "../log/tubemq";
fileini.GetValue(err_info, sector, "log_num", log_num, 10);
fileini.GetValue(err_info, sector, "log_size", log_size, 100);
- fileini.GetValue(err_info, sector, "log_path", log_path, "../log/");
+ fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq");
fileini.GetValue(err_info, sector, "log_level", log_level, 4);
- log_level = TUBEMQ_MID(log_level, 0, 4);
+ log_level = TUBEMQ_MID(log_level, 4, 0);
GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num);
}
+void TubeMQService::iniXfsThread(const Fileini& fileini, const string& sector) {
+ string err_info;
+ int32_t dns_xfs_period_ms = 30 * 1000;
+ fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000);
+ TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000);
+ dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs, this, dns_xfs_period_ms);
+}
+
+void TubeMQService::iniPoolThreads(const Fileini& fileini, const string& sector) {
+ string err_info;
+ int32_t timer_threads = 2;
+ int32_t network_threads = 4;
+ int32_t signal_threads = 8;
+ fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2);
+ TUBEMQ_MID(timer_threads, 50, 2);
+ fileini.GetValue(err_info, sector, "network_threads", network_threads, 4);
+ TUBEMQ_MID(network_threads, 50, 4);
+ fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8);
+ TUBEMQ_MID(signal_threads, 50, 4);
+ timer_executor_->Resize(timer_threads);
+ network_executor_->Resize(network_threads);
+ thread_pool_ = std::make_shared<ThreadPool>(signal_threads);
+ connection_pool_ = std::make_shared<ConnectionPool>(network_executor_);
+}
int32_t TubeMQService::GetClientObjCnt() {
lock_guard<mutex> lck(mutex_);
return clients_map_.size();
}
-
bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
- if (service_status_.Get() != 0) {
+ if (!IsRunning()) {
err_info = "Service not startted!";
return false;
}
int32_t client_index = client_index_base_.IncrementAndGet();
lock_guard<mutex> lck(mutex_);
+ clients_map_[client_index] = client_obj;
client_obj->SetClientIndex(client_index);
- this->clients_map_[client_index] = client_obj;
err_info = "Ok";
return true;
}
@@ -132,7 +177,7 @@ bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
BaseClient* client_obj = NULL;
map<int32_t, BaseClient*>::const_iterator it;
-
+
lock_guard<mutex> lck(mutex_);
it = clients_map_.find(client_index);
if (it != clients_map_.end()) {
@@ -141,17 +186,14 @@ BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
return client_obj;
}
-BaseClient* TubeMQService::RmvClientObj(int32_t client_index) {
- BaseClient* client_obj = NULL;
+void TubeMQService::RmvClientObj(BaseClient* client_obj) {
map<int32_t, BaseClient*>::iterator it;
-
- lock_guard<mutex> lck(mutex_);
- it = clients_map_.find(client_index);
- if (it != clients_map_.end()) {
- client_obj = it->second;
+ if (client_obj != NULL) {
+ lock_guard<mutex> lck(mutex_);
+ int32_t client_index = client_obj->GetClientIndex();
clients_map_.erase(client_index);
+ client_obj->SetClientIndex(tb_config::kInvalidValue);
}
- return client_obj;
}
void TubeMQService::shutDownClinets() const {
@@ -162,5 +204,93 @@ void TubeMQService::shutDownClinets() const {
}
}
+bool TubeMQService::AddMasterAddress(string& err_info, const string& master_info) {
+ map<string, int32_t>::iterator it;
+ map<string, int32_t> tmp_addr_map;
+ Utils::Split(master_info, tmp_addr_map, delimiter::kDelimiterComma, delimiter::kDelimiterColon);
+ if (tmp_addr_map.empty()) {
+ err_info = "Illegal parameter: master_info is blank!";
+ return false;
+ }
+ for (it = tmp_addr_map.begin(); it != tmp_addr_map.end();) {
+ if (!Utils::NeedDnsXfs(it->first)) {
+ tmp_addr_map.erase(it++);
+ }
+ }
+ if (tmp_addr_map.empty()) {
+ err_info = "Ok";
+ return true;
+ }
+ if (addNeedDnsXfsAddr(tmp_addr_map)) {
+ updMasterAddrByDns();
+ }
+ err_info = "Ok";
+ return true;
+}
+
+void TubeMQService::GetXfsMasterAddress(const string& source, string& target) {
+ target = source;
+ lock_guard<mutex> lck(mutex_);
+ if (master_source_.find(source) != master_source_.end()) {
+ target = master_target_[source];
+ }
+}
+
+void TubeMQService::thread_task_dnsxfs(int dns_xfs_period_ms) {
+ LOG_INFO("[TubeMQService] DSN transfer thread startted!");
+ while (true) {
+ if (TubeMQService::Instance()->GetServiceStatus() <= 0) {
+ break;
+ }
+ if ((Utils::GetCurrentTimeMillis() - last_check_time_) >= dns_xfs_period_ms) {
+ TubeMQService::Instance()->updMasterAddrByDns();
+ last_check_time_ = Utils::GetCurrentTimeMillis();
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ LOG_INFO("[TubeMQService] DSN transfer thread stopped!");
+}
+
+bool TubeMQService::hasXfsTask(map<string, int32_t>& src_addr_map) {
+ lock_guard<mutex> lck(mutex_);
+ if (!master_source_.empty()) {
+ src_addr_map = master_source_;
+ return true;
+ }
+ return false;
+}
+
+bool TubeMQService::addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map) {
+ bool added = false;
+ map<string, int32_t>::iterator it;
+ if (!src_addr_map.empty()) {
+ lock_guard<mutex> lck(mutex_);
+ for (it = src_addr_map.begin(); it != src_addr_map.end(); it++) {
+ if (master_source_.find(it->first) == master_source_.end()) {
+ added = true;
+ master_source_[it->first] = it->second;
+ }
+ }
+ }
+ return added;
+}
+
+void TubeMQService::updMasterAddrByDns() {
+ map<string, int32_t> tmp_src_addr_map;
+ map<string, string> tmp_tgt_addr_map;
+ map<string, int32_t>::iterator it;
+ if (!hasXfsTask(tmp_src_addr_map)) {
+ return;
+ }
+ Utils::XfsAddrByDns(tmp_src_addr_map, tmp_tgt_addr_map);
+ lock_guard<mutex> lck(mutex_);
+ if (tmp_tgt_addr_map.empty()) {
+ for (it = tmp_src_addr_map.begin(); it != tmp_src_addr_map.end(); it++) {
+ master_target_[it->first] = it->first;
+ }
+ } else {
+ master_target_ = tmp_tgt_addr_map;
+ }
+}
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
new file mode 100644
index 0000000..c6e0aaa
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
@@ -0,0 +1,296 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
#include "client_service.h"

#include <chrono>
#include <map>
#include <memory>
#include <mutex>
#include <sstream>
#include <string>
#include <thread>

#include "const_config.h"
#include "logger.h"
#include "utils.h"
+
+namespace tubemq {
+
+using std::lock_guard;
+using std::stringstream;
+
+BaseClient::BaseClient(bool is_producer) {
+ is_producer_ = is_producer;
+ client_index_ = tb_config::kInvalidValue;
+}
+
// Trivial destructor: BaseClient owns no resources directly; subclasses
// release their own.
BaseClient::~BaseClient() {
  // no code
}
+
+TubeMQService* TubeMQService::_instance = NULL;
+
+static mutex tubemq_mutex_service_;
+
+TubeMQService* TubeMQService::Instance() {
+ if (NULL == _instance) {
+ lock_guard<mutex> lck(tubemq_mutex_service_);
+ if (NULL == _instance) {
+ _instance = new TubeMQService;
+ }
+ }
+ return _instance;
+}
+
// Builds the service in the "stopped" state (status 0) with small default
// executor pools; Start() later resizes the pools from the configuration
// file (see iniPoolThreads).
TubeMQService::TubeMQService()
    : timer_executor_(std::make_shared<ExecutorPool>(2)),
      network_executor_(std::make_shared<ExecutorPool>(4)) {
  service_status_.Set(0);
  client_index_base_.Set(0);
  last_check_time_ = 0;
}
+
+TubeMQService::~TubeMQService() {
+ string err_info;
+ Stop(err_info);
+}
+
+bool TubeMQService::Start(string& err_info, string conf_file) {
+ // check configure file
+ bool result = false;
+ Fileini fileini;
+ string sector = "TubeMQ";
+
+ result = Utils::ValidConfigFile(err_info, conf_file);
+ if (!result) {
+ return result;
+ }
+ result = fileini.Loadini(err_info, conf_file);
+ if (!result) {
+ return result;
+ }
+ result = Utils::GetLocalIPV4Address(err_info, local_host_);
+ if (!result) {
+ return result;
+ }
+ if (!service_status_.CompareAndSet(0, 1)) {
+ err_info = "TubeMQ Service has startted or Stopped!";
+ return false;
+ }
+ iniLogger(fileini, sector);
+ iniPoolThreads(fileini, sector);
+ iniXfsThread(fileini, sector);
+ service_status_.Set(2);
+ err_info = "Ok!";
+ LOG_INFO("[TubeMQService] TubeMQ service startted!");
+
+ return true;
+}
+
// Stops the service if (and only if) it is currently running (status 2 -> -1).
// Order matters here: flipping the status first lets thread_task_dnsxfs
// observe a value <= 0 and exit its loop, so the join below terminates; the
// clients are shut down before the executors servicing their I/O are closed.
// Always reports success — a second Stop() is a harmless no-op.
bool TubeMQService::Stop(string& err_info) {
  if (service_status_.CompareAndSet(2, -1)) {
    LOG_INFO("[TubeMQService] TubeMQ service begin to stop!");
    if (dns_xfs_thread_.joinable()) {
      dns_xfs_thread_.join();
    }
    shutDownClinets();
    timer_executor_->Close();
    network_executor_->Close();
    connection_pool_ = nullptr;
    thread_pool_ = nullptr;
    LOG_INFO("[TubeMQService] TubeMQ service stopped!");
  }
  err_info = "OK!";
  return true;
}
+
+bool TubeMQService::IsRunning() { return (service_status_.Get() == 2); }
+
// Reads the logging options from the given ini sector and initializes the
// global logger. log_level is clamped (TUBEMQ_MID appears to take
// (value, max, min), as in iniPoolThreads) to the range [0, 4].
void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
  string err_info;
  int32_t log_num = 10;
  int32_t log_size = 10;
  int32_t log_level = 4;
  string log_path = "../log/tubemq";
  fileini.GetValue(err_info, sector, "log_num", log_num, 10);
  // NOTE(review): local default is 10 but the ini fallback is 100 — confirm
  // which default is intended.
  fileini.GetValue(err_info, sector, "log_size", log_size, 100);
  fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq");
  fileini.GetValue(err_info, sector, "log_level", log_level, 4);
  log_level = TUBEMQ_MID(log_level, 4, 0);
  GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num);
}
+
+void TubeMQService::iniXfsThread(const Fileini& fileini, const string& sector) {
+ string err_info;
+ int32_t dns_xfs_period_ms = 30 * 1000;
+ fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000);
+ TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000);
+ dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs, this, dns_xfs_period_ms);
+}
+
+void TubeMQService::iniPoolThreads(const Fileini& fileini, const string& sector) {
+ string err_info;
+ int32_t timer_threads = 2;
+ int32_t network_threads = 4;
+ int32_t signal_threads = 8;
+ fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2);
+ TUBEMQ_MID(timer_threads, 50, 2);
+ fileini.GetValue(err_info, sector, "network_threads", network_threads, 4);
+ TUBEMQ_MID(network_threads, 50, 4);
+ fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8);
+ TUBEMQ_MID(signal_threads, 50, 4);
+ timer_executor_->Resize(timer_threads);
+ network_executor_->Resize(network_threads);
+ thread_pool_ = std::make_shared<ThreadPool>(signal_threads);
+ connection_pool_ = std::make_shared<ConnectionPool>(network_executor_);
+}
+
+int32_t TubeMQService::GetClientObjCnt() {
+ lock_guard<mutex> lck(mutex_);
+ return clients_map_.size();
+}
+
+bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
+ if (!IsRunning()) {
+ err_info = "Service not startted!";
+ return false;
+ }
+ int32_t client_index = client_index_base_.IncrementAndGet();
+ lock_guard<mutex> lck(mutex_);
+ clients_map_[client_index] = client_obj;
+ client_obj->SetClientIndex(client_index);
+ err_info = "Ok";
+ return true;
+}
+
+BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
+ BaseClient* client_obj = NULL;
+ map<int32_t, BaseClient*>::const_iterator it;
+
+ lock_guard<mutex> lck(mutex_);
+ it = clients_map_.find(client_index);
+ if (it != clients_map_.end()) {
+ client_obj = it->second;
+ }
+ return client_obj;
+}
+
+void TubeMQService::RmvClientObj(BaseClient* client_obj) {
+ map<int32_t, BaseClient*>::iterator it;
+ if (client_obj != NULL) {
+ lock_guard<mutex> lck(mutex_);
+ int32_t client_index = client_obj->GetClientIndex();
+ clients_map_.erase(client_index);
+ client_obj->SetClientIndex(tb_config::kInvalidValue);
+ }
+}
+
+void TubeMQService::shutDownClinets() const {
+ map<int32_t, BaseClient*>::const_iterator it;
+ lock_guard<mutex> lck(mutex_);
+ for (it = clients_map_.begin(); it != clients_map_.end(); it++) {
+ it->second->ShutDown();
+ }
+}
+
+bool TubeMQService::AddMasterAddress(string& err_info, const string& master_info) {
+ map<string, int32_t>::iterator it;
+ map<string, int32_t> tmp_addr_map;
+ Utils::Split(master_info, tmp_addr_map, delimiter::kDelimiterComma, delimiter::kDelimiterColon);
+ if (tmp_addr_map.empty()) {
+ err_info = "Illegal parameter: master_info is blank!";
+ return false;
+ }
+ for (it = tmp_addr_map.begin(); it != tmp_addr_map.end();) {
+ if (!Utils::NeedDnsXfs(it->first)) {
+ tmp_addr_map.erase(it++);
+ }
+ }
+ if (tmp_addr_map.empty()) {
+ err_info = "Ok";
+ return true;
+ }
+ if (addNeedDnsXfsAddr(tmp_addr_map)) {
+ updMasterAddrByDns();
+ }
+ err_info = "Ok";
+ return true;
+}
+
+void TubeMQService::GetXfsMasterAddress(const string& source, string& target) {
+ target = source;
+ lock_guard<mutex> lck(mutex_);
+ if (master_source_.find(source) != master_source_.end()) {
+ target = master_target_[source];
+ }
+}
+
+void TubeMQService::thread_task_dnsxfs(int dns_xfs_period_ms) {
+ LOG_INFO("[TubeMQService] DSN transfer thread startted!");
+ while (true) {
+ if (TubeMQService::Instance()->GetServiceStatus() <= 0) {
+ break;
+ }
+ if ((Utils::GetCurrentTimeMillis() - last_check_time_) >= dns_xfs_period_ms) {
+ TubeMQService::Instance()->updMasterAddrByDns();
+ last_check_time_ = Utils::GetCurrentTimeMillis();
+ }
+ std::this_thread::sleep_for(std::chrono::milliseconds(500));
+ }
+ LOG_INFO("[TubeMQService] DSN transfer thread stopped!");
+}
+
+bool TubeMQService::hasXfsTask(map<string, int32_t>& src_addr_map) {
+ lock_guard<mutex> lck(mutex_);
+ if (!master_source_.empty()) {
+ src_addr_map = master_source_;
+ return true;
+ }
+ return false;
+}
+
+bool TubeMQService::addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map) {
+ bool added = false;
+ map<string, int32_t>::iterator it;
+ if (!src_addr_map.empty()) {
+ lock_guard<mutex> lck(mutex_);
+ for (it = src_addr_map.begin(); it != src_addr_map.end(); it++) {
+ if (master_source_.find(it->first) == master_source_.end()) {
+ added = true;
+ master_source_[it->first] = it->second;
+ }
+ }
+ }
+ return added;
+}
+
// Re-resolves every registered master hostname via DNS and replaces the
// source -> target mapping. Locking choreography: hasXfsTask() snapshots the
// sources under mutex_ and releases it, the (potentially slow) DNS lookup
// runs unlocked, and only the final map swap re-acquires mutex_. When
// resolution yields nothing, every source is mapped to itself so
// GetXfsMasterAddress() lookups still succeed.
void TubeMQService::updMasterAddrByDns() {
  map<string, int32_t> tmp_src_addr_map;
  map<string, string> tmp_tgt_addr_map;
  map<string, int32_t>::iterator it;
  if (!hasXfsTask(tmp_src_addr_map)) {
    return;
  }
  Utils::XfsAddrByDns(tmp_src_addr_map, tmp_tgt_addr_map);
  lock_guard<mutex> lck(mutex_);
  if (tmp_tgt_addr_map.empty()) {
    for (it = tmp_src_addr_map.begin(); it != tmp_src_addr_map.end(); it++) {
      master_target_[it->first] = it->first;
    }
  } else {
    master_target_ = tmp_tgt_addr_map;
  }
}
+
+} // namespace tubemq