You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@tubemq.apache.org by gx...@apache.org on 2020/09/15 11:47:05 UTC
[incubator-tubemq] 49/50: [TUBEMQ-350]C++ SDK client code adj (#262)
This is an automated email from the ASF dual-hosted git repository.
gxcheng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git
commit 1cc56e53bfec60ce7eb5fd9cd4c3d95091463212
Author: charlely <41...@users.noreply.github.com>
AuthorDate: Tue Sep 15 12:26:53 2020 +0800
[TUBEMQ-350]C++ SDK client code adj (#262)
Co-authored-by: charleli <ch...@tencent.com>
---
.../tubemq-client-cpp/src/CMakeLists.txt | 2 +-
.../tubemq-client-cpp/src/baseconsumer.cc | 64 ++--
.../tubemq-client-cpp/src/client_connection.cc | 287 ++++++++++++++
.../tubemq-client-cpp/src/client_connection.h | 101 +++++
.../tubemq-client-cpp/src/client_service.h | 417 ++++++---------------
.../tubemq-client-cpp/src/client_subinfo.cc | 55 +--
.../tubemq-client-cpp/src/client_subinfo.h | 79 ++++
.../tubemq-client-cpp/src/connection.h | 14 +-
.../tubemq-client-cpp/src/connection_pool.h | 5 +-
.../tubemq-client-cpp/src/const_config.h | 120 ++++++
.../tubemq-client-cpp/src/const_rpc.h | 91 +++++
.../tubemq-client-cpp/src/executor_pool.cc | 21 +-
.../tubemq-client-cpp/src/executor_pool.h | 95 +++++
.../tubemq-client-cpp/src/file_ini.cc | 26 +-
.../tubemq-client-cpp/src/file_ini.h | 51 +++
.../tubemq-client-cpp/src/flowctrl_def.cc | 244 ++++++------
.../tubemq-client-cpp/src/flowctrl_def.h | 144 +++++++
.../tubemq-client-cpp/src/logger.cc | 9 +-
tubemq-client-twins/tubemq-client-cpp/src/logger.h | 118 ++++++
.../tubemq-client-cpp/src/meta_info.cc | 417 +++++++++++----------
.../tubemq-client-cpp/src/meta_info.h | 188 ++++++++++
.../tubemq-client-cpp/src/noncopyable.h | 37 ++
.../tubemq-client-cpp/src/rmt_data_cache.cc | 274 +++++++++-----
.../tubemq-client-cpp/src/rmt_data_cache.h | 166 ++++++++
.../src/{thread_pool.h => singleton.h} | 60 ++-
.../tubemq-client-cpp/src/thread_pool.h | 5 +-
.../tubemq-client-cpp/src/tubemq_client.cc | 184 +++++++++
.../tubemq-client-cpp/src/tubemq_codec.h | 21 +-
.../tubemq-client-cpp/src/tubemq_config.cc | 342 +++++++++--------
.../tubemq-client-cpp/src/tubemq_message.cc | 126 ++++---
.../tubemq-client-cpp/src/tubemq_return.cc | 123 +++---
.../tubemq-client-cpp/src/unique_seq_id.h | 41 ++
tubemq-client-twins/tubemq-client-cpp/src/utils.cc | 238 +++++++++++-
tubemq-client-twins/tubemq-client-cpp/src/utils.h | 71 ++++
.../tubemq-client-cpp/src/version.h | 33 ++
35 files changed, 3115 insertions(+), 1154 deletions(-)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
index 5995d5c..938ca2a 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
@@ -23,7 +23,7 @@ cmake_minimum_required (VERSION 3.1)
AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS)
ADD_LIBRARY(tubemq STATIC ${CURRENT_DIR_SRCS})
-TARGET_LINK_LIBRARIES (tubemq)
+TARGET_LINK_LIBRARIES (tubemq libprotobuf.a liblog4cplus.a tubemq_proto)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
index 215892d..714b353 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/baseconsumer.cc
@@ -137,7 +137,7 @@ bool BaseConsumer::GetMessage(ConsumerResult& result) {
string err_info;
PartitionExt partition_ext;
string confirm_context;
-
+
if (!IsConsumeReady(result)) {
return false;
}
@@ -146,7 +146,9 @@ bool BaseConsumer::GetMessage(ConsumerResult& result) {
result.SetFailureResult(error_code, err_info);
return false;
}
- long curr_offset = tb_config::kInvalidValue;
+
+ int64_t curr_offset = tb_config::kInvalidValue;
+
bool filter_consume = sub_info_.IsFilterConsume(partition_ext.GetTopic());
PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
partition_ext.GetPartitionKey(), curr_offset);
@@ -206,9 +208,11 @@ bool BaseConsumer::IsConsumeReady(ConsumerResult& result) {
if (err_code::kErrSuccess == ret_code) {
return true;
}
- if ((config_.GetMaxPartCheckPeriodMs() > 0)
- && (Utils::GetCurrentTimeMillis() - start_time
- > config_.GetMaxPartCheckPeriodMs())) {
+
+ if ((config_.GetMaxPartCheckPeriodMs() >= 0)
+ && (Utils::GetCurrentTimeMillis() - start_time
+ >= config_.GetMaxPartCheckPeriodMs())) {
+
switch (ret_code) {
case err_code::kErrNoPartAssigned: {
result.SetFailureResult(ret_code,
@@ -277,7 +281,9 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
string part_key = Utils::Trim(confirm_context.substr(0, pos1));
string booked_time_str =
Utils::Trim(confirm_context.substr(pos1 + token1.size(), confirm_context.size()));
- long booked_time = atol(booked_time_str.c_str());
+
+ int64_t booked_time = atol(booked_time_str.c_str());
+
pos1 = part_key.find(token2);
if (string::npos == pos1) {
result.SetFailureResult(err_code::kErrBadRequest,
@@ -305,7 +311,9 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
"Not found the partition by confirm_context!");
return false;
}
- long curr_offset = tb_config::kInvalidValue;
+
+ int64_t curr_offset = tb_config::kInvalidValue;
+
PeerInfo peer_info(partition_ext.GetBrokerHost(), partition_ext.GetPartitionId(),
partition_ext.GetPartitionKey(), curr_offset);
auto request = std::make_shared<RequestContext>();
@@ -344,7 +352,9 @@ bool BaseConsumer::Confirm(const string& confirm_context, bool is_consumed,
if (rsp->success_) {
CommitOffsetResponseB2C rsp_b2c;
ret_result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
- (int)(rsp->rsp_body_.data().length()));
+
+ (int32_t)(rsp->rsp_body_.data().length()));
+
if (ret_result) {
if (rsp_b2c.success()) {
curr_offset = rsp_b2c.curroffset();
@@ -439,7 +449,8 @@ bool BaseConsumer::register2Master(int32_t& error_code, string& err_info, bool n
error_code = error.Value();
err_info = error.Message();
}
- if (error_code == err_code::kErrConsumeGroupForbidden
+
+ if (error_code == err_code::kErrConsumeGroupForbidden
|| error_code == err_code::kErrConsumeContentForbidden) {
// set regist process status to existed
master_reg_status_.CompareAndSet(1, 0);
@@ -805,7 +816,8 @@ void BaseConsumer::processHeartBeat2Broker(NodeInfo broker_info) {
if (rsp->success_) {
HeartBeatResponseB2C rsp_b2c;
bool result = rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(),
- (int)(rsp->rsp_body_.data().length()));
+ (int32_t)(rsp->rsp_body_.data().length()));
+
if (result) {
set<string> partition_keys;
if (rsp_b2c.success()) {
@@ -1066,7 +1078,8 @@ bool BaseConsumer::processRegisterResponseM2C(int32_t& error_code, string& err_i
}
RegisterResponseM2C rsp_m2c;
bool result = rsp_m2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
- (int)(rsp_protocol->rsp_body_.data().length()));
+ (int32_t)(rsp_protocol->rsp_body_.data().length()));
+
if (!result) {
error_code = err_code::kErrParseFailure;
err_info = "Parse RegisterResponseM2C response failure!";
@@ -1223,7 +1236,8 @@ bool BaseConsumer::processRegResponseB2C(int32_t& error_code, string& err_info,
}
RegisterResponseB2C rsp_b2c;
bool result = rsp_b2c.ParseFromArray(rsp_protocol->rsp_body_.data().c_str(),
- (int)(rsp_protocol->rsp_body_.data().length()));
+ (int32_t)(rsp_protocol->rsp_body_.data().length()));
+
if (!result) {
error_code = err_code::kErrParseFailure;
err_info = "Parse RegisterResponseB2C response failure!";
@@ -1259,10 +1273,8 @@ void BaseConsumer::convertMessages(int32_t& msg_size, list<Message>& message_lis
int32_t payload_length = tsfMsg.payloaddata().length();
int32_t calc_checksum = Utils::Crc32(tsfMsg.payloaddata());
if (in_check_sum != calc_checksum) {
-
LOG_TRACE("[CONSUMER] convertMessages [%d], Crc32 failure, in=%d, calc=%d, client=%s",
i, in_check_sum, calc_checksum, client_uuid_.c_str());
-
continue;
}
int read_pos = 0;
@@ -1272,10 +1284,8 @@ void BaseConsumer::convertMessages(int32_t& msg_size, list<Message>& message_lis
memcpy(&payload_data[0], tsfMsg.payloaddata().c_str(), payload_length);
if ((flag & tb_config::kMsgFlagIncProperties) == 1) {
if (payload_length < 4) {
-
LOG_TRACE("[CONSUMER] convertMessages [%d], payload_length(%d) < 4, client=%s",
i, payload_length, client_uuid_.c_str());
-
continue;
}
int32_t attr_len = ntohl(*(int*)(&payload_data[0]));
@@ -1322,6 +1332,8 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
bool filter_consume, const PartitionExt& partition_ext,
const string& confirm_context,
const TubeMQCodec::RspProtocolPtr& rsp) {
+
+ // #lizard forgives
string err_info;
if (!rsp->success_) {
@@ -1334,8 +1346,9 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
return false;
}
GetMessageResponseB2C rsp_b2c;
- bool ret_result =
- rsp_b2c.ParseFromArray(rsp->rsp_body_.data().c_str(), (int)(rsp->rsp_body_.data().length()));
+ bool ret_result = rsp_b2c.ParseFromArray(
+ rsp->rsp_body_.data().c_str(), (int32_t)(rsp->rsp_body_.data().length()));
+
if (!ret_result) {
rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false);
result.SetFailureResult(err_code::kErrServerError,
@@ -1343,16 +1356,18 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
partition_ext.GetTopic(), peer_info);
LOG_TRACE("[CONSUMER] processGetMessageRspB2C parse failure, client=%s", client_uuid_.c_str());
-
return false;
}
switch (rsp_b2c.errcode()) {
case err_code::kErrSuccess: {
bool esc_limit = (rsp_b2c.has_escflowctrl() && rsp_b2c.escflowctrl());
- long data_dltval =
+
+ int64_t data_dltval =
rsp_b2c.has_currdatadlt() ? rsp_b2c.currdatadlt() : tb_config::kInvalidValue;
- long curr_offset = rsp_b2c.has_curroffset() ? rsp_b2c.curroffset() : tb_config::kInvalidValue;
+ int64_t curr_offset = rsp_b2c.has_curroffset() ?
+ rsp_b2c.curroffset() : tb_config::kInvalidValue;
+
bool req_slow = rsp_b2c.has_requireslow() ? rsp_b2c.requireslow() : false;
int msg_size = 0;
list<Message> message_list;
@@ -1377,7 +1392,8 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
case err_code::kErrConsumeSpeedLimit: {
// Process with server side speed limit
- long def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
+
+ int64_t def_dlttime = rsp_b2c.has_minlimittime() ? rsp_b2c.minlimittime()
: config_.GetMsgNotFoundWaitPeriodMs();
rmtdata_cache_.RelPartition(err_info, filter_consume, confirm_context, false,
tb_config::kInvalidValue, rsp_b2c.errcode(), false, 0,
@@ -1393,7 +1409,7 @@ bool BaseConsumer::processGetMessageRspB2C(ConsumerResult& result, PeerInfo& pee
case err_code::kErrServiceUnavilable:
default: {
// Slow down the request based on the limitation configuration when meet these errors
- long limit_dlt = 300;
+ int64_t limit_dlt = 300;
switch (rsp_b2c.errcode()) {
case err_code::kErrForbidden: {
limit_dlt = 2000;
@@ -1455,8 +1471,6 @@ int32_t BaseConsumer::getConsumeReadStatus(bool is_first_reg) {
LOG_INFO("[Consumer From Max Offset Always], clientId=%s", client_uuid_.c_str());
}
}
- LOG_INFO("[getConsumeReadStatus], readStatus=%d, is_first_reg=%d, config_.GetConsumePosition()=%d",
- readStatus, is_first_reg, config_.GetConsumePosition());
return readStatus;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
new file mode 100644
index 0000000..c795d35
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.cc
@@ -0,0 +1,287 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "client_connection.h"
+
+using namespace tubemq;
+
+// Registers |req| with this connection and schedules it for sending.
+//
+// All bookkeeping is wrapped in a task handed to executor_->Post(); the task
+// holds a shared_ptr to this connection. Inside the task:
+//   * a request whose request_id_ is already present in request_list_ is
+//     logged and dropped;
+//   * otherwise an entry is created in request_list_ and the id is appended
+//     to write_queue_;
+//   * when req->timeout_ > 0 a steady timer is armed whose handler is
+//     requestTimeoutHandle();
+//   * asyncWrite() is kicked only if IsConnected() is true and write_queue_
+//     was empty before this request was appended.
+void ClientConnection::AsyncWrite(RequestContextPtr& req) {
+  auto keep_alive = shared_from_this();
+  executor_->Post([keep_alive, this, req]() {
+    const auto request_id = req->request_id_;
+    if (request_list_.count(request_id) != 0) {
+      LOG_ERROR("Write requestid[%d] is repeat", req->request_id_);
+      return;
+    }
+
+    // operator[] inserts the bookkeeping entry for this id.
+    TransportRequest& pending = request_list_[request_id];
+    pending.req_ = req;
+
+    // Sample the queue state before appending our own id.
+    const bool was_idle = write_queue_.empty();
+    write_queue_.push_back(request_id);
+
+    if (req->timeout_ > 0) {
+      pending.deadline_ = executor_->CreateSteadyTimer();
+      pending.deadline_->expires_after(std::chrono::milliseconds(req->timeout_));
+      pending.deadline_->async_wait(std::bind(&ClientConnection::requestTimeoutHandle,
+                                              shared_from_this(), std::placeholders::_1,
+                                              pending.req_));
+    }
+
+    if (IsConnected() && was_idle) {
+      asyncWrite();
+    }
+  });
+}
+
+// Public close entry point: hands a task to executor_->Post() that calls the
+// private close() with no error. The task holds a shared_ptr to this
+// connection.
+void ClientConnection::Close() {
+  auto keep_alive = shared_from_this();
+  executor_->Post([keep_alive, this]() {
+    close();
+  });
+}
+
+// Handler bound to the per-request deadline timer armed in AsyncWrite().
+//
+// ec:  result delivered by the timer wait; when it is set the handler does
+//      nothing.
+// req: the request the timer was armed for; it is failed with
+//      err_code::kErrNetWorkTimeout through requestCallback().
+void ClientConnection::requestTimeoutHandle(const std::error_code& ec, RequestContextPtr req) {
+  if (!ec) {
+    ErrorCode timeout_error(err_code::kErrNetWorkTimeout, "Request is timeout");
+    requestCallback(req->request_id_, &timeout_error);
+  }
+}
+
+// Tears the connection down once; does nothing when IsStop() already holds.
+//
+// In order: marks the connection kDisconnected, logs, closes the socket,
+// invokes notifier_ (when set) with |err|, and finally fails every pending
+// request through releaseAllRequest().
+//
+// err: the error that caused the close, or nullptr.
+void ClientConnection::close(const std::error_code* err) {
+  if (!IsStop()) {
+    status_ = kDisconnected;
+    LOG_INFO("%scloseed", ToString().c_str());
+    socket_->close();
+    if (notifier_ != nullptr) {
+      notifier_(err);
+    }
+    releaseAllRequest(err);
+  }
+}
+
+// Fails every request still tracked in request_list_ and resets the
+// connection's transfer state (request_list_, write_queue_, recv_buffer_).
+//
+// err: optional error describing why the connection went away; its value,
+//      message and category name are appended to the failure text.
+//
+// Every pending promise is failed with err_code::kErrNetworkError.
+void ClientConnection::releaseAllRequest(const std::error_code* err) {
+  std::string msg = "connect close ";
+  if (err != nullptr) {
+    msg += "error_code, value:";
+    msg += std::to_string(err->value());
+    msg += ", msg:";
+    msg += err->message();
+    msg += ", category:";
+    msg += err->category().name();
+  }
+
+  auto terr = ErrorCode(err_code::kErrNetworkError, msg);
+  for (auto& it : request_list_) {
+    it.second.req_->promise_.SetFailed(terr);
+    // AsyncWrite() only creates the deadline timer when timeout_ > 0, so a
+    // request without a timeout has no timer to cancel.
+    if (it.second.deadline_ != nullptr) {
+      it.second.deadline_->cancel();
+    }
+  }
+  request_list_.clear();
+  write_queue_.clear();
+  recv_buffer_->Reset();
+}
+
+// Starts an asynchronous connect to |endpoints|, guarded by a deadline of
+// kConnnectMaxTimeMs milliseconds (checkDeadline() fires when it expires).
+//
+// On success the connection becomes kConnected, TCP_NODELAY is enabled, the
+// endpoint pair is appended to the context string, and the write and read
+// loops are started. On failure the error is logged and the connection is
+// closed with that error.
+//
+// NOTE(review): both handlers capture a raw `this`; nothing in this function
+// keeps the object alive until they run. This function is called from the
+// constructor - confirm the owner outlives the pending operations.
+void ClientConnection::connect(const asio::ip::tcp::resolver::results_type& endpoints) {
+  if (IsStop()) {
+    return;
+  }
+  status_ = kConnecting;
+  deadline_->expires_after(std::chrono::milliseconds(kConnnectMaxTimeMs));
+  deadline_->async_wait(std::bind(&ClientConnection::checkDeadline, this, std::placeholders::_1));
+  asio::async_connect(
+      *socket_, endpoints, [this](std::error_code ec, asio::ip::tcp::endpoint endpoint) {
+        deadline_->cancel();
+        if (ec) {
+          // status_ is left for close() to update. close() returns early when
+          // IsStop() is true, so marking the connection disconnected here
+          // first could turn close(&ec) into a no-op and skip closing the
+          // socket, notifying notifier_ and failing the queued requests.
+          // NOTE(review): assumes IsStop() reflects status_ == kDisconnected -
+          // confirm against connection.h.
+          LOG_ERROR("%s[%s:%d]async connect error:%d, %s, %s", ToString().c_str(), ip_.c_str(),
+                    port_, ec.value(), ec.message().c_str(), ec.category().name());
+          close(&ec);
+          return;
+        }
+        status_ = kConnected;
+        socket_->set_option(asio::ip::tcp::no_delay(true));
+        // socket_->set_option(asio::ip::tcp::socket::reuse_address(true));
+        contextString();
+        LOG_INFO("%sis connected", ToString().c_str());
+
+        // Flush anything queued while connecting, then start receiving.
+        asyncWrite();
+        asyncRead();
+      });
+}
+
+// Handler for the connect deadline timer armed in connect().
+// Does nothing when |ec| is set or when the connection is already stopped;
+// otherwise logs the timeout and closes the connection.
+void ClientConnection::checkDeadline(const std::error_code& ec) {
+  if (ec || IsStop()) {
+    return;
+  }
+  LOG_ERROR("%s connect timeout", ToString().c_str());
+  close();
+}
+
+// Appends "[<local endpoint> -> <remote endpoint>] " for the current socket
+// to context_string_.
+void ClientConnection::contextString() {
+  std::stringstream endpoints;
+  endpoints << "[" << socket_->local_endpoint();
+  endpoints << " -> " << socket_->remote_endpoint() << "] ";
+  context_string_ += endpoints.str();
+}
+
+// Issues one asynchronous read into recv_buffer_ and re-arms itself from the
+// completion handler, forming the receive loop.
+//
+// The loop ends when the connection is stopped. The connection is closed when
+// the buffer capacity exceeds rpc_config::kRpcRecvBufferMaxBytes, when the
+// read reports an error, or when it delivers zero bytes.
+void ClientConnection::asyncRead() {
+  if (IsStop()) {
+    return;
+  }
+
+  const bool over_limit = recv_buffer_->capacity() > rpc_config::kRpcRecvBufferMaxBytes;
+  if (over_limit) {
+    LOG_ERROR("%sbuffer capacity over config:%d", ToString().c_str(),
+              rpc_config::kRpcRecvBufferMaxBytes);
+    close();
+    return;
+  }
+
+  // Make room for the next chunk before handing the buffer to the socket.
+  recv_buffer_->EnsureWritableBytes(rpc_config::kRpcEnsureWriteableBytes);
+  auto keep_alive = shared_from_this();
+  auto read_target = asio::buffer(recv_buffer_->WriteBegin(), recv_buffer_->WritableBytes());
+  socket_->async_read_some(read_target, [keep_alive, this](std::error_code ec, std::size_t len) {
+    if (ec) {
+      LOG_ERROR("[%s]async read error:%d, %s, %s", ToString().c_str(), ec.value(),
+                ec.message().c_str(), ec.category().name());
+      close(&ec);
+      return;
+    }
+    if (len == 0) {
+      LOG_ERROR("[%s]async read zero", ToString().c_str());
+      close(&ec);
+      return;
+    }
+
+    recv_time_ = std::time(nullptr);
+    recv_buffer_->WriteBytes(len);  // commit the bytes that were just received
+    checkPackageDone();
+    LOG_TRACE("[%s]async read done, len:%ld, package_length_:%ld, recvbuffer:%s",
+              ToString().c_str(), len, package_length_, recv_buffer_->String().c_str());
+    asyncRead();
+  });
+}
+
+// Runs the codec check callback (check_) over the received bytes and, when it
+// reports a complete package, dispatches it to the matching request.
+//
+// check_ result handling:
+//   < 0  the connection is closed;
+//   == 0 the package length reported by check_ is remembered in
+//        package_length_ and nothing is dispatched;
+//   > 0  that many bytes are skipped in recv_buffer_ and the package is
+//        delivered through requestCallback().
+//
+// When check_ reports no request id, the package is delivered to whichever
+// entry request_list_.begin() yields.
+// At most one package is handled per call.
+void ClientConnection::checkPackageDone() {
+  if (check_ == nullptr) {
+    recv_buffer_->Reset();
+    LOG_ERROR("check codec func not set");
+    return;
+  }
+  // A previous check already told us how many bytes are needed; wait for them.
+  if (recv_buffer_->length() < package_length_) {
+    return;
+  }
+
+  Any decoded;
+  uint32_t request_id = 0;
+  bool has_request_id = false;
+  size_t expected_length = 0;
+  auto view = recv_buffer_->Slice();
+  auto result = check_(view, decoded, request_id, has_request_id, expected_length);
+
+  if (result == 0) {
+    package_length_ = expected_length;
+    return;
+  }
+  if (result < 0) {
+    package_length_ = 0;
+    LOG_ERROR("%s, check codec package result:%d", ToString().c_str(), result);
+    close();
+    return;
+  }
+
+  ++read_package_number_;
+  package_length_ = 0;
+  recv_buffer_->Skip(result);
+
+  uint32_t target_id = request_id;
+  if (!has_request_id) {
+    auto first_pending = request_list_.begin();
+    if (first_pending == request_list_.end()) {
+      LOG_ERROR("%s, not find request", ToString().c_str());
+      return;
+    }
+    target_id = first_pending->first;
+  }
+  requestCallback(target_id, nullptr, &decoded);
+}
+
+// Completes the pending request |request_id| and removes it from
+// request_list_. An unknown id is logged and ignored.
+//
+// request_id: id of the request to complete.
+// err:        when non-null, the request's promise is failed with *err.
+// check_out:  output of the codec check. If it holds a BufferPtr, the buffer
+//             is decoded with the request's codec; otherwise the value itself
+//             becomes the response. When both err and check_out are null the
+//             promise is failed with err_code::kErrNetworkError.
+void ClientConnection::requestCallback(uint32_t request_id, ErrorCode* err, Any* check_out) {
+  auto it = request_list_.find(request_id);
+  if (it == request_list_.end()) {
+    LOG_INFO("%srequest[%d] not find from request_list_", ToString().c_str(), request_id);
+    return;
+  }
+  auto req = &it->second;
+  // AsyncWrite() only creates the deadline timer when timeout_ > 0, so a
+  // request without a timeout has no timer to cancel.
+  if (req->deadline_ != nullptr) {
+    req->deadline_->cancel();
+  }
+  if (err != nullptr) {
+    LOG_ERROR("%srequest[%d] error:%d, msg:%s", ToString().c_str(), request_id, err->Value(),
+              err->Message().c_str());
+    req->req_->promise_.SetFailed(*err);
+    request_list_.erase(it);
+    return;
+  }
+  if (check_out != nullptr) {
+    ResponseContext rsp;
+    BufferPtr* buff = any_cast<BufferPtr>(check_out);
+    if (buff != nullptr) {
+      req->req_->codec_->Decode(*buff, rsp.rsp_);
+    } else {
+      rsp.rsp_ = *check_out;
+    }
+    req->req_->promise_.SetValue(rsp);
+  } else {
+    req->req_->promise_.SetFailed(ErrorCode(err_code::kErrNetworkError, "response is null"));
+  }
+  request_list_.erase(it);
+}
+
+// Pops ids off the front of write_queue_ until one is found that is still
+// present in request_list_, and returns a pointer to that entry.
+// Ids with no entry in request_list_ are discarded.
+// Returns nullptr when the queue runs empty.
+TransportRequest* ClientConnection::nextTransportRequest() {
+  while (!write_queue_.empty()) {
+    const uint32_t candidate_id = write_queue_.front();
+    write_queue_.pop_front();
+    auto found = request_list_.find(candidate_id);
+    if (found != request_list_.end()) {
+      return &found->second;
+    }
+  }
+  return nullptr;
+}
+
+// Sends the next queued request and re-arms itself from the completion
+// handler, forming the write loop.
+//
+// Does nothing when the connection is stopped or no sendable request is
+// queued. A write error closes the connection with that error.
+void ClientConnection::asyncWrite() {
+  if (IsStop()) {
+    return;
+  }
+  TransportRequest* next = nextTransportRequest();
+  if (next == nullptr) {
+    return;
+  }
+
+  auto keep_alive = shared_from_this();
+  RequestContextPtr& request = next->req_;
+  // The handler captures a copy of the request pointer, which it uses for the
+  // trace log.
+  asio::async_write(
+      *socket_, asio::buffer(request->buf_->data(), request->buf_->length()),
+      [keep_alive, this, request](std::error_code ec, std::size_t /*bytes_written*/) {
+        if (ec) {
+          close(&ec);
+          LOG_ERROR("[%s]async write error:%d, message:%s, category:%s", ToString().c_str(),
+                    ec.value(), ec.message().c_str(), ec.category().name());
+          return;
+        }
+        ++write_package_number_;
+        LOG_TRACE("[%s]async write done, request_id:%d", ToString().c_str(),
+                  request->request_id_);
+        asyncWrite();
+      });
+}
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
new file mode 100644
index 0000000..b6c7a36
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_connection.h
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_CLIENT_CONNECTION_
+#define _TUBEMQ_CLIENT_CONNECTION_
+
+#include <stdlib.h>
+
+#include <chrono>
+#include <deque>
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <list>
+#include <memory>
+#include <sstream>
+#include <string>
+#include <unordered_map>
+
+#include "asio.hpp"
+#include "connection.h"
+#include "const_rpc.h"
+#include "executor_pool.h"
+#include "logger.h"
+#include "noncopyable.h"
+#include "transport.h"
+
+namespace tubemq {
+
+struct TransportRequest {  // bookkeeping entry stored per request id in ClientConnection::request_list_
+ RequestContextPtr req_;  // the request passed to ClientConnection::AsyncWrite()
+ SteadyTimerPtr deadline_;  // per-request timeout timer; only created when req_->timeout_ > 0
+};
+
+class ClientConnection : public Connection, public std::enable_shared_from_this<ClientConnection> {
+ public:
+ // executor: ExecutorPool.Get(); it supplies the socket, the timers and the resolver used here.
+ // The constructor resolves ip/port via executor->CreateTcpResolver() and starts connect() at once.
+ ClientConnection(ExecutorPtr& executor, const std::string& ip, uint16_t port)
+ : Connection(ip, port),
+ executor_(executor),
+ socket_(std::move(executor->CreateTcpSocket())),
+ recv_buffer_(std::make_shared<Buffer>(rpc_config::kRpcConnectInitBufferSize)),
+ deadline_(std::move(executor->CreateSteadyTimer())) {
+ auto endpoints = executor_->CreateTcpResolver()->resolve(ip_, std::to_string(port_));
+ connect(endpoints);
+ }
+
+ virtual ~ClientConnection() {}
+
+ virtual void AsyncWrite(RequestContextPtr& req);  // queue req for sending; work is posted to executor_
+
+ virtual void Close();  // post a close() of this connection to executor_
+
+ private:
+ void requestTimeoutHandle(const std::error_code& ec, RequestContextPtr req);  // per-request timer handler
+
+ void close(const std::error_code* err = nullptr);  // no-op when IsStop(); else close socket, notify, fail requests
+
+ void releaseAllRequest(const std::error_code* err = nullptr);  // fail all pending requests, clear queues/buffer
+ void connect(const asio::ip::tcp::resolver::results_type& endpoints);  // async connect with deadline_
+ void checkDeadline(const std::error_code& ec);  // connect deadline handler: logs and closes
+ void contextString();  // append "[local -> remote] " to context_string_
+ void asyncRead();  // receive loop: read into recv_buffer_, then checkPackageDone()
+ void checkPackageDone();  // run check_ on received bytes and dispatch a complete package
+ void requestCallback(uint32_t request_id, ErrorCode* err = nullptr, Any* check_out = nullptr);  // complete one request
+ TransportRequest* nextTransportRequest();  // pop write_queue_ until an id still in request_list_; else nullptr
+ void asyncWrite();  // write loop: send the next queued request
+
+ private:
+ using BufferQueue = std::deque<uint32_t>;  // holds request ids, not buffers
+ static const uint32_t kConnnectMaxTimeMs{1000 * 20}; // ms; connect deadline used by connect()
+ static const uint32_t kReadMaxTimeMs{1000 * 30}; // ms; NOTE(review): not referenced in client_connection.cc in this commit
+ ExecutorPtr executor_;  // executor that tasks are posted to and that creates sockets/timers
+ TcpSocketPtr socket_;  // socket used for connect, read and write
+ BufferPtr recv_buffer_;  // accumulates received bytes until check_ reports a full package
+ BufferQueue write_queue_;  // ids of requests waiting to be written (push_back / pop_front)
+ std::unordered_map<uint32_t, TransportRequest> request_list_; // requestid->request context
+ SteadyTimerPtr deadline_;  // connect timeout timer (see connect()/checkDeadline())
+};
+using ClientConnectionPtr = std::shared_ptr<ClientConnection>;
+
+} // namespace tubemq
+
+#endif // _TUBEMQ_CLIENT_CONNECTION_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
index c6e0aaa..11f5b42 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_service.h
@@ -1,296 +1,121 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-#include "client_service.h"
-
-#include <sstream>
-
-#include "const_config.h"
-#include "logger.h"
-#include "utils.h"
-
-namespace tubemq {
-
-using std::lock_guard;
-using std::stringstream;
-
-BaseClient::BaseClient(bool is_producer) {
- is_producer_ = is_producer;
- client_index_ = tb_config::kInvalidValue;
-}
-
-BaseClient::~BaseClient() {
- // no code
-}
-
-TubeMQService* TubeMQService::_instance = NULL;
-
-static mutex tubemq_mutex_service_;
-
-TubeMQService* TubeMQService::Instance() {
- if (NULL == _instance) {
- lock_guard<mutex> lck(tubemq_mutex_service_);
- if (NULL == _instance) {
- _instance = new TubeMQService;
- }
- }
- return _instance;
-}
-
-TubeMQService::TubeMQService()
- : timer_executor_(std::make_shared<ExecutorPool>(2)),
- network_executor_(std::make_shared<ExecutorPool>(4)) {
- service_status_.Set(0);
- client_index_base_.Set(0);
- last_check_time_ = 0;
-}
-
-TubeMQService::~TubeMQService() {
- string err_info;
- Stop(err_info);
-}
-
-bool TubeMQService::Start(string& err_info, string conf_file) {
- // check configure file
- bool result = false;
- Fileini fileini;
- string sector = "TubeMQ";
-
- result = Utils::ValidConfigFile(err_info, conf_file);
- if (!result) {
- return result;
- }
- result = fileini.Loadini(err_info, conf_file);
- if (!result) {
- return result;
- }
- result = Utils::GetLocalIPV4Address(err_info, local_host_);
- if (!result) {
- return result;
- }
- if (!service_status_.CompareAndSet(0, 1)) {
- err_info = "TubeMQ Service has startted or Stopped!";
- return false;
- }
- iniLogger(fileini, sector);
- iniPoolThreads(fileini, sector);
- iniXfsThread(fileini, sector);
- service_status_.Set(2);
- err_info = "Ok!";
- LOG_INFO("[TubeMQService] TubeMQ service startted!");
-
- return true;
-}
-
-bool TubeMQService::Stop(string& err_info) {
- if (service_status_.CompareAndSet(2, -1)) {
- LOG_INFO("[TubeMQService] TubeMQ service begin to stop!");
- if (dns_xfs_thread_.joinable()) {
- dns_xfs_thread_.join();
- }
- shutDownClinets();
- timer_executor_->Close();
- network_executor_->Close();
- connection_pool_ = nullptr;
- thread_pool_ = nullptr;
- LOG_INFO("[TubeMQService] TubeMQ service stopped!");
- }
- err_info = "OK!";
- return true;
-}
-
-bool TubeMQService::IsRunning() { return (service_status_.Get() == 2); }
-
-void TubeMQService::iniLogger(const Fileini& fileini, const string& sector) {
- string err_info;
- int32_t log_num = 10;
- int32_t log_size = 10;
- int32_t log_level = 4;
- string log_path = "../log/tubemq";
- fileini.GetValue(err_info, sector, "log_num", log_num, 10);
- fileini.GetValue(err_info, sector, "log_size", log_size, 100);
- fileini.GetValue(err_info, sector, "log_path", log_path, "../log/tubemq");
- fileini.GetValue(err_info, sector, "log_level", log_level, 4);
- log_level = TUBEMQ_MID(log_level, 4, 0);
- GetLogger().Init(log_path, Logger::Level(log_level), log_size, log_num);
-}
-
-void TubeMQService::iniXfsThread(const Fileini& fileini, const string& sector) {
- string err_info;
- int32_t dns_xfs_period_ms = 30 * 1000;
- fileini.GetValue(err_info, sector, "dns_xfs_period_ms", dns_xfs_period_ms, 30 * 1000);
- TUBEMQ_MID(dns_xfs_period_ms, tb_config::kMaxIntValue, 10000);
- dns_xfs_thread_ = std::thread(&TubeMQService::thread_task_dnsxfs, this, dns_xfs_period_ms);
-}
-
-void TubeMQService::iniPoolThreads(const Fileini& fileini, const string& sector) {
- string err_info;
- int32_t timer_threads = 2;
- int32_t network_threads = 4;
- int32_t signal_threads = 8;
- fileini.GetValue(err_info, sector, "timer_threads", timer_threads, 2);
- TUBEMQ_MID(timer_threads, 50, 2);
- fileini.GetValue(err_info, sector, "network_threads", network_threads, 4);
- TUBEMQ_MID(network_threads, 50, 4);
- fileini.GetValue(err_info, sector, "signal_threads", signal_threads, 8);
- TUBEMQ_MID(signal_threads, 50, 4);
- timer_executor_->Resize(timer_threads);
- network_executor_->Resize(network_threads);
- thread_pool_ = std::make_shared<ThreadPool>(signal_threads);
- connection_pool_ = std::make_shared<ConnectionPool>(network_executor_);
-}
-
-int32_t TubeMQService::GetClientObjCnt() {
- lock_guard<mutex> lck(mutex_);
- return clients_map_.size();
-}
-
-bool TubeMQService::AddClientObj(string& err_info, BaseClient* client_obj) {
- if (!IsRunning()) {
- err_info = "Service not startted!";
- return false;
- }
- int32_t client_index = client_index_base_.IncrementAndGet();
- lock_guard<mutex> lck(mutex_);
- clients_map_[client_index] = client_obj;
- client_obj->SetClientIndex(client_index);
- err_info = "Ok";
- return true;
-}
-
-BaseClient* TubeMQService::GetClientObj(int32_t client_index) const {
- BaseClient* client_obj = NULL;
- map<int32_t, BaseClient*>::const_iterator it;
-
- lock_guard<mutex> lck(mutex_);
- it = clients_map_.find(client_index);
- if (it != clients_map_.end()) {
- client_obj = it->second;
- }
- return client_obj;
-}
-
-void TubeMQService::RmvClientObj(BaseClient* client_obj) {
- map<int32_t, BaseClient*>::iterator it;
- if (client_obj != NULL) {
- lock_guard<mutex> lck(mutex_);
- int32_t client_index = client_obj->GetClientIndex();
- clients_map_.erase(client_index);
- client_obj->SetClientIndex(tb_config::kInvalidValue);
- }
-}
-
-void TubeMQService::shutDownClinets() const {
- map<int32_t, BaseClient*>::const_iterator it;
- lock_guard<mutex> lck(mutex_);
- for (it = clients_map_.begin(); it != clients_map_.end(); it++) {
- it->second->ShutDown();
- }
-}
-
-bool TubeMQService::AddMasterAddress(string& err_info, const string& master_info) {
- map<string, int32_t>::iterator it;
- map<string, int32_t> tmp_addr_map;
- Utils::Split(master_info, tmp_addr_map, delimiter::kDelimiterComma, delimiter::kDelimiterColon);
- if (tmp_addr_map.empty()) {
- err_info = "Illegal parameter: master_info is blank!";
- return false;
- }
- for (it = tmp_addr_map.begin(); it != tmp_addr_map.end();) {
- if (!Utils::NeedDnsXfs(it->first)) {
- tmp_addr_map.erase(it++);
- }
- }
- if (tmp_addr_map.empty()) {
- err_info = "Ok";
- return true;
- }
- if (addNeedDnsXfsAddr(tmp_addr_map)) {
- updMasterAddrByDns();
- }
- err_info = "Ok";
- return true;
-}
-
-void TubeMQService::GetXfsMasterAddress(const string& source, string& target) {
- target = source;
- lock_guard<mutex> lck(mutex_);
- if (master_source_.find(source) != master_source_.end()) {
- target = master_target_[source];
- }
-}
-
-void TubeMQService::thread_task_dnsxfs(int dns_xfs_period_ms) {
- LOG_INFO("[TubeMQService] DSN transfer thread startted!");
- while (true) {
- if (TubeMQService::Instance()->GetServiceStatus() <= 0) {
- break;
- }
- if ((Utils::GetCurrentTimeMillis() - last_check_time_) >= dns_xfs_period_ms) {
- TubeMQService::Instance()->updMasterAddrByDns();
- last_check_time_ = Utils::GetCurrentTimeMillis();
- }
- std::this_thread::sleep_for(std::chrono::milliseconds(500));
- }
- LOG_INFO("[TubeMQService] DSN transfer thread stopped!");
-}
-
-bool TubeMQService::hasXfsTask(map<string, int32_t>& src_addr_map) {
- lock_guard<mutex> lck(mutex_);
- if (!master_source_.empty()) {
- src_addr_map = master_source_;
- return true;
- }
- return false;
-}
-
-bool TubeMQService::addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map) {
- bool added = false;
- map<string, int32_t>::iterator it;
- if (!src_addr_map.empty()) {
- lock_guard<mutex> lck(mutex_);
- for (it = src_addr_map.begin(); it != src_addr_map.end(); it++) {
- if (master_source_.find(it->first) == master_source_.end()) {
- added = true;
- master_source_[it->first] = it->second;
- }
- }
- }
- return added;
-}
-
-void TubeMQService::updMasterAddrByDns() {
- map<string, int32_t> tmp_src_addr_map;
- map<string, string> tmp_tgt_addr_map;
- map<string, int32_t>::iterator it;
- if (!hasXfsTask(tmp_src_addr_map)) {
- return;
- }
- Utils::XfsAddrByDns(tmp_src_addr_map, tmp_tgt_addr_map);
- lock_guard<mutex> lck(mutex_);
- if (tmp_tgt_addr_map.empty()) {
- for (it = tmp_src_addr_map.begin(); it != tmp_src_addr_map.end(); it++) {
- master_target_[it->first] = it->first;
- }
- } else {
- master_target_ = tmp_tgt_addr_map;
- }
-}
-
-} // namespace tubemq
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+#ifndef TUBEMQ_CLIENT_BASE_CLIENT_H_
+#define TUBEMQ_CLIENT_BASE_CLIENT_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <mutex>
+#include <string>
+#include <thread>
+
+#include "connection_pool.h"
+#include "file_ini.h"
+#include "noncopyable.h"
+#include "rmt_data_cache.h"
+#include "thread_pool.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_message.h"
+#include "tubemq/tubemq_return.h"
+
+namespace tubemq {
+
+using std::map;
+using std::mutex;
+using std::string;
+using std::thread;
+
+// Base type for the client objects that TubeMQService stores as BaseClient*.
+//
+// Carries a producer/consumer flag and the index stored by SetClientIndex().
+// The constructor and destructor are defined out of line; ShutDown() is a
+// no-op hook that derived classes may override.
+class BaseClient {
+ public:
+  explicit BaseClient(bool is_producer);
+  virtual ~BaseClient();
+  // Shutdown hook; the base implementation intentionally does nothing.
+  virtual void ShutDown() {}
+  // Stores the index under which this client is identified.
+  void SetClientIndex(int32_t client_index) { client_index_ = client_index; }
+  // Read-only accessors are const-qualified so they can be used through a
+  // const BaseClient reference or pointer.
+  bool IsProducer() const { return is_producer_; }
+  // Returned by value: a top-level const on the return type would be ignored
+  // by the language, so it is not spelled out.
+  int32_t GetClientIndex() const { return client_index_; }
+
+ protected:
+  bool is_producer_;
+  int32_t client_index_;
+};
+
+// Service object reached through the static Instance() accessor; the
+// constructor and destructor are private. It holds the executor pools, the
+// connection pool and the thread pool returned by the getters below, plus a
+// map of registered BaseClient pointers keyed by int32_t.
+// NOTE(review): every member without an inline body is defined in another
+// file; the comments on those describe only what the declarations show.
+class TubeMQService : public noncopyable {
+ public:
+ static TubeMQService* Instance();
+ // conf_file defaults to "../conf/tubemqclient.conf"; err_info is an
+ // output parameter.
+ bool Start(string& err_info, string conf_file = "../conf/tubemqclient.conf");
+ bool Stop(string& err_info);
+ bool IsRunning();
+ // Returns the current value of the service_status_ counter.
+ const int32_t GetServiceStatus() const { return service_status_.Get(); }
+ int32_t GetClientObjCnt();
+ // Client registry operations; presumably backed by clients_map_ — confirm
+ // in the implementation file.
+ bool AddClientObj(string& err_info, BaseClient* client_obj);
+ BaseClient* GetClientObj(int32_t client_index) const;
+ void RmvClientObj(BaseClient* client_obj);
+ const string& GetLocalHost() const { return local_host_; }
+ ExecutorPoolPtr GetTimerExecutorPool() { return timer_executor_; }
+ // Creates a steady timer on an executor taken from the timer pool.
+ // Dereferences timer_executor_ without a null check.
+ SteadyTimerPtr CreateTimer() { return timer_executor_->Get()->CreateSteadyTimer(); }
+ ExecutorPoolPtr GetNetWorkExecutorPool() { return network_executor_; }
+ ConnectionPoolPtr GetConnectionPool() { return connection_pool_; }
+ // Hands the callable f to the thread pool. The call is silently dropped
+ // when thread_pool_ is null.
+ template <class function>
+ void Post(function f) {
+ if (thread_pool_ != nullptr) {
+ thread_pool_->Post(f);
+ }
+ }
+ bool AddMasterAddress(string& err_info, const string& master_info);
+ // `target` is an output parameter.
+ void GetXfsMasterAddress(const string& source, string& target);
+
+ protected:
+ void updMasterAddrByDns();
+
+ private:
+ TubeMQService();
+ ~TubeMQService();
+ // Initialisation helpers; each takes the parsed ini file and the name of
+ // the sector to use.
+ void iniLogger(const Fileini& fileini, const string& sector);
+ void iniPoolThreads(const Fileini& fileini, const string& sector);
+ void iniXfsThread(const Fileini& fileini, const string& sector);
+ // NOTE(review): presumably the body run by dns_xfs_thread_ — confirm.
+ void thread_task_dnsxfs(int dns_xfs_period_ms);
+ void shutDownClinets() const;
+ bool hasXfsTask(map<string, int32_t>& src_addr_map);
+ bool addNeedDnsXfsAddr(map<string, int32_t>& src_addr_map);
+
+ private:
+ static TubeMQService* _instance;
+ string local_host_;
+ AtomicInteger service_status_;
+ AtomicInteger client_index_base_;
+ // Declared mutable so it can be locked from const member functions.
+ mutable mutex mutex_;
+ map<int32_t, BaseClient*> clients_map_;
+ ExecutorPoolPtr timer_executor_;
+ ExecutorPoolPtr network_executor_;
+ ConnectionPoolPtr connection_pool_;
+ std::shared_ptr<ThreadPool> thread_pool_;
+ thread dns_xfs_thread_;
+ mutable mutex dns_mutex_;
+ int64_t last_check_time_;
+ // NOTE(review): presumably the master addresses awaiting DNS translation
+ // and their translated results — confirm against the implementation.
+ map<string, int32_t> master_source_;
+ map<string, string> master_target_;
+};
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_BASE_CLIENT_H_
\ No newline at end of file
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
index a6c0fc1..8df82fd 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.cc
@@ -17,56 +17,15 @@
* under the License.
*/
-#include "tubemq/client_subinfo.h"
-#include "tubemq/const_config.h"
-#include "tubemq/utils.h"
+#include "client_subinfo.h"
+#include "const_config.h"
+#include "utils.h"
-namespace tubemq {
-
-
-MasterAddrInfo::MasterAddrInfo() {
- curr_master_addr_ = "";
- master_addr_.clear();
-}
-
-bool MasterAddrInfo::InitMasterAddress(string& err_info,
- const string& master_info) {
- master_addr_.clear();
- Utils::Split(master_info, master_addr_, delimiter::kDelimiterComma,
- delimiter::kDelimiterColon);
- if (master_addr_.empty()) {
- err_info = "Illegal parameter: master_info is blank!";
- return false;
- }
-
- map<string, int32_t>::iterator it = master_addr_.begin();
- curr_master_addr_ = it->first;
- err_info = "Ok";
- return true;
-}
-void MasterAddrInfo::GetNextMasterAddr(string& ipaddr, int32_t& port) {
- map<string, int32_t>::iterator it;
- it = master_addr_.find(curr_master_addr_);
- if(it != master_addr_.end()) {
- it++;
- if (it == master_addr_.end()) {
- it = master_addr_.begin();
- }
- } else {
- it = master_addr_.begin();
- }
- port = it->second;
- ipaddr = it->first;
- curr_master_addr_ = it->first;
-}
+namespace tubemq {
-void MasterAddrInfo::GetCurrentMasterAddr(string& ipaddr, int32_t& port) {
- ipaddr = curr_master_addr_;
- port = master_addr_[curr_master_addr_];
-}
ClientSubInfo::ClientSubInfo() {
@@ -154,13 +113,15 @@ bool ClientSubInfo::IsFilterConsume(const string& topic) {
void ClientSubInfo::GetAssignedPartOffset(const string& partition_key, int64_t& offset) {
map<string, int64_t>::iterator it;
- if (first_registered_.Get() && bound_consume_ && not_allocated_.Get()) {
+ offset = tb_config::kInvalidValue;
+ if (!first_registered_.Get()
+ && bound_consume_
+ && not_allocated_.Get()) {
it = assigned_part_map_.find(partition_key);
if (it != assigned_part_map_.end()) {
offset = it->second;
}
}
- offset = tb_config::kInvalidValue;
}
const map<string, set<string> >& ClientSubInfo::GetTopicFilterMap() const {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.h b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.h
new file mode 100644
index 0000000..1b1e7a1
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/client_subinfo.h
@@ -0,0 +1,79 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_SUBINFO_H_
+#define TUBEMQ_CLIENT_SUBINFO_H_
+
+#include <stdint.h>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <string>
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_config.h"
+
+namespace tubemq {
+
+using std::list;
+using std::map;
+using std::set;
+using std::string;
+
+
+// Subscription state of a consumer: the subscribed topics and their filter
+// conditions, plus the optional bound-consume settings (session key, source
+// count, select-big flag and per-partition assigned offsets).
+// Members without an inline body are defined in client_subinfo.cc.
+class ClientSubInfo {
+ public:
+  ClientSubInfo();
+  void SetConsumeTarget(const ConsumerConfig& config);
+  bool CompAndSetNotAllocated(bool expect, bool update);
+  // Marks the first registration as done.
+  void BookFstRegistered() { first_registered_.Set(true); }
+  bool IsBoundConsume() const { return bound_consume_; }
+  bool IsNotAllocated() const { return not_allocated_.Get(); }
+  // Scalar getters return plain values; a top-level const on a by-value
+  // return type is ignored by the language, so it is not spelled out.
+  int64_t GetSubscribedTime() const { return subscribed_time_; }
+  const string& GetSessionKey() const { return session_key_; }
+  uint32_t GetSourceCnt() const { return source_count_; }
+  // Const-qualified like the other trivial getters so it can be called
+  // through a const ClientSubInfo.
+  bool SelectBig() const { return select_big_; }
+  bool IsFilterConsume(const string& topic);
+  // Writes the offset assigned to partition_key into `offset`, or
+  // tb_config::kInvalidValue when no assigned offset applies.
+  void GetAssignedPartOffset(const string& partition_key, int64_t& offset);
+  const string& GetBoundPartInfo() const { return bound_partions_; }
+  const list<string>& GetSubTopics() const { return topics_; }
+  const list<string>& GetTopicConds() const { return topic_conds_; }
+  const map<string, set<string> >& GetTopicFilterMap() const;
+
+ private:
+  bool bound_consume_;
+  AtomicBoolean first_registered_;
+  AtomicBoolean not_allocated_;
+  int64_t subscribed_time_;
+  map<string, set<string> > topic_and_filter_map_;
+  list<string> topics_;
+  list<string> topic_conds_;
+  map<string, bool> topic_filter_map_;
+  // bound info
+  string session_key_;
+  uint32_t source_count_;
+  bool select_big_;
+  map<string, int64_t> assigned_part_map_;
+  string bound_partions_;
+};
+
+} // namespace tubemq
+
+
+#endif // TUBEMQ_CLIENT_SUBINFO_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/connection.h b/tubemq-client-twins/tubemq-client-cpp/src/connection.h
index 30db70b..d4423f5 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/connection.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/connection.h
@@ -50,7 +50,6 @@ class Connection : noncopyable {
: connect_id_(unique_id_.Next()),
status_(kConnecting),
recv_time_(std::time(nullptr)),
- package_length_(0),
create_time_(std::time(nullptr)) {
formatContextString();
}
@@ -60,7 +59,6 @@ class Connection : noncopyable {
connect_id_(unique_id_.Next()),
status_(kConnecting),
recv_time_(std::time(nullptr)),
- package_length_(0),
create_time_(std::time(nullptr)) {
formatContextString();
}
@@ -82,7 +80,13 @@ class Connection : noncopyable {
inline std::time_t GetRecvTime() const { return recv_time_; }
- inline const std::string& ToString() const { return context_string_; }
+ inline const std::string ToString() const {
+ std::stringstream stream;
+ stream << "[recvtime:" << recv_time_ << "]"
+ << "[read_package:" << read_package_number_ << "]"
+ << "[write_package:" << write_package_number_ << "]";
+ return context_string_ + stream.str();
+ }
private:
void formatContextString() {
@@ -101,7 +105,9 @@ class Connection : noncopyable {
std::atomic<Status> status_;
std::string context_string_; // for log
std::time_t recv_time_;
- size_t package_length_;
+ size_t package_length_ = 0;
+ size_t read_package_number_ = 0;
+ size_t write_package_number_ = 0;
private:
std::time_t create_time_;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h
index f3ec067..8ca1881 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/connection_pool.h
@@ -40,6 +40,9 @@
#include "transport.h"
namespace tubemq {
+
+using std::string;
+
class ConnectionPool : noncopyable {
public:
explicit ConnectionPool(ExecutorPoolPtr& executor_pool)
@@ -70,8 +73,8 @@ class ConnectionPool : noncopyable {
}
if (it->second->GetRecvTime() + rpc_config::kRpcInvalidConnectOverTime < std::time(nullptr)) {
it->second->Close();
- it = connection_pool_.erase(it);
LOG_ERROR("connection pool clear overtime connect:%s", it->second->ToString().c_str());
+ it = connection_pool_.erase(it);
continue;
}
++it;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/const_config.h b/tubemq-client-twins/tubemq-client-cpp/src/const_config.h
new file mode 100644
index 0000000..d22f7e7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/const_config.h
@@ -0,0 +1,120 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_CONST_CONFIG_H_
+#define TUBEMQ_CLIENT_CONST_CONFIG_H_
+
+#include <stdint.h>
+
+#include <algorithm>
+#include <map>
+#include <string>
+
+namespace tubemq {
+
+using std::string;
+
+// Thin wrappers over std::max / std::min. Both arguments go to the same
+// function template, so they must have the same type.
+#define TUBEMQ_MAX(a, b) std::max((a), (b))
+#define TUBEMQ_MIN(a, b) std::min((a), (b))
+// Evaluates to max(min, min(max, data)), i.e. `data` limited to [min, max].
+// Note the argument order: value first, then the UPPER bound, then the
+// LOWER bound.
+#define TUBEMQ_MID(data, max, min) TUBEMQ_MAX(min, TUBEMQ_MIN((max), (data)))
+
+// configuration value setting
+// Default values and limits for client configuration. Every constant is a
+// namespace-scope `static const`, so each translation unit that includes
+// this header gets its own internal-linkage copy.
+namespace tb_config {
+// rpc timeout define (default / upper bound / lower bound; the Ms suffix
+// indicates milliseconds)
+static const int32_t kRpcTimoutDefMs = 15000;
+static const int32_t kRpcTimoutMaxMs = 300000;
+static const int32_t kRpcTimoutMinMs = 8000;
+
+// heartbeat period define
+static const int32_t kHeartBeatPeriodDefMs = 10000;
+static const int32_t kHeartBeatFailRetryTimesDef = 5;
+static const int32_t kHeartBeatSleepPeriodDefMs = 60000;
+// max masterAddrInfo length
+static const int32_t kMasterAddrInfoMaxLength = 1024;
+
+// max TopicName length
+static const int32_t kTopicNameMaxLength = 64;
+// max Consume GroupName length
+static const int32_t kGroupNameMaxLength = 1024;
+// max filter item length
+static const int32_t kFilterItemMaxLength = 256;
+// max allowed filter item count
+static const int32_t kFilterItemMaxCount = 500;
+// max session key length
+static const int32_t kSessionKeyMaxLength = 1024;
+
+// max subscribe info report times
+static const int32_t kSubInfoReportMaxIntervalTimes = 6;
+// default message not found response wait period
+static const int32_t kMsgNotfoundWaitPeriodMsDef = 400;
+// default confirm wait period if rebalance meeting
+static const int32_t kRebConfirmWaitPeriodMsDef = 3000;
+// max confirm wait period anyway
+static const int32_t kConfirmWaitPeriodMsMax = 60000;
+// default rebalance wait if shutdown meeting
+static const int32_t kRebWaitPeriodWhenShutdownMs = 10000;
+// default partition status check period
+static const int32_t kMaxPartCheckPeriodMsDef = 60 * 1000;
+// default partition status check slice
+static const int32_t kPartCheckSliceMsDef = 300;
+
+// max int value (largest int32_t, 2^31 - 1)
+static const int32_t kMaxIntValue = 0x7fffffff;
+// max long value (largest int64_t, 2^63 - 1)
+static const int64_t kMaxLongValue = 0x7fffffffffffffffL;
+
+// default broker port
+static const uint32_t kBrokerPortDef = 8123;
+// default broker TLS port
+static const uint32_t kBrokerTlsPortDef = 8124;
+
+// invalid value: sentinel stored in fields and output parameters that have
+// no meaningful value
+static const int32_t kInvalidValue = -2;
+
+// message flag's properties settings
+static const int32_t kMsgFlagIncProperties = 0x01;
+
+// reserved property key Filter Item
+static const char kRsvPropKeyFilterItem[] = "$msgType$";
+// reserved property key message send time
+static const char kRsvPropKeyMsgTime[] = "$msgTime$";
+
+} // namespace tb_config
+
+// Separator tokens, each stored as a NUL-terminated char array with
+// internal linkage. All are a single character except kDelimiterDbSlash.
+namespace delimiter {
+static const char kDelimiterDot[] = ".";
+static const char kDelimiterEqual[] = "=";
+static const char kDelimiterAnd[] = "&";
+static const char kDelimiterComma[] = ",";
+static const char kDelimiterColon[] = ":";
+static const char kDelimiterAt[] = "@";
+static const char kDelimiterPound[] = "#";
+static const char kDelimiterSemicolon[] = ";";
+// Double slash
+static const char kDelimiterDbSlash[] = "//";
+// left square bracket
+static const char kDelimiterLftSB[] = "[";
+// right square bracket
+static const char kDelimiterRgtSB[] = "]";
+
+} // namespace delimiter
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_CONST_CONFIG_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/const_rpc.h b/tubemq-client-twins/tubemq-client-cpp/src/const_rpc.h
new file mode 100644
index 0000000..313d536
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/const_rpc.h
@@ -0,0 +1,91 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_CONST_RPC_H_
+#define TUBEMQ_CLIENT_CONST_RPC_H_
+
+// Standard headers must be included outside of any namespace; including
+// <stdint.h> inside `namespace tubemq` could declare the fixed-width types
+// as tubemq::uint32_t etc. when this is the first inclusion in a
+// translation unit.
+#include <stdint.h>
+
+namespace tubemq {
+
+// Wire-protocol constants for the RPC layer: framing limits, service and
+// method identifiers, and consume-status codes.
+namespace rpc_config {
+
+// constant define
+static const uint32_t kRpcPrtBeginToken = 0xFF7FF4FE;
+static const uint32_t kRpcMaxBufferSize = 8192;
+// 8 MiB divided by the frame buffer size, i.e. 1024 frames.
+static const uint32_t kRpcMaxFrameListCnt = (uint32_t)((1024 * 1024 * 8) / kRpcMaxBufferSize);
+
+// rpc protocol version
+static const uint32_t kRpcProtocolVersion = 2;
+
+// rpc network const
+static const uint32_t kRpcConnectInitBufferSize = 64 * 1024;
+static const uint32_t kRpcEnsureWriteableBytes = 16 * 1024;
+// Twice the maximum framed payload plus 1 KiB.
+static constexpr uint32_t kRpcRecvBufferMaxBytes =
+ uint32_t(kRpcMaxFrameListCnt * kRpcMaxBufferSize * 2 + 1024);
+// A pooled connection whose last receive time is older than this is closed.
+static const uint32_t kRpcInvalidConnectOverTime = 60 * 3; // second
+
+// msg type flag
+static const int32_t kRpcFlagMsgRequest = 0x0;
+static const int32_t kRpcFlagMsgResponse = 0x1;
+
+// service type
+static const int32_t kMasterService = 1;
+static const int32_t kBrokerReadService = 2;
+static const int32_t kBrokerWriteService = 3;
+static const int32_t kBrokerAdminService = 4;
+static const int32_t kMasterAdminService = 5;
+
+// request method
+// master rpc method
+static const int32_t kMasterMethoddProducerRegister = 1;
+static const int32_t kMasterMethoddProducerHeatbeat = 2;
+static const int32_t kMasterMethoddProducerClose = 3;
+static const int32_t kMasterMethoddConsumerRegister = 4;
+static const int32_t kMasterMethoddConsumerHeatbeat = 5;
+static const int32_t kMasterMethoddConsumerClose = 6;
+
+// broker rpc method
+static const int32_t kBrokerMethoddProducerRegister = 11;
+static const int32_t kBrokerMethoddProducerHeatbeat = 12;
+static const int32_t kBrokerMethoddProducerSendMsg = 13;
+static const int32_t kBrokerMethoddProducerClose = 14;
+static const int32_t kBrokerMethoddConsumerRegister = 15;
+static const int32_t kBrokerMethoddConsumerHeatbeat = 16;
+static const int32_t kBrokerMethoddConsumerGetMsg = 17;
+static const int32_t kBrokerMethoddConsumerCommit = 18;
+static const int32_t kBrokerMethoddConsumerClose = 19;
+static const int32_t kMethodInvalid = 99999;
+
+// register operate type
+static const int32_t kRegOpTypeRegister = 31;
+static const int32_t kRegOpTypeUnReg = 32;
+
+// rpc connect node timeout
+static const int32_t kRpcConnectTimeoutMs = 3000;
+
+// consume status codes
+static const int32_t kConsumeStatusNormal = 0;
+static const int32_t kConsumeStatusFromMax = 1;
+static const int32_t kConsumeStatusFromMaxAlways = 2;
+
+} // namespace rpc_config
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_CONST_RPC_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
index a2536ae..71a84ac 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
@@ -17,11 +17,13 @@
* under the License.
*/
-#include "tubemq/executor_pool.h"
+#include "executor_pool.h"
-#include <asio.hpp>
#include <functional>
#include <memory>
+#include <asio.hpp>
+
+
namespace tubemq {
@@ -39,19 +41,20 @@ Executor::~Executor() {
void Executor::StartWorker(std::shared_ptr<asio::io_context> io_context) { io_context_->run(); }
-SocketPtr Executor::CreateSocket() { return SocketPtr(new asio::ip::tcp::socket(*io_context_)); }
+TcpSocketPtr Executor::CreateTcpSocket() {
+ return std::make_shared<asio::ip::tcp::socket>(*io_context_);
+}
-TlsSocketPtr Executor::CreateTlsSocket(SocketPtr &socket, asio::ssl::context &ctx) {
- return std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> >(
- new asio::ssl::stream<asio::ip::tcp::socket &>(*socket, ctx));
+TlsSocketPtr Executor::CreateTlsSocket(TcpSocketPtr &socket, asio::ssl::context &ctx) {
+ return std::make_shared<asio::ssl::stream<asio::ip::tcp::socket &>>(*socket, ctx);
}
TcpResolverPtr Executor::CreateTcpResolver() {
- return TcpResolverPtr(new asio::ip::tcp::resolver(*io_context_));
+ return std::make_shared<asio::ip::tcp::resolver>(*io_context_);
}
SteadyTimerPtr Executor::CreateSteadyTimer() {
- return SteadyTimerPtr(new asio::steady_timer(*io_context_));
+ return std::make_shared<asio::steady_timer>(*io_context_);
}
void Executor::Close() {
@@ -61,8 +64,6 @@ void Executor::Close() {
}
}
-void Executor::Post(Executor::func task) { io_context_->post(task); }
-
ExecutorPool::ExecutorPool(int nthreads) : executors_(nthreads), executorIdx_(0), mutex_() {}
ExecutorPtr ExecutorPool::Get() {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.h
new file mode 100644
index 0000000..e858985
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.h
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_EXECUTOR_POOL_
+#define _TUBEMQ_EXECUTOR_POOL_
+
+#include <stdlib.h>
+
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include <asio.hpp>
+#include <asio/ssl.hpp>
+#include "noncopyable.h"
+
+namespace tubemq {
+
+// Shared-ownership handles for the asio objects created by Executor.
+using TcpSocketPtr = std::shared_ptr<asio::ip::tcp::socket>;
+// The TLS stream is instantiated over a socket REFERENCE, so it does not
+// own the underlying TCP socket.
+using TlsSocketPtr = std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> >;
+using TcpResolverPtr = std::shared_ptr<asio::ip::tcp::resolver>;
+using SteadyTimerPtr = std::shared_ptr<asio::steady_timer>;
+
+// Wraps one asio::io_context and acts as a factory for sockets, resolvers
+// and timers bound to it.
+// NOTE(review): presumably worker_ is the thread that runs StartWorker()
+// and work_ keeps the io_context from running out of work — confirm in
+// executor_pool.cc.
+class Executor : noncopyable {
+ public:
+ Executor();
+ ~Executor();
+
+ // Factory helpers; each returns a shared_ptr to a new asio object.
+ TcpSocketPtr CreateTcpSocket();
+ // The returned stream type holds a reference to a TCP socket (see
+ // TlsSocketPtr). Presumably it refers to `socket`, which must then
+ // outlive the stream — confirm.
+ TlsSocketPtr CreateTlsSocket(TcpSocketPtr &socket, asio::ssl::context &ctx);
+ TcpResolverPtr CreateTcpResolver();
+ SteadyTimerPtr CreateSteadyTimer();
+
+ // Queues `task` on this executor's io_context.
+ inline void Post(std::function<void(void)> task) { io_context_->post(task); }
+
+ std::shared_ptr<asio::io_context> GetIoContext() { return io_context_; }
+
+ // Close executor and exit thread
+ void Close();
+
+ private:
+ void StartWorker(std::shared_ptr<asio::io_context> io_context);
+ std::shared_ptr<asio::io_context> io_context_;
+ using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>;
+ io_context_work work_;
+ std::thread worker_;
+};
+
+typedef std::shared_ptr<Executor> ExecutorPtr;
+
+// Pool of Executor instances stored in a vector guarded by mutex_.
+// NOTE(review): Get() presumably uses executorIdx_ to pick the slot to
+// return — confirm in executor_pool.cc.
+class ExecutorPool : noncopyable {
+ public:
+ // nthreads defaults to 2.
+ explicit ExecutorPool(int nthreads = 2);
+
+ ExecutorPtr Get();
+
+ // Resize executor thread
+ // Takes mutex_ and resizes the slot vector: shrinking releases the pool's
+ // references to the trailing executors, growing appends null slots.
+ void Resize(int nthreads) {
+ Lock lock(mutex_);
+ executors_.resize(nthreads);
+ }
+
+ void Close();
+
+ private:
+ typedef std::vector<ExecutorPtr> ExecutorList;
+ ExecutorList executors_;
+ uint32_t executorIdx_;
+ std::mutex mutex_;
+ typedef std::unique_lock<std::mutex> Lock;
+};
+
+typedef std::shared_ptr<ExecutorPool> ExecutorPoolPtr;
+
+} // namespace tubemq
+
+#endif // _TUBEMQ_EXECUTOR_POOL_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
index 9af90fc..09cc76d 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.cc
@@ -17,15 +17,15 @@
* under the License.
*/
-#include "tubemq/file_ini.h"
+#include "file_ini.h"
#include <stdlib.h>
#include <fstream>
#include <sstream>
-#include "tubemq/const_config.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "utils.h"
namespace tubemq {
@@ -34,13 +34,13 @@ using std::ifstream;
Fileini::Fileini() {
- this->init_flag_ = false;
- this->ini_map_.clear();
+ init_flag_ = false;
+ ini_map_.clear();
}
Fileini::~Fileini() {
- this->init_flag_ = false;
- this->ini_map_.clear();
+ init_flag_ = false;
+ ini_map_.clear();
}
bool Fileini::Loadini(string& err_info, const string& file_name) {
@@ -95,8 +95,8 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
continue;
}
map<string, map<string, string> >::iterator it_sec;
- it_sec = this->ini_map_.find(sector);
- if (it_sec == this->ini_map_.end()) {
+ it_sec = ini_map_.find(sector);
+ if (it_sec == ini_map_.end()) {
map<string, string> tmp_key_val_map;
tmp_key_val_map[key] = value;
ini_map_[sector] = tmp_key_val_map;
@@ -108,7 +108,7 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
conf_file.close();
conf_file.clear();
// set parser status
- this->init_flag_ = true;
+ init_flag_ = true;
// end
err_info = "Ok";
return true;
@@ -116,7 +116,7 @@ bool Fileini::Loadini(string& err_info, const string& file_name) {
bool Fileini::GetValue(string& err_info, const string& sector, const string& key, string& value,
const string& def) const {
- if (!this->init_flag_) {
+ if (!init_flag_) {
err_info = "Please load configure file first!";
return false;
}
@@ -125,8 +125,8 @@ bool Fileini::GetValue(string& err_info, const string& sector, const string& key
// search key's value in sector
map<string, map<string, string> >::const_iterator it_sec;
map<string, string>::const_iterator it_keyval;
- it_sec = this->ini_map_.find(sector);
- if (it_sec == this->ini_map_.end()) {
+ it_sec = ini_map_.find(sector);
+ if (it_sec == ini_map_.end()) {
value = def;
return true;
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/file_ini.h b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.h
new file mode 100644
index 0000000..1bede63
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/file_ini.h
@@ -0,0 +1,51 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_FILE_INI_H_
+#define TUBEMQ_CLIENT_FILE_INI_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <string>
+
+namespace tubemq {
+
+using std::map;
+using std::string;
+
+// INI-style configuration reader: Loadini() parses a file into a
+// sector -> (key -> value) map, and GetValue() looks entries up.
+// Every method reports status through the err_info output parameter in
+// addition to its bool return value.
+class Fileini {
+ public:
+ Fileini();
+ ~Fileini();
+ // Parses file_name; on success marks the object as loaded, sets err_info
+ // to "Ok" and returns true.
+ bool Loadini(string& err_info, const string& file_name);
+ // String lookup. Returns false if no file has been loaded yet. If
+ // `sector` is missing, `value` is set to `def` and true is returned.
+ bool GetValue(string& err_info, const string& sector, const string& key,
+ string& value, const string& def) const;
+ // int32_t overload of the lookup above.
+ // NOTE(review): conversion and error behaviour are not visible from this
+ // header — confirm in file_ini.cc.
+ bool GetValue(string& err_info, const string& sector, const string& key,
+ int32_t& value, int32_t def) const;
+
+ private:
+ // True once Loadini() has completed successfully.
+ bool init_flag_;
+ // sector key value
+ map<string, map<string, string> > ini_map_;
+};
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_FILE_INI_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
index ef3357f..2b96f52 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.cc
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "tubemq/flowctrl_def.h"
+#include "flowctrl_def.h"
#include <stdio.h>
#include <time.h>
@@ -25,9 +25,9 @@
#include <sstream>
-#include "tubemq/const_config.h"
-#include "tubemq/logger.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "logger.h"
+#include "utils.h"
namespace tubemq {
@@ -35,172 +35,172 @@ using std::stringstream;
using std::lock_guard;
FlowCtrlResult::FlowCtrlResult() {
- this->datasize_limit_ = tb_config::kMaxIntValue;
- this->freqms_limit_ = 0;
+ datasize_limit_ = tb_config::kMaxIntValue;
+ freqms_limit_ = 0;
}
FlowCtrlResult::FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit) {
- this->datasize_limit_ = datasize_limit;
- this->freqms_limit_ = freqms_limit;
+ datasize_limit_ = datasize_limit;
+ freqms_limit_ = freqms_limit;
}
FlowCtrlResult& FlowCtrlResult::operator=(const FlowCtrlResult& target) {
if (this == &target) return *this;
- this->datasize_limit_ = target.datasize_limit_;
- this->freqms_limit_ = target.freqms_limit_;
+ datasize_limit_ = target.datasize_limit_;
+ freqms_limit_ = target.freqms_limit_;
return *this;
}
void FlowCtrlResult::SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit) {
- this->datasize_limit_ = datasize_limit;
- this->freqms_limit_ = freqms_limit;
+ datasize_limit_ = datasize_limit;
+ freqms_limit_ = freqms_limit;
}
+// Plain accessors for the two limits held by FlowCtrlResult; each one reads
+// or writes a single member with no validation.
void FlowCtrlResult::SetDataSizeLimit(int64_t datasize_limit) {
- this->datasize_limit_ = datasize_limit;
+ datasize_limit_ = datasize_limit;
}
-void FlowCtrlResult::SetFreqMsLimit(int32_t freqms_limit) { this->freqms_limit_ = freqms_limit; }
+void FlowCtrlResult::SetFreqMsLimit(int32_t freqms_limit) { freqms_limit_ = freqms_limit; }
-int64_t FlowCtrlResult::GetDataSizeLimit() { return this->datasize_limit_; }
+int64_t FlowCtrlResult::GetDataSizeLimit() { return datasize_limit_; }
-int32_t FlowCtrlResult::GetFreqMsLimit() { return this->freqms_limit_; }
+int32_t FlowCtrlResult::GetFreqMsLimit() { return freqms_limit_; }
+// Default item: type 0, start time 2500, every other field set to
+// tb_config::kInvalidValue. This is the same state Clear() produces.
FlowCtrlItem::FlowCtrlItem() {
- this->type_ = 0;
- this->start_time_ = 2500;
- this->end_time_ = tb_config::kInvalidValue;
- this->datadlt_m_ = tb_config::kInvalidValue;
- this->datasize_limit_ = tb_config::kInvalidValue;
- this->freqms_limit_ = tb_config::kInvalidValue;
- this->zero_cnt_ = tb_config::kInvalidValue;
+ type_ = 0;
+ start_time_ = 2500;
+ end_time_ = tb_config::kInvalidValue;
+ datadlt_m_ = tb_config::kInvalidValue;
+ datasize_limit_ = tb_config::kInvalidValue;
+ freqms_limit_ = tb_config::kInvalidValue;
+ zero_cnt_ = tb_config::kInvalidValue;
}
+// Builds an item from a type, a zero-count threshold and a frequency limit.
+// The time window, data delta and size limit keep their default values.
FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t zero_cnt, int32_t freqms_limit) {
- this->type_ = type;
- this->start_time_ = 2500;
- this->end_time_ = tb_config::kInvalidValue;
- this->datadlt_m_ = tb_config::kInvalidValue;
- this->datasize_limit_ = tb_config::kInvalidValue;
- this->freqms_limit_ = freqms_limit;
- this->zero_cnt_ = zero_cnt;
+ type_ = type;
+ start_time_ = 2500;
+ end_time_ = tb_config::kInvalidValue;
+ datadlt_m_ = tb_config::kInvalidValue;
+ datasize_limit_ = tb_config::kInvalidValue;
+ freqms_limit_ = freqms_limit;
+ zero_cnt_ = zero_cnt;
}
+// Builds an item from a type, size limit and frequency limit.
+// min_data_filter_freqms is stored in zero_cnt_; the class has no separate
+// member for it. The time window and data delta keep their default values.
FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
int32_t min_data_filter_freqms) {
- this->type_ = type;
- this->start_time_ = 2500;
- this->end_time_ = tb_config::kInvalidValue;
- this->datadlt_m_ = tb_config::kInvalidValue;
- this->datasize_limit_ = datasize_limit;
- this->freqms_limit_ = freqms_limit;
- this->zero_cnt_ = min_data_filter_freqms;
+ type_ = type;
+ start_time_ = 2500;
+ end_time_ = tb_config::kInvalidValue;
+ datadlt_m_ = tb_config::kInvalidValue;
+ datasize_limit_ = datasize_limit;
+ freqms_limit_ = freqms_limit;
+ zero_cnt_ = min_data_filter_freqms;
}
+// Builds an item with an explicit time window [start_time, end_time], data
+// delta threshold, size limit and frequency limit. zero_cnt_ is left at
+// tb_config::kInvalidValue.
FlowCtrlItem::FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, int64_t datadlt_m,
int64_t datasize_limit, int32_t freqms_limit) {
- this->type_ = type;
- this->start_time_ = start_time;
- this->end_time_ = end_time;
- this->datadlt_m_ = datadlt_m;
- this->datasize_limit_ = datasize_limit;
- this->freqms_limit_ = freqms_limit;
- this->zero_cnt_ = tb_config::kInvalidValue;
+ type_ = type;
+ start_time_ = start_time;
+ end_time_ = end_time;
+ datadlt_m_ = datadlt_m;
+ datasize_limit_ = datasize_limit;
+ freqms_limit_ = freqms_limit;
+ zero_cnt_ = tb_config::kInvalidValue;
}
+// Member-wise copy of all seven fields; self-assignment returns immediately.
FlowCtrlItem& FlowCtrlItem::operator=(const FlowCtrlItem& target) {
if (this == &target) return *this;
- this->type_ = target.type_;
- this->start_time_ = target.start_time_;
- this->end_time_ = target.end_time_;
- this->datadlt_m_ = target.datadlt_m_;
- this->datasize_limit_ = target.datasize_limit_;
- this->freqms_limit_ = target.freqms_limit_;
- this->zero_cnt_ = target.zero_cnt_;
+ type_ = target.type_;
+ start_time_ = target.start_time_;
+ end_time_ = target.end_time_;
+ datadlt_m_ = target.datadlt_m_;
+ datasize_limit_ = target.datasize_limit_;
+ freqms_limit_ = target.freqms_limit_;
+ zero_cnt_ = target.zero_cnt_;
return *this;
}
+// Returns freqms_limit_ when this item has type 1 and msg_zero_cnt has
+// reached zero_cnt_; returns -1 in every other case.
int32_t FlowCtrlItem::GetFreLimit(int32_t msg_zero_cnt) const {
- if (this->type_ != 1) {
+ if (type_ != 1) {
return -1;
}
- if (msg_zero_cnt >= this->zero_cnt_) {
- return this->freqms_limit_;
+ if (msg_zero_cnt >= zero_cnt_) {
+ return freqms_limit_;
}
return -1;
}
+// Re-initialises the item in place. type, datasize_limit and freqms_limit are
+// stored as given, min_data_filter_freqms is stored in zero_cnt_, and the
+// time window and data delta go back to their default values.
void FlowCtrlItem::ResetFlowCtrlValue(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
int32_t min_data_filter_freqms) {
- this->type_ = type;
- this->start_time_ = 2500;
- this->end_time_ = tb_config::kInvalidValue;
- this->datadlt_m_ = tb_config::kInvalidValue;
- this->datasize_limit_ = datasize_limit;
- this->freqms_limit_ = freqms_limit;
- this->zero_cnt_ = min_data_filter_freqms;
+ type_ = type;
+ start_time_ = 2500;
+ end_time_ = tb_config::kInvalidValue;
+ datadlt_m_ = tb_config::kInvalidValue;
+ datasize_limit_ = datasize_limit;
+ freqms_limit_ = freqms_limit;
+ zero_cnt_ = min_data_filter_freqms;
}
+// Puts the item back into the default-constructed state (type 0, start time
+// 2500, all other fields tb_config::kInvalidValue).
void FlowCtrlItem::Clear() {
- this->type_ = 0;
- this->start_time_ = 2500;
- this->end_time_ = tb_config::kInvalidValue;
- this->datadlt_m_ = tb_config::kInvalidValue;
- this->datasize_limit_ = tb_config::kInvalidValue;
- this->freqms_limit_ = tb_config::kInvalidValue;
- this->zero_cnt_ = tb_config::kInvalidValue;
+ type_ = 0;
+ start_time_ = 2500;
+ end_time_ = tb_config::kInvalidValue;
+ datadlt_m_ = tb_config::kInvalidValue;
+ datasize_limit_ = tb_config::kInvalidValue;
+ freqms_limit_ = tb_config::kInvalidValue;
+ zero_cnt_ = tb_config::kInvalidValue;
}
+// Applies this rule to (datadlt_m, curr_time). The rule matches only when it
+// has type 0, datadlt_m is strictly greater than datadlt_m_, and curr_time
+// lies inside [start_time_, end_time_]. On a match the item's size and
+// frequency limits are written into flowctrl_result and true is returned;
+// otherwise flowctrl_result is left untouched and false is returned.
bool FlowCtrlItem::GetDataLimit(int64_t datadlt_m, int32_t curr_time,
FlowCtrlResult& flowctrl_result) const {
- if (this->type_ != 0 || datadlt_m <= this->datadlt_m_) {
+ if (type_ != 0 || datadlt_m <= datadlt_m_) {
return false;
}
- if (curr_time < this->start_time_ || curr_time > this->end_time_) {
+ if (curr_time < start_time_ || curr_time > end_time_) {
return false;
}
- flowctrl_result.SetDataDltAndFreqLimit(this->datasize_limit_, this->freqms_limit_);
+ flowctrl_result.SetDataDltAndFreqLimit(datasize_limit_, freqms_limit_);
return true;
}
+// Starts with no rules loaded: ids are tb_config::kInvalidValue, the minimum
+// trackers are at their maximum constants, the time window is start 2500 /
+// end kInvalidValue, and last_update_time_ is stamped with the current time.
FlowCtrlRuleHandler::FlowCtrlRuleHandler() {
- this->flowctrl_id_.GetAndSet(tb_config::kInvalidValue);
- this->flowctrl_info_ = "";
- this->min_zero_cnt_.Set(tb_config::kMaxIntValue);
- this->qrypriority_id_.Set(tb_config::kInvalidValue);
- this->min_datadlt_limt_.Set(tb_config::kMaxLongValue);
- this->datalimit_start_time_.Set(2500);
- this->datalimit_end_time_.Set(tb_config::kInvalidValue);
- this->last_update_time_ = Utils::GetCurrentTimeMillis();
+ flowctrl_id_.GetAndSet(tb_config::kInvalidValue);
+ flowctrl_info_ = "";
+ min_zero_cnt_.Set(tb_config::kMaxIntValue);
+ qrypriority_id_.Set(tb_config::kInvalidValue);
+ min_datadlt_limt_.Set(tb_config::kMaxLongValue);
+ datalimit_start_time_.Set(2500);
+ datalimit_end_time_.Set(tb_config::kInvalidValue);
+ last_update_time_ = Utils::GetCurrentTimeMillis();
}
FlowCtrlRuleHandler::~FlowCtrlRuleHandler() {
- //
+ // Intentionally empty: the body performs no explicit cleanup.
}
void FlowCtrlRuleHandler::UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id,
int64_t flowctrl_id, const string& flowctrl_info) {
map<int32_t, vector<FlowCtrlItem> > tmp_flowctrl_map;
- if (flowctrl_id == this->flowctrl_id_.Get()) {
+ if (flowctrl_id == flowctrl_id_.Get()) {
return;
}
- int64_t curr_flowctrl_id = this->flowctrl_id_.Get();
+ int64_t curr_flowctrl_id = flowctrl_id_.Get();
if (flowctrl_info.length() > 0) {
parseFlowCtrlInfo(flowctrl_info, tmp_flowctrl_map);
}
lock_guard<mutex> lck(config_lock_);
- this->flowctrl_id_.Set(flowctrl_id);
- this->qrypriority_id_.Set(qrypriority_id);
+ flowctrl_id_.Set(flowctrl_id);
+ qrypriority_id_.Set(qrypriority_id);
clearStatisData();
if (tmp_flowctrl_map.empty()) {
- this->flowctrl_rules_.clear();
- this->flowctrl_info_ = "";
+ flowctrl_rules_.clear();
+ flowctrl_info_ = "";
} else {
- this->flowctrl_rules_ = tmp_flowctrl_map;
- this->flowctrl_info_ = flowctrl_info;
+ flowctrl_rules_ = tmp_flowctrl_map;
+ flowctrl_info_ = flowctrl_info;
initialStatisData();
}
- this->last_update_time_ = Utils::GetCurrentTimeMillis();
+ last_update_time_ = Utils::GetCurrentTimeMillis();
if (is_default) {
LOG_INFO("[Flow Ctrl] Default FlowCtrl's flowctrl_id from %ld to %ld\n", curr_flowctrl_id,
flowctrl_id);
@@ -215,55 +215,55 @@ void FlowCtrlRuleHandler::initialStatisData() {
+ // Recomputes the cached summary values from flowctrl_rules_. Each rule
+ // bucket (map keys 0, 1 and 3) is scanned separately and items whose type
+ // does not match their bucket key are ignored.
vector<FlowCtrlItem>::iterator it_vec;
map<int, vector<FlowCtrlItem> >::iterator it_map;
- it_map = this->flowctrl_rules_.find(0);
- if (it_map != this->flowctrl_rules_.end()) {
+ // Bucket 0: track the smallest data delta, the earliest start time and the
+ // latest end time over all type-0 rules.
+ it_map = flowctrl_rules_.find(0);
+ if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetType() != 0) {
continue;
}
- if (it_vec->GetDltInM() < this->min_datadlt_limt_.Get()) {
- this->min_datadlt_limt_.Set(it_vec->GetDltInM());
+ if (it_vec->GetDltInM() < min_datadlt_limt_.Get()) {
+ min_datadlt_limt_.Set(it_vec->GetDltInM());
}
- if (it_vec->GetStartTime() < this->datalimit_start_time_.Get()) {
- this->datalimit_start_time_.Set(it_vec->GetStartTime());
+ if (it_vec->GetStartTime() < datalimit_start_time_.Get()) {
+ datalimit_start_time_.Set(it_vec->GetStartTime());
}
- if (it_vec->GetEndTime() > this->datalimit_end_time_.Get()) {
- this->datalimit_end_time_.Set(it_vec->GetEndTime());
+ if (it_vec->GetEndTime() > datalimit_end_time_.Get()) {
+ datalimit_end_time_.Set(it_vec->GetEndTime());
}
}
}
- it_map = this->flowctrl_rules_.find(1);
- if (it_map != this->flowctrl_rules_.end()) {
+ // Bucket 1: track the smallest zero-count threshold over all type-1 rules.
+ it_map = flowctrl_rules_.find(1);
+ if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetType() != 1) {
continue;
}
- if (it_vec->GetZeroCnt() < this->min_zero_cnt_.Get()) {
- this->min_zero_cnt_.Set(it_vec->GetZeroCnt());
+ if (it_vec->GetZeroCnt() < min_zero_cnt_.Get()) {
+ min_zero_cnt_.Set(it_vec->GetZeroCnt());
}
}
}
- it_map = this->flowctrl_rules_.find(3);
- if (it_map != this->flowctrl_rules_.end()) {
+ // Bucket 3: copy each type-3 rule into filter_ctrl_item_. If several are
+ // present, the last one in the vector is the one that remains.
+ it_map = flowctrl_rules_.find(3);
+ if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetType() != 3) {
continue;
}
+ // NOTE(review): the next statement discards its result and looks like a
+ // leftover; confirm it can be removed upstream.
it_vec->GetDataSizeLimit();
- this->filter_ctrl_item_.ResetFlowCtrlValue(3, (int)(it_vec->GetDataSizeLimit()),
+ // The 64-bit size limit is narrowed to int32_t to fit ResetFlowCtrlValue().
+ filter_ctrl_item_.ResetFlowCtrlValue(3, (int32_t)(it_vec->GetDataSizeLimit()),
it_vec->GetFreqMsLimit(), it_vec->GetZeroCnt());
}
}
}
+// Resets every cached summary value to its "no rules" default and clears
+// filter_ctrl_item_. Note that qrypriority_id_ is reset here as well.
void FlowCtrlRuleHandler::clearStatisData() {
- this->min_zero_cnt_.GetAndSet(tb_config::kMaxIntValue);
- this->min_datadlt_limt_.GetAndSet(tb_config::kMaxLongValue);
- this->qrypriority_id_.Set(tb_config::kInvalidValue);
- this->datalimit_start_time_.Set(2500);
- this->datalimit_end_time_.Set(tb_config::kInvalidValue);
- this->filter_ctrl_item_.Clear();
+ min_zero_cnt_.GetAndSet(tb_config::kMaxIntValue);
+ min_datadlt_limt_.GetAndSet(tb_config::kMaxLongValue);
+ qrypriority_id_.Set(tb_config::kInvalidValue);
+ datalimit_start_time_.Set(2500);
+ datalimit_end_time_.Set(tb_config::kInvalidValue);
+ filter_ctrl_item_.Clear();
}
bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
@@ -276,16 +276,16 @@ bool FlowCtrlRuleHandler::GetCurDataLimit(int64_t last_datadlt,
time_t cur_time = time(NULL);
gmtime_r(&cur_time, &utc_tm);
int curr_time = (utc_tm.tm_hour + 8) % 24 * 100 + utc_tm.tm_min;
- if ((last_datadlt < this->min_datadlt_limt_.Get())
- || (curr_time < this->datalimit_start_time_.Get())
- || (curr_time > this->datalimit_end_time_.Get())) {
+ if ((last_datadlt < min_datadlt_limt_.Get())
+ || (curr_time < datalimit_start_time_.Get())
+ || (curr_time > datalimit_end_time_.Get())) {
return false;
}
// search total flowctrl rule
lock_guard<mutex> lck(config_lock_);
- it_map = this->flowctrl_rules_.find(0);
- if (it_map != this->flowctrl_rules_.end()) {
- for (it_vec = it_map->second.begin();it_vec != it_map->second.end(); ++it_vec) {
+ it_map = flowctrl_rules_.find(0);
+ if (it_map != flowctrl_rules_.end()) {
+ for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
if (it_vec->GetDataLimit(last_datadlt, curr_time, flowctrl_result)) {
result = true;
break;
@@ -301,13 +301,13 @@ int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
vector<FlowCtrlItem>::const_iterator it_vec;
map<int, vector<FlowCtrlItem> >::const_iterator it_map;
// check min zero count
- if (msg_zero_cnt < this->min_zero_cnt_.Get()) {
+ if (msg_zero_cnt < min_zero_cnt_.Get()) {
return limit_data;
}
// search rule allow value
lock_guard<mutex> lck(config_lock_);
- it_map = this->flowctrl_rules_.find(1);
- if (it_map != this->flowctrl_rules_.end()) {
+ it_map = flowctrl_rules_.find(1);
+ if (it_map != flowctrl_rules_.end()) {
for (it_vec = it_map->second.begin(); it_vec != it_map->second.end(); ++it_vec) {
limit_data = it_vec->GetFreLimit(msg_zero_cnt);
if (limit_data >= 0) {
@@ -321,17 +321,15 @@ int32_t FlowCtrlRuleHandler::GetCurFreqLimitTime(int32_t msg_zero_cnt,
+// Copies filter_ctrl_item_ into result while holding config_lock_.
+// result is cleared first, before the lock is taken.
void FlowCtrlRuleHandler::GetFilterCtrlItem(FlowCtrlItem& result) const {
result.Clear();
lock_guard<mutex> lck(config_lock_);
- result = this->filter_ctrl_item_;
+ result = filter_ctrl_item_;
}
+// Copies the stored rule text (flowctrl_info_) into flowctrl_info while
+// holding config_lock_.
void FlowCtrlRuleHandler::GetFlowCtrlInfo(string& flowctrl_info) const {
flowctrl_info.clear();
lock_guard<mutex> lck(config_lock_);
- flowctrl_info = this->flowctrl_info_;
+ flowctrl_info = flowctrl_info_;
}
-
-
bool FlowCtrlRuleHandler::compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2) {
if (o1.GetStartTime() >= o2.GetStartTime()) {
return true;
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.h b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.h
new file mode 100644
index 0000000..0131c6f
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/flowctrl_def.h
@@ -0,0 +1,144 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_FLOW_CONTROL_H_
+#define TUBEMQ_CLIENT_FLOW_CONTROL_H_
+
+#include <rapidjson/document.h>
+#include <stdint.h>
+
+#include <algorithm>
+#include <list>
+#include <map>
+#include <mutex>
+#include <string>
+#include <vector>
+
+#include "tubemq/tubemq_atomic.h"
+
+namespace tubemq {
+
+using std::map;
+using std::mutex;
+using std::string;
+using std::vector;
+
+// Value object holding the outcome of a flow-control lookup: a data-size
+// limit and a frequency limit.
+class FlowCtrlResult {
+ public:
+ FlowCtrlResult();
+ FlowCtrlResult(int64_t datasize_limit, int32_t freqms_limit);
+ FlowCtrlResult& operator=(const FlowCtrlResult& target);
+ // Sets both limits at once.
+ void SetDataDltAndFreqLimit(int64_t datasize_limit, int32_t freqms_limit);
+ void SetDataSizeLimit(int64_t datasize_limit);
+ void SetFreqMsLimit(int32_t freqms_limit);
+ // NOTE(review): the getters are not const-qualified, so they cannot be
+ // called through a const FlowCtrlResult&.
+ int64_t GetDataSizeLimit();
+ int32_t GetFreqMsLimit();
+
+ private:
+ int64_t datasize_limit_;
+ int32_t freqms_limit_;
+};
+
+// A single flow-control rule. How the fields are used depends on type_:
+//   0 - data-limit rule, evaluated by GetDataLimit();
+//   1 - frequency rule, evaluated by GetFreLimit();
+// other type values are stored but not interpreted by this class.
+class FlowCtrlItem {
+ public:
+  // Type 0 with default window and all limits at tb_config::kInvalidValue.
+  FlowCtrlItem();
+  // Rule described by a zero-count threshold and a frequency limit.
+  FlowCtrlItem(int32_t type, int32_t zero_cnt, int32_t freqms_limit);
+  // Rule described by a size limit and a frequency limit;
+  // min_data_filter_freqms is stored in zero_cnt_.
+  FlowCtrlItem(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
+               int32_t min_data_filter_freqms);
+  // Rule with an explicit [start_time, end_time] window and data delta.
+  FlowCtrlItem(int32_t type, int32_t start_time, int32_t end_time, int64_t datadlt_m,
+               int64_t datasize_limit, int32_t freqms_limit);
+  FlowCtrlItem& operator=(const FlowCtrlItem& target);
+  // Restores the default-constructed state.
+  void Clear();
+  // Same field mapping as the four-argument (type, datasize_limit, ...)
+  // constructor, applied to an existing object.
+  void ResetFlowCtrlValue(int32_t type, int32_t datasize_limit, int32_t freqms_limit,
+                          int32_t min_data_filter_freqms);
+  // Returns the frequency limit when this is a type-1 rule and msg_zero_cnt
+  // has reached the threshold; -1 otherwise.
+  int32_t GetFreLimit(int32_t msg_zero_cnt) const;
+  // Fills flowctrl_result and returns true when this type-0 rule matches the
+  // given data delta and time; returns false otherwise.
+  bool GetDataLimit(int64_t datadlt_m,
+                    int32_t curr_time, FlowCtrlResult& flowctrl_result) const;
+  // Getters return plain values: a top-level const on a by-value return type
+  // is ignored by the language and only triggers -Wignored-qualifiers.
+  int32_t GetType() const { return type_; }
+  int32_t GetZeroCnt() const { return zero_cnt_; }
+  int32_t GetStartTime() const { return start_time_; }
+  int32_t GetEndTime() const { return end_time_; }
+  int64_t GetDataSizeLimit() const { return datasize_limit_; }
+  int32_t GetFreqMsLimit() const { return freqms_limit_; }
+  int64_t GetDltInM() const { return datadlt_m_; }
+
+ private:
+  int32_t type_;
+  int32_t start_time_;
+  int32_t end_time_;
+  int64_t datadlt_m_;
+  int64_t datasize_limit_;
+  int32_t freqms_limit_;
+  int32_t zero_cnt_;
+};
+
+// Holds the currently active set of flow-control rules together with a few
+// cached summary values. The rule map, the raw rule text and the filter item
+// are accessed under config_lock_; the summary values are atomics.
+class FlowCtrlRuleHandler {
+ public:
+  FlowCtrlRuleHandler();
+  ~FlowCtrlRuleHandler();
+  // Replaces the rule set when flowctrl_id differs from the stored id.
+  void UpdateDefFlowCtrlInfo(bool is_default, int32_t qrypriority_id, int64_t flowctrl_id,
+                             const string& flowctrl_info);
+  bool GetCurDataLimit(int64_t last_datadlt, FlowCtrlResult& flowctrl_result) const;
+  int32_t GetCurFreqLimitTime(int32_t msg_zero_cnt, int32_t received_limit) const;
+  // Copies the stored filter item into result.
+  void GetFilterCtrlItem(FlowCtrlItem& result) const;
+  // Copies the stored rule text into flowctrl_info.
+  void GetFlowCtrlInfo(string& flowctrl_info) const;
+  int32_t GetMinZeroCnt() const { return min_zero_cnt_.Get(); }
+  int32_t GetQryPriorityId() const { return qrypriority_id_.Get(); }
+  void SetQryPriorityId(int32_t qrypriority_id) { qrypriority_id_.Set(qrypriority_id); }
+  // Returned by plain value; a const qualifier here would be ignored.
+  int64_t GetFlowCtrlId() const { return flowctrl_id_.Get(); }
+
+ private:
+  // Rebuild / reset the cached summary values below.
+  void initialStatisData();
+  void clearStatisData();
+  static bool compareFeqQueue(const FlowCtrlItem& queue1, const FlowCtrlItem& queue2);
+  static bool compareDataLimitQueue(const FlowCtrlItem& o1, const FlowCtrlItem& o2);
+  // JSON parsing helpers; each reports failures through err_info.
+  bool parseStringMember(string& err_info, const rapidjson::Value& root, const char* key,
+                         string& value, bool compare_value, string required_val);
+  bool parseLongMember(string& err_info, const rapidjson::Value& root, const char* key,
+                       int64_t& value, bool compare_value, int64_t required_val);
+  bool parseIntMember(string& err_info, const rapidjson::Value& root, const char* key,
+                      int32_t& value, bool compare_value, int32_t required_val);
+  bool parseFlowCtrlInfo(const string& flowctrl_info,
+                         map<int32_t, vector<FlowCtrlItem> >& flowctrl_info_map);
+  bool parseDataLimit(string& err_info, const rapidjson::Value& root,
+                      vector<FlowCtrlItem>& flowCtrlItems);
+  bool parseFreqLimit(string& err_info, const rapidjson::Value& root,
+                      vector<FlowCtrlItem>& flowctrl_items);
+  bool parseLowFetchLimit(string& err_info, const rapidjson::Value& root,
+                          vector<FlowCtrlItem>& flowctrl_items);
+  bool parseTimeMember(string& err_info, const rapidjson::Value& root, const char* key,
+                       int32_t& value);
+
+ private:
+  // mutable so that const getters can take the lock.
+  mutable mutex config_lock_;
+  string flowctrl_info_;
+  FlowCtrlItem filter_ctrl_item_;
+  // Rules grouped by rule type.
+  map<int32_t, vector<FlowCtrlItem> > flowctrl_rules_;
+  int64_t last_update_time_;
+  AtomicLong flowctrl_id_;
+  AtomicInteger qrypriority_id_;
+  AtomicInteger min_zero_cnt_;
+  AtomicLong min_datadlt_limt_;
+  AtomicInteger datalimit_start_time_;
+  AtomicInteger datalimit_end_time_;
+};
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_FLOW_CONTROL_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/logger.cc b/tubemq-client-twins/tubemq-client-cpp/src/logger.cc
index f374d9b..2e51e25 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/logger.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/logger.cc
@@ -17,7 +17,7 @@
* under the License.
*/
-#include "tubemq/logger.h"
+#include "logger.h"
#include <log4cplus/fileappender.h>
#include <log4cplus/layout.h>
@@ -27,14 +27,14 @@
#include <string>
-#include "tubemq/singleton.h"
+#include "singleton.h"
namespace tubemq {
-Logger& GetLogger() { return Singleton<Logger>::Instance(); }
-
static const uint32_t kMBSize = 1024 * 1024;
+Logger& GetLogger() { return Singleton<Logger>::Instance(); }
+
bool Logger::Init(const std::string& path, Logger::Level level, uint32_t file_max_size,
uint32_t file_num) {
base_path_ = path;
@@ -67,6 +67,7 @@ void Logger::setup() {
bool immediate_fush = true;
std::string pattern = "[%D{%Y-%m-%d %H:%M:%S.%q}][tid:%t]%m%n";
auto logger_d = log4cplus::Logger::getInstance(instance_);
+ logger_d.removeAllAppenders();
logger_d.setLogLevel(log4cplus::TRACE_LOG_LEVEL);
log4cplus::helpers::SharedObjectPtr<log4cplus::Appender> append_d(
new log4cplus::RollingFileAppender(base_path_ + ".log", file_max_size_ * kMBSize, file_num_,
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/logger.h b/tubemq-client-twins/tubemq-client-cpp/src/logger.h
new file mode 100644
index 0000000..64933a6
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/logger.h
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_LOG_FILE_
+#define _TUBEMQ_LOG_FILE_
+
+#include <stdint.h>
+
+#include <string>
+#include <vector>
+
+namespace tubemq {
+class Logger;
+
+Logger& GetLogger();
+
+// Logging macros. Each one checks whether `level` is enabled before the
+// message is formatted, and prefixes the message with the calling function,
+// the line number and the level name. `fmt` must be a string literal because
+// it is concatenated with the prefix.
+//
+// The bodies are wrapped in do { ... } while (0) so that an invocation
+// followed by ';' is exactly one statement and can sit in an unbraced
+// if / else branch.
+
+// Logs through the process-wide logger returned by tubemq::GetLogger().
+#define LOG_LEVEL(level, fmt, ...) \
+  do { \
+    if (tubemq::GetLogger().IsEnable(level)) { \
+      tubemq::GetLogger().Write("[%s:%d][%s]" fmt, __func__, __LINE__, \
+                                tubemq::Logger::Level2String(level), ##__VA_ARGS__); \
+    } \
+  } while (0)
+
+// Per-level shortcuts that log through the process-wide logger.
+#define LOG_TRACE(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kTrace, fmt, ##__VA_ARGS__)
+#define LOG_DEBUG(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kDebug, fmt, ##__VA_ARGS__)
+#define LOG_INFO(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kInfo, fmt, ##__VA_ARGS__)
+#define LOG_WARN(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kWarn, fmt, ##__VA_ARGS__)
+#define LOG_ERROR(fmt, ...) \
+  LOG_TUBEMQ(tubemq::GetLogger(), tubemq::Logger::kError, fmt, ##__VA_ARGS__)
+
+// Logs through an explicitly supplied logger expression. `logger` is
+// evaluated twice when the level is enabled, so it should be free of side
+// effects.
+#define LOG_TUBEMQ(logger, level, fmt, ...) \
+  do { \
+    if ((logger).IsEnable(level)) { \
+      (logger).Write("[%s:%d][%s]" fmt, __func__, __LINE__, \
+                     tubemq::Logger::Level2String(level), ##__VA_ARGS__); \
+    } \
+  } while (0)
+
+// File logger used by the client. It is configured through Init(); until
+// then the defaults set by the constructor apply.
+class Logger {
+ public:
+  // Severity levels in ascending order. A message is written when its level
+  // is greater than or equal to the configured level.
+  enum Level {
+    kTrace = 0,
+    kDebug = 1,
+    kInfo = 2,
+    kWarn = 3,
+    kError = 4,
+  };
+
+  // size: MB
+  Logger()
+      : file_max_size_(100),
+        file_num_(10),
+        level_(kError),
+        base_path_("tubemq"),
+        instance_("TubeMQ") {}
+
+  ~Logger(void) {}
+
+  // path example: ../log/tubemq
+  // size: MB
+  bool Init(const std::string& path, Level level, uint32_t file_max_size = 100,
+            uint32_t file_num = 10);
+
+  // printf-style write; the format attribute lets the compiler check the
+  // arguments against sFormat (argument 1 is the implicit `this`).
+  bool Write(const char* sFormat, ...) __attribute__((format(printf, 2, 3)));
+  inline bool WriteStream(const std::string& msg) { return writeStream(msg.c_str()); }
+
+  inline void SetInstance(const std::string& instance) { instance_ = instance; }
+
+  // True when messages of `level` pass the configured threshold.
+  inline bool IsEnable(Level level) const { return level_ <= level; }
+
+  // Returns the upper-case name of `level`, or "UNKNOWN" for a value outside
+  // the enumerators, so an invalid level can never index past the table.
+  static const char* Level2String(Level level) {
+    static const char* const level_names[] = {
+        "TRACE", "DEBUG", "INFO", "WARN", "ERROR",
+    };
+    const uint32_t count = sizeof(level_names) / sizeof(level_names[0]);
+    // A negative value wraps to a large unsigned number and is rejected too.
+    const uint32_t index = static_cast<uint32_t>(level);
+    return index < count ? level_names[index] : "UNKNOWN";
+  }
+
+ private:
+  void setup();
+  bool writeStream(const char* msg);
+
+ private:
+  uint32_t file_max_size_;
+  uint16_t file_num_;
+  uint8_t level_;
+
+  std::string base_path_;
+  std::string instance_;
+  std::string err_msg_;
+};
+} // namespace tubemq
+#endif // _TUBEMQ_LOG_FILE_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
index 4f03dfc..38f6f46 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.cc
@@ -17,16 +17,16 @@
* under the License.
*/
-#include "tubemq/meta_info.h"
+#include "meta_info.h"
#include <stdlib.h>
#include <sstream>
#include <vector>
-#include "tubemq/const_config.h"
+#include "const_config.h"
#include "tubemq/tubemq_errcode.h"
-#include "tubemq/utils.h"
+#include "utils.h"
namespace tubemq {
@@ -34,9 +34,9 @@ using std::stringstream;
using std::vector;
+// Default node: id 0, host set to a single space, port set to
+// tb_config::kBrokerPortDef. The cached strings are built from these values.
NodeInfo::NodeInfo() {
- this->node_id_ = 0;
- this->node_host_ = " ";
- this->node_port_ = tb_config::kBrokerPortDef;
+ node_id_ = 0;
+ node_host_ = " ";
+ node_port_ = tb_config::kBrokerPortDef;
buildStrInfo();
}
@@ -45,34 +45,34 @@ NodeInfo::NodeInfo(bool is_broker, const string& node_info) {
+ // Parses a colon-separated node description. With is_broker the fields are
+ // id:host[:port]; otherwise they are host[:port]. A missing port falls back
+ // to tb_config::kBrokerPortDef.
+ // NOTE(review): result[0] / result[1] are read without checking
+ // result.size(); confirm that callers never pass a string with too few
+ // fields.
vector<string> result;
Utils::Split(node_info, result, delimiter::kDelimiterColon);
if (is_broker) {
- this->node_id_ = atoi(result[0].c_str());
- this->node_host_ = result[1];
- this->node_port_ = tb_config::kBrokerPortDef;
+ node_id_ = atoi(result[0].c_str());
+ node_host_ = result[1];
+ node_port_ = tb_config::kBrokerPortDef;
if (result.size() >= 3) {
- this->node_port_ = atoi(result[2].c_str());
+ node_port_ = atoi(result[2].c_str());
}
} else {
- this->node_id_ = 0;
- this->node_host_ = result[0];
- this->node_port_ = tb_config::kBrokerPortDef;
+ node_id_ = 0;
+ node_host_ = result[0];
+ node_port_ = tb_config::kBrokerPortDef;
if (result.size() >= 2) {
- this->node_port_ = atoi(result[1].c_str());
+ node_port_ = atoi(result[1].c_str());
}
}
buildStrInfo();
}
+// Builds a node from host and port only; the id is set to
+// tb_config::kInvalidValue.
NodeInfo::NodeInfo(const string& node_host, uint32_t node_port) {
- this->node_id_ = tb_config::kInvalidValue;
- this->node_host_ = node_host;
- this->node_port_ = node_port;
+ node_id_ = tb_config::kInvalidValue;
+ node_host_ = node_host;
+ node_port_ = node_port;
buildStrInfo();
}
+// Builds a node from an explicit id, host and port.
NodeInfo::NodeInfo(int node_id, const string& node_host, uint32_t node_port) {
- this->node_id_ = node_id;
- this->node_host_ = node_host;
- this->node_port_ = node_port;
+ node_id_ = node_id;
+ node_host_ = node_host;
+ node_port_ = node_port;
buildStrInfo();
}
@@ -82,11 +82,12 @@ NodeInfo::~NodeInfo() {
+// Copies all fields from target (no-op on self-assignment).
+// NOTE(review): addr_info_ and node_info_ are copied and then rebuilt again
+// by buildStrInfo(); one of the two steps looks redundant.
NodeInfo& NodeInfo::operator=(const NodeInfo& target) {
if (this != &target) {
- this->node_id_ = target.node_id_;
- this->node_host_ = target.node_host_;
- this->node_port_ = target.node_port_;
- this->addr_info_ = target.addr_info_;
- this->node_info_ = target.node_info_;
+ node_id_ = target.node_id_;
+ node_host_ = target.node_host_;
+ node_port_ = target.node_port_;
+ addr_info_ = target.addr_info_;
+ node_info_ = target.node_info_;
+ buildStrInfo();
}
return *this;
}
@@ -95,50 +96,50 @@ bool NodeInfo::operator==(const NodeInfo& target) {
if (this == &target) {
return true;
}
- if (this->node_info_ == target.node_info_) {
+ if (node_info_ == target.node_info_) {
return true;
}
return false;
}
+// Orders nodes by string comparison of their cached node_info_ text.
bool NodeInfo::operator<(const NodeInfo& target) const {
- return this->node_info_ < target.node_info_;
+ return node_info_ < target.node_info_;
}
+// Read-only accessors. GetAddrInfo() and GetNodeInfo() return the strings
+// cached by buildStrInfo().
-const uint32_t NodeInfo::GetNodeId() const { return this->node_id_; }
+const uint32_t NodeInfo::GetNodeId() const { return node_id_; }
-const string& NodeInfo::GetHost() const { return this->node_host_; }
+const string& NodeInfo::GetHost() const { return node_host_; }
-const uint32_t NodeInfo::GetPort() const { return this->node_port_; }
+const uint32_t NodeInfo::GetPort() const { return node_port_; }
-const string& NodeInfo::GetAddrInfo() const { return this->addr_info_; }
+const string& NodeInfo::GetAddrInfo() const { return addr_info_; }
-const string& NodeInfo::GetNodeInfo() const { return this->node_info_; }
+const string& NodeInfo::GetNodeInfo() const { return node_info_; }
+// Rebuilds the two cached strings from the current fields:
+//   addr_info_ = host + kDelimiterColon + port
+//   node_info_ = id + kDelimiterColon + addr_info_
void NodeInfo::buildStrInfo() {
stringstream ss1;
- ss1 << this->node_host_;
+ ss1 << node_host_;
ss1 << delimiter::kDelimiterColon;
- ss1 << this->node_port_;
- this->addr_info_ = ss1.str();
+ ss1 << node_port_;
+ addr_info_ = ss1.str();
stringstream ss2;
- ss2 << this->node_id_;
+ ss2 << node_id_;
ss2 << delimiter::kDelimiterColon;
- ss2 << this->addr_info_;
- this->node_info_ = ss2.str();
+ ss2 << addr_info_;
+ node_info_ = ss2.str();
}
+// Default partition: topic set to a single space, partition id 0, and a
+// default-constructed broker.
Partition::Partition() {
- this->topic_ = " ";
- this->partition_id_ = 0;
+ topic_ = " ";
+ partition_id_ = 0;
buildPartitionKey();
}
// partition_info = broker_info#topic:partitionId
Partition::Partition(const string& partition_info) {
// initial process
- this->topic_ = " ";
- this->partition_id_ = 0;
+ topic_ = " ";
+ partition_id_ = 0;
// parse partition_info string
string::size_type pos = 0;
string seg_key = delimiter::kDelimiterPound;
@@ -148,7 +149,8 @@ Partition::Partition(const string& partition_info) {
if (pos != string::npos) {
string broker_info = partition_info.substr(0, pos);
broker_info = Utils::Trim(broker_info);
- this->broker_info_ = NodeInfo(true, broker_info);
+ NodeInfo tmp_node(true, broker_info);
+ broker_info_ = tmp_node;
string part_str = partition_info.substr(pos + seg_key.size(), partition_info.size());
part_str = Utils::Trim(part_str);
pos = part_str.find(token_key);
@@ -157,8 +159,8 @@ Partition::Partition(const string& partition_info) {
string part_id_str = part_str.substr(pos + token_key.size(), part_str.size());
topic_str = Utils::Trim(topic_str);
part_id_str = Utils::Trim(part_id_str);
- this->topic_ = topic_str;
- this->partition_id_ = atoi(part_id_str.c_str());
+ topic_ = topic_str;
+ partition_id_ = atoi(part_id_str.c_str());
}
}
buildPartitionKey();
@@ -167,21 +169,21 @@ Partition::Partition(const string& partition_info) {
// part_str = topic:partition_id
+// Builds a partition on broker_info from a "topic:partition_id" string. When
+// part_str has fewer than two colon-separated fields, the defaults (topic
+// " ", id 0) are kept.
Partition::Partition(const NodeInfo& broker_info, const string& part_str) {
vector<string> result;
- this->topic_ = " ";
- this->partition_id_ = 0;
- this->broker_info_ = broker_info;
+ topic_ = " ";
+ partition_id_ = 0;
+ broker_info_ = broker_info;
Utils::Split(part_str, result, delimiter::kDelimiterColon);
if (result.size() >= 2) {
- this->topic_ = result[0];
- this->partition_id_ = atoi(result[1].c_str());
+ topic_ = result[0];
+ partition_id_ = atoi(result[1].c_str());
}
buildPartitionKey();
}
+// Builds a partition from an explicit broker, topic and partition id.
Partition::Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id) {
- this->topic_ = topic;
- this->partition_id_ = partition_id;
- this->broker_info_ = broker_info;
+ topic_ = topic;
+ partition_id_ = partition_id;
+ broker_info_ = broker_info;
buildPartitionKey();
}
@@ -191,11 +193,12 @@ Partition::~Partition() {
+// Copies all fields from target (no-op on self-assignment).
+// NOTE(review): partition_key_ and partition_info_ are copied and then
+// rebuilt again by buildPartitionKey(); one of the two steps looks redundant.
Partition& Partition::operator=(const Partition& target) {
if (this != &target) {
- this->topic_ = target.topic_;
- this->partition_id_ = target.partition_id_;
- this->broker_info_ = target.broker_info_;
- this->partition_key_ = target.partition_key_;
- this->partition_info_ = target.partition_info_;
+ topic_ = target.topic_;
+ partition_id_ = target.partition_id_;
+ broker_info_ = target.broker_info_;
+ partition_key_ = target.partition_key_;
+ partition_info_ = target.partition_info_;
+ buildPartitionKey();
}
return *this;
}
@@ -204,44 +207,44 @@ bool Partition::operator==(const Partition& target) {
if (this == &target) {
return true;
}
- if (this->partition_info_ == target.partition_info_) {
+ if (partition_info_ == target.partition_info_) {
return true;
}
return false;
}
+// Read-only accessors. The broker getters forward to broker_info_;
+// GetPartitionKey() and ToString() return the strings cached by
+// buildPartitionKey().
-const uint32_t Partition::GetBrokerId() const { return this->broker_info_.GetNodeId(); }
+const uint32_t Partition::GetBrokerId() const { return broker_info_.GetNodeId(); }
-const string& Partition::GetBrokerHost() const { return this->broker_info_.GetHost(); }
+const string& Partition::GetBrokerHost() const { return broker_info_.GetHost(); }
-const uint32_t Partition::GetBrokerPort() const { return this->broker_info_.GetPort(); }
+const uint32_t Partition::GetBrokerPort() const { return broker_info_.GetPort(); }
-const string& Partition::GetPartitionKey() const { return this->partition_key_; }
+const string& Partition::GetPartitionKey() const { return partition_key_; }
-const string& Partition::GetTopic() const { return this->topic_; }
+const string& Partition::GetTopic() const { return topic_; }
-const NodeInfo& Partition::GetBrokerInfo() const { return this->broker_info_; }
+const NodeInfo& Partition::GetBrokerInfo() const { return broker_info_; }
-const uint32_t Partition::GetPartitionId() const { return this->partition_id_; }
+const uint32_t Partition::GetPartitionId() const { return partition_id_; }
-const string& Partition::ToString() const { return this->partition_info_; }
+const string& Partition::ToString() const { return partition_info_; }
void Partition::buildPartitionKey() {
stringstream ss1;
- ss1 << this->broker_info_.GetNodeId();
+ ss1 << broker_info_.GetNodeId();
ss1 << delimiter::kDelimiterColon;
- ss1 << this->topic_;
+ ss1 << topic_;
ss1 << delimiter::kDelimiterColon;
- ss1 << this->partition_id_;
- this->partition_key_ = ss1.str();
+ ss1 << partition_id_;
+ partition_key_ = ss1.str();
stringstream ss2;
- ss2 << this->broker_info_.GetNodeInfo();
+ ss2 << broker_info_.GetNodeInfo();
ss2 << delimiter::kDelimiterPound;
- ss2 << this->topic_;
+ ss2 << topic_;
ss2 << delimiter::kDelimiterColon;
- ss2 << this->partition_id_;
- this->partition_info_ = ss2.str();
+ ss2 << partition_id_;
+ partition_info_ = ss2.str();
}
PartitionExt::PartitionExt() : Partition() {
@@ -266,24 +269,24 @@ PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
// parent class
Partition::operator=(target);
// child class
- this->is_last_consumed_ = target.is_last_consumed_;
- this->cur_flowctrl_ = target.cur_flowctrl_;
- this->cur_freqctrl_ = target.cur_freqctrl_;
- this->next_stage_updtime_ = target.next_stage_updtime_;
- this->next_slice_updtime_ = target.next_slice_updtime_;
- this->limit_slice_msgsize_ = target.limit_slice_msgsize_;
- this->cur_stage_msgsize_ = target.cur_stage_msgsize_;
- this->cur_slice_msgsize_ = target.cur_slice_msgsize_;
- this->total_zero_cnt_ = target.total_zero_cnt_;
- this->booked_time_ = target.booked_time_;
- this->booked_errcode_ = target.booked_errcode_;
- this->booked_esc_limit_ = target.booked_esc_limit_;
- this->booked_msgsize_ = target.booked_msgsize_;
- this->booked_dlt_limit_ = target.booked_dlt_limit_;
- this->booked_curdata_dlt_ = target.booked_curdata_dlt_;
- this->booked_require_slow_ = target.booked_require_slow_;
- this->booked_errcode_ = target.booked_errcode_;
- this->booked_errcode_ = target.booked_errcode_;
+ is_last_consumed_ = target.is_last_consumed_;
+ cur_flowctrl_ = target.cur_flowctrl_;
+ cur_freqctrl_ = target.cur_freqctrl_;
+ next_stage_updtime_ = target.next_stage_updtime_;
+ next_slice_updtime_ = target.next_slice_updtime_;
+ limit_slice_msgsize_ = target.limit_slice_msgsize_;
+ cur_stage_msgsize_ = target.cur_stage_msgsize_;
+ cur_slice_msgsize_ = target.cur_slice_msgsize_;
+ total_zero_cnt_ = target.total_zero_cnt_;
+ booked_time_ = target.booked_time_;
+ booked_errcode_ = target.booked_errcode_;
+ booked_esc_limit_ = target.booked_esc_limit_;
+ booked_msgsize_ = target.booked_msgsize_;
+ booked_dlt_limit_ = target.booked_dlt_limit_;
+ booked_curdata_dlt_ = target.booked_curdata_dlt_;
+ booked_require_slow_ = target.booked_require_slow_;
+ booked_errcode_ = target.booked_errcode_;
+ booked_errcode_ = target.booked_errcode_;
}
return *this;
}
@@ -291,21 +294,21 @@ PartitionExt& PartitionExt::operator=(const PartitionExt& target) {
void PartitionExt::BookConsumeData(int32_t errcode, int32_t msg_size,
bool req_esc_limit, int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow) {
- this->booked_time_ = Utils::GetCurrentTimeMillis();
- this->booked_errcode_ = errcode;
- this->booked_esc_limit_ = req_esc_limit;
- this->booked_msgsize_ = msg_size;
- this->booked_dlt_limit_ = rsp_dlt_limit;
- this->booked_curdata_dlt_ = last_datadlt;
- this->booked_require_slow_ = require_slow;
+ booked_time_ = Utils::GetCurrentTimeMillis();
+ booked_errcode_ = errcode;
+ booked_esc_limit_ = req_esc_limit;
+ booked_msgsize_ = msg_size;
+ booked_dlt_limit_ = rsp_dlt_limit;
+ booked_curdata_dlt_ = last_datadlt;
+ booked_require_slow_ = require_slow;
}
int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed) {
- int64_t dlt_time = Utils::GetCurrentTimeMillis() - this->booked_time_;
+ int64_t dlt_time = Utils::GetCurrentTimeMillis() - booked_time_;
return ProcConsumeResult(def_flowctrl_handler, group_flowctrl_handler, filter_consume,
- last_consumed, this->booked_errcode_, this->booked_msgsize_, this->booked_esc_limit_,
- this->booked_dlt_limit_, this->booked_curdata_dlt_, this->booked_require_slow_) - dlt_time;
+ last_consumed, booked_errcode_, booked_msgsize_, booked_esc_limit_,
+ booked_dlt_limit_, booked_curdata_dlt_, booked_require_slow_) - dlt_time;
}
int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
@@ -314,7 +317,7 @@ int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_
int64_t last_datadlt, bool require_slow) {
// #lizard forgives
// record consume status
- this->is_last_consumed_ = last_consumed;
+ is_last_consumed_ = last_consumed;
// Update strategy data values
updateStrategyData(def_flowctrl_handler, group_flowctrl_handler, msg_size, last_datadlt);
// Perform different strategies based on error codes
@@ -322,36 +325,36 @@ int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_
case err_code::kErrNotFound:
case err_code::kErrSuccess:
if (msg_size == 0 && errcode != err_code::kErrSuccess) {
- this->total_zero_cnt_ += 1;
+ total_zero_cnt_ += 1;
} else {
- this->total_zero_cnt_ = 0;
+ total_zero_cnt_ = 0;
}
- if (this->total_zero_cnt_ > 0) {
+ if (total_zero_cnt_ > 0) {
if (group_flowctrl_handler.GetMinZeroCnt() != tb_config::kMaxIntValue) {
return (int64_t)(group_flowctrl_handler.GetCurFreqLimitTime(
- this->total_zero_cnt_, (int32_t)rsp_dlt_limit));
+ total_zero_cnt_, (int32_t)rsp_dlt_limit));
} else {
return (int64_t)def_flowctrl_handler.GetCurFreqLimitTime(
- this->total_zero_cnt_, (int32_t)rsp_dlt_limit);
+ total_zero_cnt_, (int32_t)rsp_dlt_limit);
}
}
if (req_esc_limit) {
return 0;
} else {
- if (this->cur_stage_msgsize_ >= this->cur_flowctrl_.GetDataSizeLimit()
- || this->cur_slice_msgsize_ >= this->limit_slice_msgsize_) {
- return this->cur_flowctrl_.GetFreqMsLimit() > rsp_dlt_limit
- ? this->cur_flowctrl_.GetFreqMsLimit() : rsp_dlt_limit;
+ if (cur_stage_msgsize_ >= cur_flowctrl_.GetDataSizeLimit()
+ || cur_slice_msgsize_ >= limit_slice_msgsize_) {
+ return cur_flowctrl_.GetFreqMsLimit() > rsp_dlt_limit
+ ? cur_flowctrl_.GetFreqMsLimit() : rsp_dlt_limit;
}
if (errcode == err_code::kErrSuccess) {
- if (filter_consume && this->cur_freqctrl_.GetFreqMsLimit() >= 0) {
+ if (filter_consume && cur_freqctrl_.GetFreqMsLimit() >= 0) {
if (require_slow) {
- return this->cur_freqctrl_.GetZeroCnt();
+ return cur_freqctrl_.GetZeroCnt();
} else {
- return this->cur_freqctrl_.GetFreqMsLimit();
+ return cur_freqctrl_.GetFreqMsLimit();
}
- } else if (!filter_consume && this->cur_freqctrl_.GetDataSizeLimit() >=0) {
- return this->cur_freqctrl_.GetDataSizeLimit();
+ } else if (!filter_consume && cur_freqctrl_.GetDataSizeLimit() >=0) {
+ return cur_freqctrl_.GetDataSizeLimit();
}
}
return rsp_dlt_limit;
@@ -364,68 +367,68 @@ int64_t PartitionExt::ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_
}
void PartitionExt::SetLastConsumed(bool last_consumed) {
- this->is_last_consumed_ = last_consumed;
+ is_last_consumed_ = last_consumed;
}
-bool PartitionExt::IsLastConsumed() {
- return this->is_last_consumed_;
+bool PartitionExt::IsLastConsumed() const {
+ return is_last_consumed_;
}
void PartitionExt::resetParameters() {
- this->is_last_consumed_ = false;
- this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 20);
- this->next_stage_updtime_ = 0;
- this->next_slice_updtime_ = 0;
- this->limit_slice_msgsize_ = 0;
- this->cur_stage_msgsize_ = 0;
- this->cur_slice_msgsize_ = 0;
- this->total_zero_cnt_ = 0;
- this->booked_time_ = 0;
- this->booked_errcode_ = 0;
- this->booked_esc_limit_ = false;
- this->booked_msgsize_ = 0;
- this->booked_dlt_limit_ = 0;
- this->booked_curdata_dlt_ = 0;
- this->booked_require_slow_ = false;
+ is_last_consumed_ = false;
+ cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 20);
+ next_stage_updtime_ = 0;
+ next_slice_updtime_ = 0;
+ limit_slice_msgsize_ = 0;
+ cur_stage_msgsize_ = 0;
+ cur_slice_msgsize_ = 0;
+ total_zero_cnt_ = 0;
+ booked_time_ = 0;
+ booked_errcode_ = 0;
+ booked_esc_limit_ = false;
+ booked_msgsize_ = 0;
+ booked_dlt_limit_ = 0;
+ booked_curdata_dlt_ = 0;
+ booked_require_slow_ = false;
}
void PartitionExt::updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler,
const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, int64_t last_datadlt) {
bool result = false;
// Accumulated data received
- this->cur_stage_msgsize_ += msg_size;
- this->cur_slice_msgsize_ += msg_size;
+ cur_stage_msgsize_ += msg_size;
+ cur_slice_msgsize_ += msg_size;
int64_t curr_time = Utils::GetCurrentTimeMillis();
// Update strategy data values
- if (curr_time > this->next_stage_updtime_) {
- this->cur_stage_msgsize_ = 0;
- this->cur_slice_msgsize_ = 0;
+ if (curr_time > next_stage_updtime_) {
+ cur_stage_msgsize_ = 0;
+ cur_slice_msgsize_ = 0;
if (last_datadlt >= 0) {
- result = group_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_);
+ result = group_flowctrl_handler.GetCurDataLimit(last_datadlt, cur_flowctrl_);
if (!result) {
- result = def_flowctrl_handler.GetCurDataLimit(last_datadlt, this->cur_flowctrl_);
+ result = def_flowctrl_handler.GetCurDataLimit(last_datadlt, cur_flowctrl_);
if (!result) {
- this->cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
+ cur_flowctrl_.SetDataDltAndFreqLimit(tb_config::kMaxLongValue, 0);
}
}
- group_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
- if (this->cur_freqctrl_.GetFreqMsLimit() < 0) {
- def_flowctrl_handler.GetFilterCtrlItem(this->cur_freqctrl_);
+ group_flowctrl_handler.GetFilterCtrlItem(cur_freqctrl_);
+ if (cur_freqctrl_.GetFreqMsLimit() < 0) {
+ def_flowctrl_handler.GetFilterCtrlItem(cur_freqctrl_);
}
curr_time = Utils::GetCurrentTimeMillis();
}
- this->limit_slice_msgsize_ = this->cur_flowctrl_.GetDataSizeLimit() / 12;
- this->next_stage_updtime_ = curr_time + 60000;
- this->next_slice_updtime_ = curr_time + 5000;
- } else if (curr_time > this->next_slice_updtime_) {
- this->cur_slice_msgsize_ = 0;
- this->next_slice_updtime_ = curr_time + 5000;
+ limit_slice_msgsize_ = cur_flowctrl_.GetDataSizeLimit() / 12;
+ next_stage_updtime_ = curr_time + 60000;
+ next_slice_updtime_ = curr_time + 5000;
+ } else if (curr_time > next_slice_updtime_) {
+ cur_slice_msgsize_ = 0;
+ next_slice_updtime_ = curr_time + 5000;
}
}
SubscribeInfo::SubscribeInfo() {
- this->consumer_id_ = " ";
- this->group_ = " ";
+ consumer_id_ = " ";
+ group_ = " ";
buildSubInfo();
}
@@ -434,8 +437,8 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) {
string::size_type pos = 0;
string seg_key = delimiter::kDelimiterPound;
string at_key = delimiter::kDelimiterAt;
- this->consumer_id_ = " ";
- this->group_ = " ";
+ consumer_id_ = " ";
+ group_ = " ";
// parse sub_info
pos = sub_info.find(seg_key);
if (pos != string::npos) {
@@ -443,109 +446,111 @@ SubscribeInfo::SubscribeInfo(const string& sub_info) {
consumer_info = Utils::Trim(consumer_info);
string partition_info = sub_info.substr(pos + seg_key.size(), sub_info.size());
partition_info = Utils::Trim(partition_info);
- this->partitionext_ = PartitionExt(partition_info);
+ PartitionExt tmp_part(partition_info);
+ partitionext_ = tmp_part;
pos = consumer_info.find(at_key);
- this->consumer_id_ = consumer_info.substr(0, pos);
- this->consumer_id_ = Utils::Trim(this->consumer_id_);
- this->group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size());
- this->group_ = Utils::Trim(this->group_);
+ consumer_id_ = consumer_info.substr(0, pos);
+ consumer_id_ = Utils::Trim(consumer_id_);
+ group_ = consumer_info.substr(pos + at_key.size(), consumer_info.size());
+ group_ = Utils::Trim(group_);
}
buildSubInfo();
}
SubscribeInfo::SubscribeInfo(const string& consumer_id,
const string& group_name, const PartitionExt& partition_ext) {
- this->consumer_id_ = consumer_id;
- this->group_ = group_name;
- this->partitionext_ = partition_ext;
+ consumer_id_ = consumer_id;
+ group_ = group_name;
+ partitionext_ = partition_ext;
buildSubInfo();
}
SubscribeInfo& SubscribeInfo::operator=(const SubscribeInfo& target) {
if (this != &target) {
- this->consumer_id_ = target.consumer_id_;
- this->group_ = target.group_;
- this->partitionext_ = target.partitionext_;
+ consumer_id_ = target.consumer_id_;
+ group_ = target.group_;
+ partitionext_ = target.partitionext_;
+ buildSubInfo();
}
return *this;
}
-const string& SubscribeInfo::GetConsumerId() const { return this->consumer_id_; }
+const string& SubscribeInfo::GetConsumerId() const { return consumer_id_; }
-const string& SubscribeInfo::GetGroup() const { return this->group_; }
+const string& SubscribeInfo::GetGroup() const { return group_; }
-const PartitionExt& SubscribeInfo::GetPartitionExt() const { return this->partitionext_; }
+const PartitionExt& SubscribeInfo::GetPartitionExt() const { return partitionext_; }
-const uint32_t SubscribeInfo::GgetBrokerId() const { return this->partitionext_.GetBrokerId(); }
+const uint32_t SubscribeInfo::GgetBrokerId() const { return partitionext_.GetBrokerId(); }
-const string& SubscribeInfo::GetBrokerHost() const { return this->partitionext_.GetBrokerHost(); }
+const string& SubscribeInfo::GetBrokerHost() const { return partitionext_.GetBrokerHost(); }
-const uint32_t SubscribeInfo::GetBrokerPort() const { return this->partitionext_.GetBrokerPort(); }
+const uint32_t SubscribeInfo::GetBrokerPort() const { return partitionext_.GetBrokerPort(); }
-const string& SubscribeInfo::GetTopic() const { return this->partitionext_.GetTopic(); }
+const string& SubscribeInfo::GetTopic() const { return partitionext_.GetTopic(); }
const uint32_t SubscribeInfo::GetPartitionId() const {
- return this->partitionext_.GetPartitionId();
+ return partitionext_.GetPartitionId();
}
-const string& SubscribeInfo::ToString() const { return this->sub_info_; }
+const string& SubscribeInfo::ToString() const { return sub_info_; }
void SubscribeInfo::buildSubInfo() {
stringstream ss;
- ss << this->consumer_id_;
+ ss << consumer_id_;
ss << delimiter::kDelimiterAt;
- ss << this->group_;
+ ss << group_;
ss << delimiter::kDelimiterPound;
- ss << this->partitionext_.ToString();
- this->sub_info_ = ss.str();
+ ss << partitionext_.ToString();
+ sub_info_ = ss.str();
}
ConsumerEvent::ConsumerEvent() {
- this->rebalance_id_ = tb_config::kInvalidValue;
- this->event_type_ = tb_config::kInvalidValue;
- this->event_status_ = tb_config::kInvalidValue;
+ rebalance_id_ = tb_config::kInvalidValue;
+ event_type_ = tb_config::kInvalidValue;
+ event_status_ = tb_config::kInvalidValue;
}
ConsumerEvent::ConsumerEvent(const ConsumerEvent& target) {
- this->rebalance_id_ = target.rebalance_id_;
- this->event_type_ = target.event_type_;
- this->event_status_ = target.event_status_;
- this->subscribe_list_ = target.subscribe_list_;
+ rebalance_id_ = target.rebalance_id_;
+ event_type_ = target.event_type_;
+ event_status_ = target.event_status_;
+ subscribe_list_ = target.subscribe_list_;
}
ConsumerEvent::ConsumerEvent(int64_t rebalance_id, int32_t event_type,
const list<SubscribeInfo>& subscribeInfo_lst, int32_t event_status) {
list<SubscribeInfo>::const_iterator it;
- this->rebalance_id_ = rebalance_id;
- this->event_type_ = event_type;
- this->event_status_ = event_status;
+ rebalance_id_ = rebalance_id;
+ event_type_ = event_type;
+ event_status_ = event_status;
for (it = subscribeInfo_lst.begin(); it != subscribeInfo_lst.end(); ++it) {
- this->subscribe_list_.push_back(*it);
+ subscribe_list_.push_back(*it);
}
}
ConsumerEvent& ConsumerEvent::operator=(const ConsumerEvent& target) {
if (this != &target) {
- this->rebalance_id_ = target.rebalance_id_;
- this->event_type_ = target.event_type_;
- this->event_status_ = target.event_status_;
- this->subscribe_list_ = target.subscribe_list_;
+ rebalance_id_ = target.rebalance_id_;
+ event_type_ = target.event_type_;
+ event_status_ = target.event_status_;
+ subscribe_list_ = target.subscribe_list_;
}
return *this;
}
-const int64_t ConsumerEvent::GetRebalanceId() const { return this->rebalance_id_; }
+const int64_t ConsumerEvent::GetRebalanceId() const { return rebalance_id_; }
-const int32_t ConsumerEvent::GetEventType() const { return this->event_type_; }
+const int32_t ConsumerEvent::GetEventType() const { return event_type_; }
-const int32_t ConsumerEvent::GetEventStatus() const { return this->event_status_; }
+const int32_t ConsumerEvent::GetEventStatus() const { return event_status_; }
-void ConsumerEvent::SetEventType(int32_t event_type) { this->event_type_ = event_type; }
+void ConsumerEvent::SetEventType(int32_t event_type) { event_type_ = event_type; }
-void ConsumerEvent::SetEventStatus(int32_t event_status) { this->event_status_ = event_status; }
+void ConsumerEvent::SetEventStatus(int32_t event_status) { event_status_ = event_status; }
const list<SubscribeInfo>& ConsumerEvent::GetSubscribeInfoList() const {
- return this->subscribe_list_;
+ return subscribe_list_;
}
string ConsumerEvent::ToString() {
@@ -553,13 +558,13 @@ string ConsumerEvent::ToString() {
stringstream ss;
list<SubscribeInfo>::const_iterator it;
ss << "ConsumerEvent [rebalanceId=";
- ss << this->rebalance_id_;
+ ss << rebalance_id_;
ss << ", type=";
- ss << this->event_type_;
+ ss << event_type_;
ss << ", status=";
- ss << this->event_status_;
+ ss << event_status_;
ss << ", subscribeInfoList=[";
- for (it = this->subscribe_list_.begin(); it != this->subscribe_list_.end(); ++it) {
+ for (it = subscribe_list_.begin(); it != subscribe_list_.end(); ++it) {
if (count++ > 0) {
ss << ",";
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/meta_info.h b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.h
new file mode 100644
index 0000000..6117d65
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/meta_info.h
@@ -0,0 +1,188 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_META_INFO_H_
+#define TUBEMQ_CLIENT_META_INFO_H_
+
+#include <stdint.h>
+
+#include <list>
+#include <string>
+
+#include "flowctrl_def.h"
+
+namespace tubemq {
+
+using std::list;
+using std::map;
+using std::string;
+
+class NodeInfo {
+ public:
+ NodeInfo();
+ NodeInfo(bool is_broker, const string& node_info);
+ NodeInfo(const string& node_host, uint32_t node_port);
+ NodeInfo(int32_t node_id, const string& node_host, uint32_t node_port);
+ ~NodeInfo();
+ NodeInfo& operator=(const NodeInfo& target);
+ bool operator==(const NodeInfo& target);
+ bool operator<(const NodeInfo& target) const;
+ const uint32_t GetNodeId() const;
+ const string& GetHost() const;
+ const uint32_t GetPort() const;
+ const string& GetAddrInfo() const;
+ const string& GetNodeInfo() const;
+
+ private:
+ void buildStrInfo();
+
+ private:
+ uint32_t node_id_;
+ string node_host_;
+ uint32_t node_port_;
+ // ip:port
+ string addr_info_;
+ // id:ip:port
+ string node_info_;
+};
+
+class Partition {
+ public:
+ Partition();
+ explicit Partition(const string& partition_info);
+ Partition(const NodeInfo& broker_info, const string& part_str);
+ Partition(const NodeInfo& broker_info, const string& topic, uint32_t partition_id);
+ ~Partition();
+ Partition& operator=(const Partition& target);
+ bool operator==(const Partition& target);
+ const uint32_t GetBrokerId() const;
+ const string& GetBrokerHost() const;
+ const uint32_t GetBrokerPort() const;
+ const string& GetPartitionKey() const;
+ const string& GetTopic() const;
+ const NodeInfo& GetBrokerInfo() const;
+ const uint32_t GetPartitionId() const;
+ const string& ToString() const;
+
+ private:
+ void buildPartitionKey();
+
+ private:
+ string topic_;
+ NodeInfo broker_info_;
+ uint32_t partition_id_;
+ string partition_key_;
+ string partition_info_;
+};
+
+class PartitionExt : public Partition {
+ public:
+ PartitionExt();
+ explicit PartitionExt(const string& partition_info);
+ PartitionExt(const NodeInfo& broker_info, const string& part_str);
+ ~PartitionExt();
+ PartitionExt& operator=(const PartitionExt& target);
+ void BookConsumeData(int32_t errcode, int32_t msg_size, bool req_esc_limit,
+ int64_t rsp_dlt_limit, int64_t last_datadlt, bool require_slow);
+ int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed);
+ int64_t ProcConsumeResult(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, bool filter_consume, bool last_consumed,
+ int32_t errcode, int32_t msg_size, bool req_esc_limit, int64_t rsp_dlt_limit,
+ int64_t last_datadlt, bool require_slow);
+ void SetLastConsumed(bool last_consumed);
+ bool IsLastConsumed() const;
+
+ private:
+ void resetParameters();
+ void updateStrategyData(const FlowCtrlRuleHandler& def_flowctrl_handler,
+ const FlowCtrlRuleHandler& group_flowctrl_handler, int32_t msg_size, int64_t last_datadlt);
+
+ private:
+ bool is_last_consumed_;
+ FlowCtrlResult cur_flowctrl_;
+ FlowCtrlItem cur_freqctrl_;
+ int64_t next_stage_updtime_;
+ int64_t next_slice_updtime_;
+ int64_t limit_slice_msgsize_;
+ int64_t cur_stage_msgsize_;
+ int64_t cur_slice_msgsize_;
+ int32_t total_zero_cnt_;
+ int64_t booked_time_;
+ int32_t booked_errcode_;
+ bool booked_esc_limit_;
+ int32_t booked_msgsize_;
+ int64_t booked_dlt_limit_;
+ int64_t booked_curdata_dlt_;
+ bool booked_require_slow_;
+};
+
+class SubscribeInfo {
+ public:
+ SubscribeInfo();
+ explicit SubscribeInfo(const string& sub_info);
+ SubscribeInfo(const string& consumer_id,
+ const string& group_name, const PartitionExt& partition_ext);
+ SubscribeInfo& operator=(const SubscribeInfo& target);
+ const string& GetConsumerId() const;
+ const string& GetGroup() const;
+ const PartitionExt& GetPartitionExt() const;
+ const uint32_t GgetBrokerId() const;
+ const string& GetBrokerHost() const;
+ const uint32_t GetBrokerPort() const;
+ const string& GetTopic() const;
+ const uint32_t GetPartitionId() const;
+ const string& ToString() const;
+
+ private:
+ void buildSubInfo();
+
+ private:
+ string consumer_id_;
+ string group_;
+ PartitionExt partitionext_;
+ string sub_info_;
+};
+
+class ConsumerEvent {
+ public:
+ ConsumerEvent();
+ ConsumerEvent(const ConsumerEvent& target);
+ ConsumerEvent(int64_t rebalance_id, int32_t event_type,
+ const list<SubscribeInfo>& subscribeInfo_lst, int32_t event_status);
+ ConsumerEvent& operator=(const ConsumerEvent& target);
+ const int64_t GetRebalanceId() const;
+ const int32_t GetEventType() const;
+ const int32_t GetEventStatus() const;
+ void SetEventType(int32_t event_type);
+ void SetEventStatus(int32_t event_status);
+ const list<SubscribeInfo>& GetSubscribeInfoList() const;
+ string ToString();
+
+ private:
+ int64_t rebalance_id_;
+ int32_t event_type_;
+ int32_t event_status_;
+ list<SubscribeInfo> subscribe_list_;
+};
+
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_META_INFO_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/noncopyable.h b/tubemq-client-twins/tubemq-client-cpp/src/noncopyable.h
new file mode 100644
index 0000000..9afbf52
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/noncopyable.h
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBUMQ_NONCOPYABLE_H
+#define _TUBUMQ_NONCOPYABLE_H
+
+namespace tubemq {
+
+class noncopyable {
+ public:
+ noncopyable(const noncopyable&) = delete;
+ void operator=(const noncopyable&) = delete;
+
+ protected:
+ noncopyable() = default;
+ ~noncopyable() = default;
+};
+
+} // namespace tubemq
+
+#endif // _TUBUMQ_NONCOPYABLE_H
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
index f97535c..8bc9067 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.cc
@@ -17,75 +17,68 @@
* under the License.
*/
-#include "tubemq/rmt_data_cache.h"
+#include "rmt_data_cache.h"
#include <stdlib.h>
-#include <string>
-
-#include "tubemq/client_service.h"
-#include "tubemq/const_config.h"
-#include "tubemq/meta_info.h"
-#include "tubemq/utils.h"
+#include <string>
+#include "client_service.h"
+#include "const_config.h"
+#include "logger.h"
+#include "meta_info.h"
+#include "utils.h"
namespace tubemq {
using std::lock_guard;
using std::unique_lock;
-using namespace std::placeholders;
-
RmtDataCacheCsm::RmtDataCacheCsm() {
under_groupctrl_.Set(false);
last_checktime_.Set(0);
+ cur_part_cnt_.Set(0);
}
RmtDataCacheCsm::~RmtDataCacheCsm() {
- //
+ //
}
-void RmtDataCacheCsm::SetConsumerInfo(const string& client_id,
- const string& group_name) {
+void RmtDataCacheCsm::SetConsumerInfo(const string& client_id, const string& group_name) {
consumer_id_ = client_id;
group_name_ = group_name;
}
-void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
- const string& flowctrl_info) {
+void RmtDataCacheCsm::UpdateDefFlowCtrlInfo(int64_t flowctrl_id, const string& flowctrl_info) {
if (flowctrl_id != def_flowctrl_handler_.GetFlowCtrlId()) {
- def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true,
- tb_config::kInvalidValue, flowctrl_id, flowctrl_info);
+ def_flowctrl_handler_.UpdateDefFlowCtrlInfo(true, tb_config::kInvalidValue, flowctrl_id,
+ flowctrl_info);
}
}
-void RmtDataCacheCsm::UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
- int64_t flowctrl_id, const string& flowctrl_info) {
+void RmtDataCacheCsm::UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id, int64_t flowctrl_id,
+ const string& flowctrl_info) {
if (flowctrl_id != group_flowctrl_handler_.GetFlowCtrlId()) {
- group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false,
- qyrpriority_id, flowctrl_id, flowctrl_info);
+ group_flowctrl_handler_.UpdateDefFlowCtrlInfo(false, qyrpriority_id, flowctrl_id,
+ flowctrl_info);
}
if (qyrpriority_id != group_flowctrl_handler_.GetQryPriorityId()) {
- this->group_flowctrl_handler_.SetQryPriorityId(qyrpriority_id);
+ group_flowctrl_handler_.SetQryPriorityId(qyrpriority_id);
}
// update current if under group flowctrl
int64_t cur_time = Utils::GetCurrentTimeMillis();
if (cur_time - last_checktime_.Get() > 10000) {
FlowCtrlResult flowctrl_result;
- this->under_groupctrl_.Set(
- group_flowctrl_handler_.GetCurDataLimit(
- tb_config::kMaxLongValue, flowctrl_result));
+ under_groupctrl_.Set(
+ group_flowctrl_handler_.GetCurDataLimit(tb_config::kMaxLongValue, flowctrl_result));
last_checktime_.Set(cur_time);
}
}
const int64_t RmtDataCacheCsm::GetGroupQryPriorityId() const {
- return this->group_flowctrl_handler_.GetQryPriorityId();
-}
-
-bool RmtDataCacheCsm::IsUnderGroupCtrl() {
- return this->under_groupctrl_.Get();
+ return group_flowctrl_handler_.GetQryPriorityId();
}
+bool RmtDataCacheCsm::IsUnderGroupCtrl() { return under_groupctrl_.Get(); }
void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
//
@@ -99,6 +92,7 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
lock_guard<mutex> lck(meta_lock_);
it_map = partitions_.find(partition_key);
if (it_map == partitions_.end()) {
+ cur_part_cnt_.GetAndIncrement();
partitions_[partition_key] = partition_ext;
it_topic = topic_partition_.find(partition_ext.GetTopic());
if (it_topic == topic_partition_.end()) {
@@ -126,8 +120,23 @@ void RmtDataCacheCsm::AddNewPartition(const PartitionExt& partition_ext) {
resetIdlePartition(partition_key, true);
}
-bool RmtDataCacheCsm::SelectPartition(string &err_info,
- PartitionExt& partition_ext, string& confirm_context) {
+int32_t RmtDataCacheCsm::GetCurConsumeStatus() {
+ lock_guard<mutex> lck(meta_lock_);
+ if (partitions_.empty()) {
+ return err_code::kErrNoPartAssigned;
+ }
+ if (index_partitions_.empty()) {
+ if (partition_useds_.empty()) {
+ return err_code::kErrAllPartInUse;
+ } else {
+ return err_code::kErrAllPartWaiting;
+ }
+ }
+ return err_code::kErrSuccess;
+}
+
+bool RmtDataCacheCsm::SelectPartition(int32_t& error_code, string& err_info,
+ PartitionExt& partition_ext, string& confirm_context) {
bool result = false;
int64_t booked_time = 0;
string partition_key;
@@ -135,14 +144,22 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
// lock operate
lock_guard<mutex> lck(meta_lock_);
if (partitions_.empty()) {
+ error_code = err_code::kErrNoPartAssigned;
err_info = "No partition info in local cache, please retry later!";
result = false;
} else {
if (index_partitions_.empty()) {
- err_info = "No idle partition to consume, please retry later!";
+ if (partition_useds_.empty()) {
+ error_code = err_code::kErrAllPartInUse;
+ err_info = "No idle partition to consume, please retry later!";
+ } else {
+ error_code = err_code::kErrAllPartWaiting;
+ err_info = "All partitions reach max position, please retry later!";
+ }
result = false;
} else {
result = false;
+ error_code = err_code::kErrAllPartInUse;
err_info = "No idle partition to consume data 2, please retry later!";
booked_time = Utils::GetCurrentTimeMillis();
partition_key = index_partitions_.front();
@@ -160,10 +177,19 @@ bool RmtDataCacheCsm::SelectPartition(string &err_info,
return result;
}
-void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
- int64_t curr_offset, int32_t err_code, bool esc_limit,
- int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt,
- bool require_slow) {
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset) {
+ map<string, PartitionExt>::iterator it_part;
+ // book partition offset info
+ if (curr_offset >= 0) {
+ lock_guard<mutex> lck1(data_book_mutex_);
+ partition_offset_[partition_key] = curr_offset;
+ }
+}
+
+void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+ int32_t error_code, bool esc_limit, int32_t msg_size,
+ int64_t limit_dlt, int64_t cur_data_dlt,
+ bool require_slow) {
map<string, PartitionExt>::iterator it_part;
// book partition offset info
if (curr_offset >= 0) {
@@ -174,44 +200,53 @@ void RmtDataCacheCsm::BookedPartionInfo(const string& partition_key,
lock_guard<mutex> lck2(meta_lock_);
it_part = partitions_.find(partition_key);
if (it_part != partitions_.end()) {
- it_part->second.BookConsumeData(err_code, msg_size,
- esc_limit, limit_dlt, cur_data_dlt, require_slow);
+ it_part->second.BookConsumeData(error_code, msg_size, esc_limit, limit_dlt, cur_data_dlt,
+ require_slow);
+ }
+}
+
+bool RmtDataCacheCsm::IsPartitionInUse(string partition_key, int64_t used_time) {
+ map<string, int64_t>::iterator it_used;
+ lock_guard<mutex> lck(meta_lock_);
+ it_used = partition_useds_.find(partition_key);
+ if (it_used == partition_useds_.end() || it_used->second != used_time) {
+ return false;
}
+ return true;
}
// success process release partition
-bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
- const string& confirm_context, bool is_consumed) {
+bool RmtDataCacheCsm::RelPartition(string& err_info, bool filter_consume,
+ const string& confirm_context, bool is_consumed) {
return inRelPartition(err_info, true, filter_consume, confirm_context, is_consumed);
}
// release partiton without response return
-bool RmtDataCacheCsm::RelPartition(string &err_info,
- const string& confirm_context, bool is_consumed) {
+bool RmtDataCacheCsm::RelPartition(string& err_info, const string& confirm_context,
+ bool is_consumed) {
return inRelPartition(err_info, true, false, confirm_context, is_consumed);
}
// release partiton with error response return
-bool RmtDataCacheCsm::RelPartition(string &err_info, bool filter_consume,
- const string& confirm_context, bool is_consumed,
- int64_t curr_offset, int32_t err_code, bool esc_limit,
- int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt) {
+bool RmtDataCacheCsm::RelPartition(string& err_info, bool filter_consume,
+ const string& confirm_context, bool is_consumed,
+ int64_t curr_offset, int32_t error_code, bool esc_limit,
+ int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt) {
int64_t booked_time;
- string partition_key;
+ string partition_key;
// parse confirm context
- bool result = parseConfirmContext(err_info,
- confirm_context, partition_key, booked_time);
+ bool result = parseConfirmContext(err_info, confirm_context, partition_key, booked_time);
if (!result) {
return false;
}
- BookedPartionInfo(partition_key, curr_offset, err_code,
- esc_limit, msg_size, limit_dlt, cur_data_dlt, false);
- return inRelPartition(err_info, true,
- filter_consume, confirm_context, is_consumed);
+ BookedPartionInfo(partition_key, curr_offset, error_code, esc_limit, msg_size, limit_dlt,
+ cur_data_dlt, false);
+ return inRelPartition(err_info, true, filter_consume, confirm_context, is_consumed);
}
void RmtDataCacheCsm::FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
- list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions) {
+ list<PartitionExt>& subscribed_partitions,
+ list<PartitionExt>& unsub_partitions) {
//
map<string, PartitionExt>::iterator it_part;
list<SubscribeInfo>::const_iterator it_lst;
@@ -244,8 +279,7 @@ void RmtDataCacheCsm::GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst)
}
}
-void RmtDataCacheCsm::GetAllBrokerPartitions(
- map<NodeInfo, list<PartitionExt> >& broker_parts) {
+void RmtDataCacheCsm::GetAllClosedBrokerParts(map<NodeInfo, list<PartitionExt> >& broker_parts) {
map<string, PartitionExt>::iterator it_part;
map<NodeInfo, list<PartitionExt> >::iterator it_broker;
@@ -287,7 +321,7 @@ void RmtDataCacheCsm::GetRegBrokers(list<NodeInfo>& brokers) {
}
void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
- list<PartitionExt>& partition_list) {
+ list<PartitionExt>& partition_list) {
set<string>::iterator it_key;
map<NodeInfo, set<string> >::iterator it_broker;
map<string, PartitionExt>::iterator it_part;
@@ -296,8 +330,7 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
lock_guard<mutex> lck(meta_lock_);
it_broker = broker_partition_.find(broker_info);
if (it_broker != broker_partition_.end()) {
- for (it_key = it_broker->second.begin();
- it_key != it_broker->second.end(); it_key++) {
+ for (it_key = it_broker->second.begin(); it_key != it_broker->second.end(); it_key++) {
it_part = partitions_.find(*it_key);
if (it_part != partitions_.end()) {
partition_list.push_back(it_part->second);
@@ -306,7 +339,6 @@ void RmtDataCacheCsm::GetPartitionByBroker(const NodeInfo& broker_info,
}
}
-
void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_map) {
map<string, int64_t>::iterator it;
@@ -317,16 +349,13 @@ void RmtDataCacheCsm::GetCurPartitionOffsets(map<string, int64_t> part_offset_ma
}
}
-
//
-bool RmtDataCacheCsm::RemovePartition(string &err_info,
- const string& confirm_context) {
+bool RmtDataCacheCsm::RemovePartition(string& err_info, const string& confirm_context) {
int64_t booked_time;
- string partition_key;
+ string partition_key;
set<string> partition_keys;
// parse confirm context
- bool result = parseConfirmContext(err_info,
- confirm_context, partition_key, booked_time);
+ bool result = parseConfirmContext(err_info, confirm_context, partition_key, booked_time);
if (!result) {
return false;
}
@@ -360,7 +389,8 @@ void RmtDataCacheCsm::RemovePartition(const set<string>& partition_keys) {
}
void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
- bool is_processing_rollback, map<NodeInfo, list<PartitionExt> >& broker_parts) {
+ bool is_processing_rollback,
+ map<NodeInfo, list<PartitionExt> >& broker_parts) {
//
string part_key;
list<SubscribeInfo>::const_iterator it;
@@ -398,29 +428,56 @@ void RmtDataCacheCsm::RemoveAndGetPartition(const list<SubscribeInfo>& subscribe
}
}
+// Resets every partition whose entry in partition_useds_ is older than
+// max_wait_period_ms (compared against the current time in milliseconds).
+// Each such partition is first flagged via SetLastConsumed(false), then passed to
+// resetIdlePartition(key, true). The whole operation runs under meta_lock_.
+void RmtDataCacheCsm::handleExpiredPartitions(int64_t max_wait_period_ms) {
+  lock_guard<mutex> lck(meta_lock_);
+  if (partition_useds_.empty()) {
+    return;
+  }
+  const int64_t now_ms = Utils::GetCurrentTimeMillis();
+  // Pass 1: only collect the overdue keys. resetIdlePartition() erases from
+  // partition_useds_, so it must not be called while that map is being iterated.
+  set<string> overdue_keys;
+  for (const auto& used_entry : partition_useds_) {
+    if (now_ms - used_entry.second <= max_wait_period_ms) {
+      continue;
+    }
+    overdue_keys.insert(used_entry.first);
+    auto part_pos = partitions_.find(used_entry.first);
+    if (part_pos != partitions_.end()) {
+      part_pos->second.SetLastConsumed(false);
+    }
+  }
+  // Pass 2: reset the collected partitions (need_reuse = true).
+  for (const auto& overdue_key : overdue_keys) {
+    resetIdlePartition(overdue_key, true);
+  }
+}
+
bool RmtDataCacheCsm::IsPartitionFirstReg(const string& partition_key) {
- bool result = false;
map<string, bool>::iterator it;
-
lock_guard<mutex> lck(data_book_mutex_);
it = part_reg_booked_.find(partition_key);
if (it == part_reg_booked_.end()) {
part_reg_booked_[partition_key] = true;
}
- return result;
+ return part_reg_booked_[partition_key];
}
void RmtDataCacheCsm::OfferEvent(const ConsumerEvent& event) {
unique_lock<mutex> lck(event_read_mutex_);
- this->rebalance_events_.push_back(event);
+ rebalance_events_.push_back(event);
event_read_cond_.notify_all();
}
void RmtDataCacheCsm::TakeEvent(ConsumerEvent& event) {
unique_lock<mutex> lck(event_read_mutex_);
- while (this->rebalance_events_.empty()) {
+ while (rebalance_events_.empty()) {
event_read_cond_.wait(lck);
}
event = rebalance_events_.front();
@@ -434,41 +491,65 @@ void RmtDataCacheCsm::ClearEvent() {
void RmtDataCacheCsm::OfferEventResult(const ConsumerEvent& event) {
lock_guard<mutex> lck(event_write_mutex_);
- this->rebalance_events_.push_back(event);
+ rebalance_results_.push_back(event);
}
bool RmtDataCacheCsm::PollEventResult(ConsumerEvent& event) {
bool result = false;
lock_guard<mutex> lck(event_write_mutex_);
- if (!rebalance_events_.empty()) {
- event = rebalance_events_.front();
- rebalance_events_.pop_front();
+ if (!rebalance_results_.empty()) {
+ event = rebalance_results_.front();
+ rebalance_results_.pop_front();
result = true;
}
return result;
}
-void RmtDataCacheCsm::HandleTimeout(const string partition_key,
- const asio::error_code& error) {
+void RmtDataCacheCsm::HandleTimeout(const string partition_key, const asio::error_code& error) {
if (!error) {
lock_guard<mutex> lck(meta_lock_);
resetIdlePartition(partition_key, true);
}
}
+// Increments the counter kept for `broker` in broker_status_ and returns the new
+// value. A broker without an entry gets one, so its first call returns 1.
+int RmtDataCacheCsm::IncrAndGetHBError(NodeInfo broker) {
+  lock_guard<mutex> lck(status_mutex_);
+  // map::operator[] value-initializes a missing counter to 0 before the increment.
+  int& hb_errors = broker_status_[broker];
+  ++hb_errors;
+  return hb_errors;
+}
+
+// Sets the counter stored for `broker` in broker_status_ back to 0.
+// A broker that has no entry is left without one.
+void RmtDataCacheCsm::ResetHBError(NodeInfo broker) {
+  lock_guard<mutex> lck(status_mutex_);
+  auto status_pos = broker_status_.find(broker);
+  if (status_pos == broker_status_.end()) {
+    return;
+  }
+  status_pos->second = 0;
+}
+
+
void RmtDataCacheCsm::addDelayTimer(const string& partition_key, int64_t delay_time) {
// add timer
- tuple<int64_t, SteadyTimerPtr> timer =
- std::make_tuple(Utils::GetCurrentTimeMillis(),
- TubeMQService::Instance().GetTimerExecutorPool().Get()->CreateSteadyTimer());
+ tuple<int64_t, SteadyTimerPtr> timer = std::make_tuple(
+ Utils::GetCurrentTimeMillis(),
+ TubeMQService::Instance()->CreateTimer());
std::get<1>(timer)->expires_after(std::chrono::milliseconds(delay_time));
- std::get<1>(timer)->async_wait(std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key, _1));
- partition_timeouts_.insert(std::make_pair(partition_key, timer));
+ std::get<1>(timer)->async_wait(
+ std::bind(&RmtDataCacheCsm::HandleTimeout, this, partition_key, std::placeholders::_1));
+ partition_timeouts_.insert(std::make_pair(partition_key, timer));
}
void RmtDataCacheCsm::resetIdlePartition(const string& partition_key, bool need_reuse) {
map<string, PartitionExt>::iterator it_map;
- map<string, tuple<int64_t, SteadyTimerPtr> >::iterator it_timeout;
+ map<string, tuple<int64_t, SteadyTimerPtr> >::iterator it_timeout;
partition_useds_.erase(partition_key);
it_timeout = partition_timeouts_.find(partition_key);
if (it_timeout != partition_timeouts_.end()) {
@@ -483,16 +564,16 @@ void RmtDataCacheCsm::resetIdlePartition(const string& partition_key, bool need_
}
}
-void RmtDataCacheCsm::buildConfirmContext(const string& partition_key,
- int64_t booked_time, string& confirm_context) {
+void RmtDataCacheCsm::buildConfirmContext(const string& partition_key, int64_t booked_time,
+ string& confirm_context) {
confirm_context.clear();
confirm_context += partition_key;
confirm_context += delimiter::kDelimiterAt;
confirm_context += Utils::Long2str(booked_time);
}
-bool RmtDataCacheCsm::parseConfirmContext(string &err_info,
- const string& confirm_context, string& partition_key, int64_t& booked_time) {
+bool RmtDataCacheCsm::parseConfirmContext(string& err_info, const string& confirm_context,
+ string& partition_key, int64_t& booked_time) {
//
vector<string> result;
Utils::Split(confirm_context, result, delimiter::kDelimiterAt);
@@ -528,19 +609,19 @@ void RmtDataCacheCsm::rmvMetaInfo(const string& partition_key) {
}
partitions_.erase(partition_key);
part_subinfo_.erase(partition_key);
+ cur_part_cnt_.DecrementAndGet();
}
}
-bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
- bool filter_consume, const string& confirm_context, bool is_consumed) {
+bool RmtDataCacheCsm::inRelPartition(string& err_info, bool need_delay_check, bool filter_consume,
+ const string& confirm_context, bool is_consumed) {
int64_t delay_time;
int64_t booked_time;
- string partition_key;
+ string partition_key;
map<string, PartitionExt>::iterator it_part;
map<string, int64_t>::iterator it_used;
- // parse confirm context
- bool result = parseConfirmContext(err_info,
- confirm_context, partition_key, booked_time);
+ // parse confirm context
+ bool result = parseConfirmContext(err_info, confirm_context, partition_key, booked_time);
if (!result) {
return false;
}
@@ -565,8 +646,8 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
index_partitions_.remove(partition_key);
delay_time = 0;
if (need_delay_check) {
- delay_time = it_part->second.ProcConsumeResult(def_flowctrl_handler_,
- group_flowctrl_handler_, filter_consume, is_consumed);
+ delay_time = it_part->second.ProcConsumeResult(
+ def_flowctrl_handler_, group_flowctrl_handler_, filter_consume, is_consumed);
}
if (delay_time > 10) {
addDelayTimer(partition_key, delay_time);
@@ -585,5 +666,4 @@ bool RmtDataCacheCsm::inRelPartition(string &err_info, bool need_delay_check,
return result;
}
-
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.h b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.h
new file mode 100644
index 0000000..5b77eb7
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/rmt_data_cache.h
@@ -0,0 +1,166 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
+#define TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
+
+#include <stdint.h>
+
+#include <condition_variable>
+#include <list>
+#include <map>
+#include <mutex>
+#include <set>
+#include <string>
+#include <tuple>
+
+#include "executor_pool.h"
+#include "flowctrl_def.h"
+#include "meta_info.h"
+#include "tubemq/tubemq_atomic.h"
+#include "tubemq/tubemq_errcode.h"
+
+
+
+
+
+namespace tubemq {
+
+using std::condition_variable;
+using std::map;
+using std::set;
+using std::list;
+using std::mutex;
+using std::string;
+using std::tuple;
+
+
+
+// Consumer remote data cache: partition maps, in-use bookkeeping, offset cache, rebalance event queues, broker status.
+class RmtDataCacheCsm {
+ public:
+ RmtDataCacheCsm();
+ ~RmtDataCacheCsm();
+ void SetConsumerInfo(const string& client_id, const string& group_name);
+ void UpdateDefFlowCtrlInfo(int64_t flowctrl_id,
+ const string& flowctrl_info);
+ void UpdateGroupFlowCtrlInfo(int32_t qyrpriority_id,
+ int64_t flowctrl_id, const string& flowctrl_info);
+ const int64_t GetGroupQryPriorityId() const;
+ const int64_t GetDefFlowCtrlId() const { return def_flowctrl_handler_.GetFlowCtrlId(); }
+ const int64_t GetGroupFlowCtrlId() const { return group_flowctrl_handler_.GetFlowCtrlId(); }
+ bool IsUnderGroupCtrl();
+ int32_t GetCurConsumeStatus();
+ void handleExpiredPartitions(int64_t max_wait_period_ms);  // resets partitions marked in use for longer than max_wait_period_ms
+ int32_t GetCurPartCount() const { return cur_part_cnt_.Get(); }
+ bool IsPartitionInUse(string partition_key, int64_t used_time);  // true only if partition_key is marked in use with exactly used_time
+ void AddNewPartition(const PartitionExt& partition_ext);
+ bool SelectPartition(int32_t& error_code, string &err_info,
+ PartitionExt& partition_ext, string& confirm_context);
+ void BookedPartionInfo(const string& partition_key, int64_t curr_offset);  // caches curr_offset when it is >= 0
+ void BookedPartionInfo(const string& partition_key, int64_t curr_offset,
+ int32_t error_code, bool esc_limit, int32_t msg_size,
+ int64_t limit_dlt, int64_t cur_data_dlt, bool require_slow);  // additionally books the consume data on the partition, if it is known
+ bool RelPartition(string &err_info, bool filter_consume,
+ const string& confirm_context, bool is_consumed);  // success process release partition
+ bool RelPartition(string &err_info, const string& confirm_context, bool is_consumed);  // release partition without response return
+ bool RelPartition(string &err_info, bool filter_consume,
+ const string& confirm_context, bool is_consumed,
+ int64_t curr_offset, int32_t error_code, bool esc_limit,
+ int32_t msg_size, int64_t limit_dlt, int64_t cur_data_dlt);  // release partition with error response return; books the partition info first
+ void FilterPartitions(const list<SubscribeInfo>& subscribe_info_lst,
+ list<PartitionExt>& subscribed_partitions, list<PartitionExt>& unsub_partitions);
+ void GetSubscribedInfo(list<SubscribeInfo>& subscribe_info_lst);
+ bool GetPartitionExt(const string& part_key, PartitionExt& partition_ext);
+ void GetRegBrokers(list<NodeInfo>& brokers);
+ void GetPartitionByBroker(const NodeInfo& broker_info,
+ list<PartitionExt>& partition_list);  // appends the partitions recorded under broker_info
+ void GetCurPartitionOffsets(map<string, int64_t> part_offset_map);  // NOTE(review): map is taken by value, so the caller's map is never modified - confirm intent
+ void GetAllClosedBrokerParts(map<NodeInfo, list<PartitionExt> >& broker_parts);
+ void RemovePartition(const list<PartitionExt>& partition_list);
+ void RemovePartition(const set<string>& partition_keys);
+ bool RemovePartition(string &err_info, const string& confirm_context);  // partition key is parsed out of confirm_context
+ void RemoveAndGetPartition(const list<SubscribeInfo>& subscribe_infos,
+ bool is_processing_rollback, map<NodeInfo, list<PartitionExt> >& broker_parts);
+ bool IsPartitionFirstReg(const string& partition_key);  // records partition_key in part_reg_booked_ if absent and returns the stored flag
+ void OfferEvent(const ConsumerEvent& event);  // queues a rebalance event and notifies waiters
+ void TakeEvent(ConsumerEvent& event);  // blocks until a rebalance event is available
+ void ClearEvent();
+ void OfferEventResult(const ConsumerEvent& event);  // queues a rebalance result
+ bool PollEventResult(ConsumerEvent& event);  // non-blocking; returns false when no result is queued
+ void HandleTimeout(const string partition_key, const asio::error_code& error);  // timer callback: resets the partition when error is not set
+ int IncrAndGetHBError(NodeInfo broker);  // increments and returns the counter kept for broker
+ void ResetHBError(NodeInfo broker);  // sets the counter of a known broker back to 0
+
+ private:
+ void addDelayTimer(const string& part_key, int64_t delay_time);  // arms a timer that calls HandleTimeout after delay_time milliseconds
+ void resetIdlePartition(const string& partition_key, bool need_reuse);
+ void rmvMetaInfo(const string& partition_key);
+ void buildConfirmContext(const string& partition_key,
+ int64_t booked_time, string& confirm_context);  // confirm_context = partition_key + delimiter::kDelimiterAt + booked_time
+ bool parseConfirmContext(string &err_info,
+ const string& confirm_context, string& partition_key, int64_t& booked_time);  // splits confirm_context on delimiter::kDelimiterAt
+ bool inRelPartition(string &err_info, bool need_delay_check,
+ bool filter_consume, const string& confirm_context, bool is_consumed);  // shared implementation behind the RelPartition overloads
+
+ private:
+ // consumer identity
+ string consumer_id_;
+ string group_name_;
+ // flow ctrl
+ AtomicInteger cur_part_cnt_;
+ FlowCtrlRuleHandler group_flowctrl_handler_;
+ FlowCtrlRuleHandler def_flowctrl_handler_;
+ AtomicBoolean under_groupctrl_;
+ AtomicLong last_checktime_;
+ // meta info
+ mutable mutex meta_lock_;  // taken around partitions_, partition_useds_ and broker_partition_ accesses
+ // partition allocated map
+ map<string, PartitionExt> partitions_;
+ // topic partition map
+ map<string, set<string> > topic_partition_;
+ // broker partition map
+ map<NodeInfo, set<string> > broker_partition_;
+ map<string, SubscribeInfo> part_subinfo_;
+ // for idle partitions occupy
+ list<string> index_partitions_;
+ // for partition used map: partition key -> time value in milliseconds
+ map<string, int64_t> partition_useds_;
+ // for partition timer map
+ map<string, tuple<int64_t, SteadyTimerPtr> > partition_timeouts_;
+ // data book
+ mutable mutex data_book_mutex_;  // taken around partition_offset_ and part_reg_booked_ accesses
+ // for partition offset cache
+ map<string, int64_t> partition_offset_;
+ // for partition register booked
+ map<string, bool> part_reg_booked_;
+ // event
+ mutable mutex event_read_mutex_;  // taken around rebalance_events_ accesses
+ condition_variable event_read_cond_;
+ list<ConsumerEvent> rebalance_events_;
+ mutable mutex event_write_mutex_;  // taken around rebalance_results_ accesses
+ list<ConsumerEvent> rebalance_results_;
+ // status check
+ mutable mutex status_mutex_;  // taken around broker_status_ accesses
+ map<NodeInfo, int> broker_status_;
+};
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_RMT_DATA_CACHE_H_
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/singleton.h
similarity index 50%
copy from tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
copy to tubemq-client-twins/tubemq-client-cpp/src/singleton.h
index 1adcdf8..c945005 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/singleton.h
@@ -17,52 +17,46 @@
* under the License.
*/
-#ifndef _TUBEMQ_THREAD_POOL_
-#define _TUBEMQ_THREAD_POOL_
+#ifndef _TUBEMQ_SINGLETON_H
+#define _TUBEMQ_SINGLETON_H
+#include <assert.h>
#include <stdlib.h>
-#include <asio.hpp>
-#include <asio/ssl.hpp>
-#include <chrono>
-#include <functional>
-#include <memory>
#include <mutex>
#include <thread>
-#include <vector>
#include "noncopyable.h"
namespace tubemq {
-// ThreadPool use one io_context for thread pool
-class ThreadPool : noncopyable {
+
+template <typename T>
+class Singleton : noncopyable {
public:
- explicit ThreadPool(std::size_t size)
- : io_context_(size), work_(asio::make_work_guard(io_context_)) {
- for (size_t i = 0; i < size; ++i) {
- workers_.emplace_back([this] { io_context_.run(); });
- }
+ static T& Instance() {
+ std::call_once(once_, Singleton::init);
+ assert(value_ != nullptr);
+ return *value_;
}
- ~ThreadPool() {
- work_.reset();
- io_context_.stop();
- for (std::thread &worker : workers_) {
- worker.join();
- }
- workers_.clear();
- }
+ protected:
+ Singleton() {}
+ ~Singleton() {}
- template <class function>
- void Post(function f) {
- io_context_.post(f);
- }
+ private:
+ static void init() { value_ = new T(); }
private:
- asio::io_context io_context_;
- using io_context_work = asio::executor_work_guard<asio::io_context::executor_type>;
- io_context_work work_;
- std::vector<std::thread> workers_;
-}; // namespace tubemq
+ static std::once_flag once_;
+ static T* value_;
+};
+
+template <typename T>
+std::once_flag Singleton<T>::once_;
+
+template <typename T>
+T* Singleton<T>::value_ = nullptr;
+
} // namespace tubemq
-#endif // _TUBEMQ_THREAD_POOL_
+
+#endif // _TUBEMQ_SINGLETON_H
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
index 1adcdf8..803eb15 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/thread_pool.h
@@ -22,8 +22,7 @@
#include <stdlib.h>
-#include <asio.hpp>
-#include <asio/ssl.hpp>
+
#include <chrono>
#include <functional>
#include <memory>
@@ -31,6 +30,8 @@
#include <thread>
#include <vector>
+#include <asio.hpp>
+#include <asio/ssl.hpp>
#include "noncopyable.h"
namespace tubemq {
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc
new file mode 100644
index 0000000..9ba1aef
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_client.cc
@@ -0,0 +1,184 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/tubemq_client.h"
+
+#include <signal.h>
+#include <unistd.h>
+
+#include <sstream>
+
+#include "baseconsumer.h"
+#include "client_service.h"
+#include "const_config.h"
+#include "tubemq/tubemq_config.h"
+#include "tubemq/tubemq_errcode.h"
+
+
+
+namespace tubemq {
+
+using std::lock_guard;
+using std::stringstream;
+
+// Starts the process-wide TubeMQService with the given configuration file.
+// Returns the result of TubeMQService::Start(); err_info is filled in by that call.
+bool StartTubeMQService(string& err_info, const string& conf_file) {
+  // Set SIGPIPE to be ignored before the service is started.
+  signal(SIGPIPE, SIG_IGN);
+  const bool started = TubeMQService::Instance()->Start(err_info, conf_file);
+  return started;
+}
+
+// Stops the process-wide TubeMQService.
+// Refuses to stop (returns false with an explanatory err_info) while the service
+// still reports client objects; otherwise returns the result of TubeMQService::Stop().
+bool StopTubeMQService(string& err_info) {
+  const int32_t live_clients = TubeMQService::Instance()->GetClientObjCnt();
+  if (live_clients <= 0) {
+    return TubeMQService::Instance()->Stop(err_info);
+  }
+  stringstream msg;
+  msg << "Check found " << live_clients
+      << " clients not shutdown, please shutdown clients first!";
+  err_info = msg.str();
+  return false;
+}
+
+
+TubeMQConsumer::TubeMQConsumer() {
+ client_id_ = tb_config::kInvalidValue;  // no client index assigned yet; Start() stores one on success
+ status_.Set(0);  // 0 = not started; Start() moves the state 0 -> 1 -> 2
+}
+
+TubeMQConsumer::~TubeMQConsumer() {
+ ShutDown();  // returns immediately unless status_ is 2 (started)
+}
+
+// Starts this consumer: creates a BaseConsumer, starts it with `config` and stores
+// its client index in client_id_.
+// status_ moves 0 (not started) -> 1 (starting) -> 2 (started). On a failed start it
+// is rolled back to 0 so that Start() can be called again.
+// Returns true on success; on failure returns false with the reason in err_info.
+bool TubeMQConsumer::Start(string& err_info,
+                           const ConsumerConfig& config) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    err_info = "TubeMQ Service not startted!";
+    return false;
+  }
+  // check status: only one caller may move the state from 0 to 1
+  if (!status_.CompareAndSet(0, 1)) {
+    err_info = "Duplicated call!";
+    return false;
+  }
+  BaseConsumer* rmt_client = new BaseConsumer();
+  if (rmt_client == NULL) {
+    err_info = "No memory for create CONSUMER remote object!";
+    status_.CompareAndSet(1, 0);
+    return false;
+  }
+  if (!rmt_client->Start(err_info, config)) {
+    rmt_client->ShutDown();
+    delete rmt_client;
+    // Roll the state back. Left at 1, every later Start() would fail with
+    // "Duplicated call!" and ShutDown(), which requires state 2, would do nothing.
+    status_.CompareAndSet(1, 0);
+    return false;
+  }
+  client_id_ = rmt_client->GetClientIndex();
+  status_.Set(2);
+  err_info = "Ok!";
+  return true;
+}
+
+// Shuts down and deletes the client object registered under client_id_, if any.
+// Acts only when status_ is 2 (started); the state is switched back to 0 first, so
+// a repeated call returns immediately.
+void TubeMQConsumer::ShutDown() {
+  if (!status_.CompareAndSet(2, 0)) {
+    return;
+  }
+  if (client_id_ == tb_config::kInvalidValue) {
+    return;
+  }
+  BaseConsumer* consumer =
+      (BaseConsumer *)TubeMQService::Instance()->GetClientObj(client_id_);
+  // Release the object only if it still reports the client index we hold.
+  const bool is_ours = (consumer != NULL) && (consumer->GetClientIndex() == client_id_);
+  if (is_ours) {
+    consumer->ShutDown();
+    delete consumer;
+  }
+  client_id_ = tb_config::kInvalidValue;
+}
+
+// Fetches messages through the client object registered under client_id_.
+// Before delegating, checks in order: service running, this consumer started
+// (status_ == 2), client_id_ valid, and the registered object still reporting
+// client_id_. A failed check stores an error in `result` and returns false.
+bool TubeMQConsumer::GetMessage(ConsumerResult& result) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    result.SetFailureResult(err_code::kErrMQServiceStop, "TubeMQ Service stopped!");
+    return false;
+  }
+  if (status_.Get() != 2) {
+    result.SetFailureResult(err_code::kErrClientStop, "TubeMQ Service not startted!");
+    return false;
+  }
+  if (client_id_ == tb_config::kInvalidValue) {
+    result.SetFailureResult(err_code::kErrClientStop,
+                            "Tube client not call init function, please initial first!");
+    return false;
+  }
+  BaseConsumer* consumer = (BaseConsumer*)TubeMQService::Instance()->GetClientObj(client_id_);
+  const bool usable = (consumer != NULL) && (consumer->GetClientIndex() == client_id_);
+  if (!usable) {
+    result.SetFailureResult(err_code::kErrBadRequest,
+                            "Rmt client CB has been released, please re-start this client");
+    return false;
+  }
+  return consumer->GetMessage(result);
+}
+
+// Confirms a previous fetch through the client object registered under client_id_.
+// Before delegating, checks in order: service running, this consumer started
+// (status_ == 2), client_id_ valid, and the registered object still reporting
+// client_id_. A failed check stores an error in `result` and returns false.
+bool TubeMQConsumer::Confirm(const string& confirm_context,
+                             bool is_consumed, ConsumerResult& result) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    result.SetFailureResult(err_code::kErrMQServiceStop, "TubeMQ Service stopped!");
+    return false;
+  }
+  if (status_.Get() != 2) {
+    result.SetFailureResult(err_code::kErrClientStop, "TubeMQ Service not startted!");
+    return false;
+  }
+  if (client_id_ == tb_config::kInvalidValue) {
+    result.SetFailureResult(err_code::kErrClientStop,
+                            "Tube client not call init function, please initial first!");
+    return false;
+  }
+  BaseConsumer* consumer = (BaseConsumer*)TubeMQService::Instance()->GetClientObj(client_id_);
+  const bool usable = (consumer != NULL) && (consumer->GetClientIndex() == client_id_);
+  if (!usable) {
+    result.SetFailureResult(err_code::kErrBadRequest,
+                            "Rmt client CB has been released, please re-start this client");
+    return false;
+  }
+  return consumer->Confirm(confirm_context, is_consumed, result);
+}
+
+// Fills consume_info_map through the client object registered under client_id_.
+// Returns false without touching the map when the service is not running, this
+// consumer is not started (status_ != 2), client_id_ is invalid, or the registered
+// object no longer reports client_id_.
+bool TubeMQConsumer::GetCurConsumedInfo(map<string, ConsumeOffsetInfo>& consume_info_map) {
+  if (!TubeMQService::Instance()->IsRunning()) {
+    return false;
+  }
+  const bool started = (status_.Get() == 2) && (client_id_ != tb_config::kInvalidValue);
+  if (!started) {
+    return false;
+  }
+  BaseConsumer* consumer = (BaseConsumer*)TubeMQService::Instance()->GetClientObj(client_id_);
+  const bool usable = (consumer != NULL) && (consumer->GetClientIndex() == client_id_);
+  if (!usable) {
+    return false;
+  }
+  return consumer->GetCurConsumedInfo(consume_info_map);
+}
+
+} // namespace tubemq
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
index 89759df..c186701 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_codec.h
@@ -185,9 +185,10 @@ class TubeMQCodec final : public CodecProtocol {
// return code: -1 failed; 0-Unfinished; > 0 package buffer size
virtual int32_t Check(BufferPtr &in, Any &out, uint32_t &request_id, bool &has_request_id,
size_t &package_length) {
+ LOG_TRACE("check in:%s", in->String().c_str());
// check package is valid
if (in->length() < 12) {
- package_length = 12;
+ // package_length = 12;
LOG_TRACE("Check: data's length < 12, is %ld, out", in->length());
return 0;
}
@@ -206,13 +207,11 @@ class TubeMQCodec final : public CodecProtocol {
}
// check data list
uint32_t item_len = 0;
+ // package_length = 12;
auto check_buf = in->Slice();
for (uint32_t i = 0; i < list_size; i++) {
+ // package_length += 4;
if (check_buf->length() < 4) {
- package_length += 4;
- if (i > 0) {
- package_length += i * rpc_config::kRpcMaxBufferSize;
- }
LOG_TRACE("Check: buffer Remaining length < 4, is %ld, out", check_buf->length());
return 0;
}
@@ -226,11 +225,8 @@ class TubeMQCodec final : public CodecProtocol {
rpc_config::kRpcMaxBufferSize);
return -1;
}
+ // package_length += item_len;
if (item_len > check_buf->length()) {
- package_length += 4 + item_len;
- if (i > 0) {
- package_length += i * rpc_config::kRpcMaxBufferSize;
- }
LOG_TRACE("Check: item_len(%d) > remaining length(%ld), out", item_len,
check_buf->length());
return 0;
@@ -248,8 +244,8 @@ class TubeMQCodec final : public CodecProtocol {
in->Skip(item_len);
}
out = buf;
- LOG_TRACE("Check: received message check finished, request_id=%d, readed_len:%d",
- request_id, readed_len);
+ LOG_TRACE("Check: received message check finished, request_id=%d, readed_len:%d", request_id,
+ readed_len);
return readed_len;
}
@@ -310,8 +306,7 @@ class TubeMQCodec final : public CodecProtocol {
}
return block_cnt;
}
-
- };
+};
} // namespace tubemq
#endif
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
index b976add..a2534ff 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_config.cc
@@ -22,9 +22,9 @@
#include <sstream>
#include <vector>
-#include "tubemq/const_config.h"
-#include "tubemq/const_rpc.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "const_rpc.h"
+#include "utils.h"
namespace tubemq {
@@ -33,17 +33,17 @@ using std::stringstream;
using std::vector;
BaseConfig::BaseConfig() {
- this->master_addrinfo_ = "";
- this->auth_enable_ = false;
- this->auth_usrname_ = "";
- this->auth_usrpassword_ = "";
- this->tls_enabled_ = false;
- this->tls_trust_store_path_ = "";
- this->tls_trust_store_password_ = "";
- this->rpc_read_timeout_sec_ = rpc_config::kRpcTimoutDefSec;
- this->heartbeat_period_sec_ = tb_config::kHeartBeatPeriodDef;
- this->max_heartbeat_retry_times_ = tb_config::kHeartBeatFailRetryTimesDef;
- this->heartbeat_period_afterfail_sec_ = tb_config::kHeartBeatSleepPeriodDef;
+ master_addrinfo_ = "";
+ auth_enable_ = false;
+ auth_usrname_ = "";
+ auth_usrpassword_ = "";
+ tls_enabled_ = false;
+ tls_trust_store_path_ = "";
+ tls_trust_store_password_ = "";
+ rpc_read_timeout_ms_ = tb_config::kRpcTimoutDefMs;
+ heartbeat_period_ms_ = tb_config::kHeartBeatPeriodDefMs;
+ max_heartbeat_retry_times_ = tb_config::kHeartBeatFailRetryTimesDef;
+ heartbeat_period_afterfail_ms_ = tb_config::kHeartBeatSleepPeriodDefMs;
}
BaseConfig::~BaseConfig() {
@@ -52,17 +52,17 @@ BaseConfig::~BaseConfig() {
BaseConfig& BaseConfig::operator=(const BaseConfig& target) {
if (this != &target) {
- this->master_addrinfo_ = target.master_addrinfo_;
- this->auth_enable_ = target.auth_enable_;
- this->auth_usrname_ = target.auth_usrname_;
- this->auth_usrpassword_ = target.auth_usrpassword_;
- this->tls_enabled_ = target.tls_enabled_;
- this->tls_trust_store_path_ = target.tls_trust_store_path_;
- this->tls_trust_store_password_ = target.tls_trust_store_password_;
- this->rpc_read_timeout_sec_ = target.rpc_read_timeout_sec_;
- this->heartbeat_period_sec_ = target.heartbeat_period_sec_;
- this->max_heartbeat_retry_times_ = target.max_heartbeat_retry_times_;
- this->heartbeat_period_afterfail_sec_ = target.heartbeat_period_afterfail_sec_;
+ master_addrinfo_ = target.master_addrinfo_;
+ auth_enable_ = target.auth_enable_;
+ auth_usrname_ = target.auth_usrname_;
+ auth_usrpassword_ = target.auth_usrpassword_;
+ tls_enabled_ = target.tls_enabled_;
+ tls_trust_store_path_ = target.tls_trust_store_path_;
+ tls_trust_store_password_ = target.tls_trust_store_password_;
+ rpc_read_timeout_ms_ = target.rpc_read_timeout_ms_;
+ heartbeat_period_ms_ = target.heartbeat_period_ms_;
+ max_heartbeat_retry_times_ = target.max_heartbeat_retry_times_;
+ heartbeat_period_afterfail_ms_ = target.heartbeat_period_afterfail_ms_;
}
return *this;
}
@@ -91,14 +91,14 @@ bool BaseConfig::SetMasterAddrInfo(string& err_info, const string& master_addrin
err_info = "Illegal parameter: master_addrinfo is blank!";
return false;
}
- this->master_addrinfo_ = trimed_master_addr_info;
+ master_addrinfo_ = trimed_master_addr_info;
err_info = "Ok";
return true;
}
bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& trust_store_path,
const string& trust_store_password) {
- this->tls_enabled_ = tls_enable;
+ tls_enabled_ = tls_enable;
if (tls_enable) {
string trimed_trust_store_path = Utils::Trim(trust_store_path);
if (trimed_trust_store_path.empty()) {
@@ -110,11 +110,11 @@ bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& tru
err_info = "Illegal parameter: trust_store_password is empty!";
return false;
}
- this->tls_trust_store_path_ = trimed_trust_store_path;
- this->tls_trust_store_password_ = trimed_trust_store_password;
+ tls_trust_store_path_ = trimed_trust_store_path;
+ tls_trust_store_password_ = trimed_trust_store_password;
} else {
- this->tls_trust_store_path_ = "";
- this->tls_trust_store_password_ = "";
+ tls_trust_store_path_ = "";
+ tls_trust_store_password_ = "";
}
err_info = "Ok";
return true;
@@ -122,7 +122,7 @@ bool BaseConfig::SetTlsInfo(string& err_info, bool tls_enable, const string& tru
bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, const string& usr_name,
const string& usr_password) {
- this->auth_enable_ = authentic_enable;
+ auth_enable_ = authentic_enable;
if (authentic_enable) {
string trimed_usr_name = Utils::Trim(usr_name);
if (trimed_usr_name.empty()) {
@@ -134,102 +134,103 @@ bool BaseConfig::SetAuthenticInfo(string& err_info, bool authentic_enable, const
err_info = "Illegal parameter: usr_password is empty!";
return false;
}
- this->auth_usrname_ = trimed_usr_name;
- this->auth_usrpassword_ = trimed_usr_password;
+ auth_usrname_ = trimed_usr_name;
+ auth_usrpassword_ = trimed_usr_password;
} else {
- this->auth_usrname_ = "";
- this->auth_usrpassword_ = "";
+ auth_usrname_ = "";
+ auth_usrpassword_ = "";
}
err_info = "Ok";
return true;
}
-const string& BaseConfig::GetMasterAddrInfo() const { return this->master_addrinfo_; }
+const string& BaseConfig::GetMasterAddrInfo() const { return master_addrinfo_; }
-bool BaseConfig::IsTlsEnabled() { return this->tls_enabled_; }
+bool BaseConfig::IsTlsEnabled() { return tls_enabled_; }
-const string& BaseConfig::GetTrustStorePath() const { return this->tls_trust_store_path_; }
+const string& BaseConfig::GetTrustStorePath() const { return tls_trust_store_path_; }
-const string& BaseConfig::GetTrustStorePassword() const { return this->tls_trust_store_password_; }
+const string& BaseConfig::GetTrustStorePassword() const { return tls_trust_store_password_; }
-bool BaseConfig::IsAuthenticEnabled() { return this->auth_enable_; }
+bool BaseConfig::IsAuthenticEnabled() { return auth_enable_; }
-const string& BaseConfig::GetUsrName() const { return this->auth_usrname_; }
+const string& BaseConfig::GetUsrName() const { return auth_usrname_; }
-const string& BaseConfig::GetUsrPassWord() const { return this->auth_usrpassword_; }
+const string& BaseConfig::GetUsrPassWord() const { return auth_usrpassword_; }
-void BaseConfig::SetRpcReadTimeoutSec(int rpc_read_timeout_sec) {
- if (rpc_read_timeout_sec >= rpc_config::kRpcTimoutMaxSec) {
- this->rpc_read_timeout_sec_ = rpc_config::kRpcTimoutMaxSec;
- } else if (rpc_read_timeout_sec <= rpc_config::kRpcTimoutMinSec) {
- this->rpc_read_timeout_sec_ = rpc_config::kRpcTimoutMinSec;
+void BaseConfig::SetRpcReadTimeoutMs(int rpc_read_timeout_ms) {
+ if (rpc_read_timeout_ms >= tb_config::kRpcTimoutMaxMs) {
+ rpc_read_timeout_ms_ = tb_config::kRpcTimoutMaxMs;
+ } else if (rpc_read_timeout_ms <= tb_config::kRpcTimoutMinMs) {
+ rpc_read_timeout_ms_ = tb_config::kRpcTimoutMinMs;
} else {
- this->rpc_read_timeout_sec_ = rpc_read_timeout_sec;
+ rpc_read_timeout_ms_ = rpc_read_timeout_ms;
}
}
-int32_t BaseConfig::GetRpcReadTimeoutSec() { return this->rpc_read_timeout_sec_; }
+int32_t BaseConfig::GetRpcReadTimeoutMs() { return rpc_read_timeout_ms_; }
-void BaseConfig::SetHeartbeatPeriodSec(int32_t heartbeat_period_sec) {
- this->heartbeat_period_sec_ = heartbeat_period_sec;
+void BaseConfig::SetHeartbeatPeriodMs(int32_t heartbeat_period_ms) {
+ heartbeat_period_ms_ = heartbeat_period_ms;
}
-int32_t BaseConfig::GetHeartbeatPeriodSec() { return this->heartbeat_period_sec_; }
+int32_t BaseConfig::GetHeartbeatPeriodMs() { return heartbeat_period_ms_; }
void BaseConfig::SetMaxHeartBeatRetryTimes(int32_t max_heartbeat_retry_times) {
- this->max_heartbeat_retry_times_ = max_heartbeat_retry_times;
+ max_heartbeat_retry_times_ = max_heartbeat_retry_times;
}
-int32_t BaseConfig::GetMaxHeartBeatRetryTimes() { return this->max_heartbeat_retry_times_; }
+int32_t BaseConfig::GetMaxHeartBeatRetryTimes() { return max_heartbeat_retry_times_; }
-void BaseConfig::SetHeartbeatPeriodAftFailSec(int32_t heartbeat_period_afterfail_sec) {
- this->heartbeat_period_afterfail_sec_ = heartbeat_period_afterfail_sec;
+void BaseConfig::SetHeartbeatPeriodAftFailMs(int32_t heartbeat_period_afterfail_ms) {
+ heartbeat_period_afterfail_ms_ = heartbeat_period_afterfail_ms;
}
-int32_t BaseConfig::GetHeartbeatPeriodAftFailSec() { return this->heartbeat_period_afterfail_sec_; }
+int32_t BaseConfig::GetHeartbeatPeriodAftFailMs() { return heartbeat_period_afterfail_ms_; }
string BaseConfig::ToString() {
stringstream ss;
ss << "BaseConfig={master_addrinfo_='";
- ss << this->master_addrinfo_;
+ ss << master_addrinfo_;
ss << "', authEnable=";
- ss << this->auth_enable_;
+ ss << auth_enable_;
ss << ", auth_usrname_='";
- ss << this->auth_usrname_;
+ ss << auth_usrname_;
ss << "', auth_usrpassword_='";
- ss << this->auth_usrpassword_;
+ ss << auth_usrpassword_;
ss << "', tls_enabled_=";
- ss << this->tls_enabled_;
+ ss << tls_enabled_;
ss << ", tls_trust_store_path_='";
- ss << this->tls_trust_store_path_;
+ ss << tls_trust_store_path_;
ss << "', tls_trust_store_password_='";
- ss << this->tls_trust_store_password_;
- ss << "', rpc_read_timeout_sec_=";
- ss << this->rpc_read_timeout_sec_;
- ss << ", heartbeat_period_sec_=";
- ss << this->heartbeat_period_sec_;
+ ss << tls_trust_store_password_;
+ ss << "', rpc_read_timeout_ms_=";
+ ss << rpc_read_timeout_ms_;
+ ss << ", heartbeat_period_ms_=";
+ ss << heartbeat_period_ms_;
ss << ", max_heartbeat_retry_times_=";
- ss << this->max_heartbeat_retry_times_;
- ss << ", heartbeat_period_afterfail_sec_=";
- ss << this->heartbeat_period_afterfail_sec_;
+ ss << max_heartbeat_retry_times_;
+ ss << ", heartbeat_period_afterfail_ms_=";
+ ss << heartbeat_period_afterfail_ms_;
ss << "}";
return ss.str();
}
ConsumerConfig::ConsumerConfig() {
- this->group_name_ = "";
- this->is_bound_consume_ = false;
- this->session_key_ = "";
- this->source_count_ = 0;
- this->is_select_big_ = true;
- this->consume_position_ = kConsumeFromLatestOffset;
- this->is_confirm_in_local_ = false;
- this->is_rollback_if_confirm_timout_ = true;
- this->max_subinfo_report_intvl_ = tb_config::kSubInfoReportMaxIntervalTimes;
- this->msg_notfound_wait_period_ms_ = tb_config::kMsgNotfoundWaitPeriodMsDef;
- this->reb_confirm_wait_period_ms_ = tb_config::kRebConfirmWaitPeriodMsDef;
- this->max_confirm_wait_period_ms_ = tb_config::kConfirmWaitPeriodMsMax;
- this->shutdown_reb_wait_period_ms_ = tb_config::kRebWaitPeriodWhenShutdownMs;
+ group_name_ = "";
+ is_bound_consume_ = false;
+ session_key_ = "";
+ source_count_ = 0;
+ is_select_big_ = true;
+ consume_position_ = kConsumeFromLatestOffset;
+ is_rollback_if_confirm_timout_ = true;
+ max_subinfo_report_intvl_ = tb_config::kSubInfoReportMaxIntervalTimes;
+ max_part_check_period_ms_ = tb_config::kMaxPartCheckPeriodMsDef;
+ part_check_slice_ms_ = tb_config::kPartCheckSliceMsDef;
+ msg_notfound_wait_period_ms_ = tb_config::kMsgNotfoundWaitPeriodMsDef;
+ reb_confirm_wait_period_ms_ = tb_config::kRebConfirmWaitPeriodMsDef;
+ max_confirm_wait_period_ms_ = tb_config::kConfirmWaitPeriodMsMax;
+ shutdown_reb_wait_period_ms_ = tb_config::kRebWaitPeriodWhenShutdownMs;
}
ConsumerConfig::~ConsumerConfig() {
@@ -241,21 +242,20 @@ ConsumerConfig& ConsumerConfig::operator=(const ConsumerConfig& target) {
// parent class
BaseConfig::operator=(target);
// child class
- this->group_name_ = target.group_name_;
- this->sub_topic_and_filter_map_ = target.sub_topic_and_filter_map_;
- this->is_bound_consume_ = target.is_bound_consume_;
- this->session_key_ = target.session_key_;
- this->source_count_ = target.source_count_;
- this->is_select_big_ = target.is_select_big_;
- this->part_offset_map_ = target.part_offset_map_;
- this->consume_position_ = target.consume_position_;
- this->max_subinfo_report_intvl_ = target.max_subinfo_report_intvl_;
- this->msg_notfound_wait_period_ms_ = target.msg_notfound_wait_period_ms_;
- this->is_confirm_in_local_ = target.is_confirm_in_local_;
- this->is_rollback_if_confirm_timout_ = target.is_rollback_if_confirm_timout_;
- this->reb_confirm_wait_period_ms_ = target.reb_confirm_wait_period_ms_;
- this->max_confirm_wait_period_ms_ = target.max_confirm_wait_period_ms_;
- this->shutdown_reb_wait_period_ms_ = target.shutdown_reb_wait_period_ms_;
+ group_name_ = target.group_name_;
+ sub_topic_and_filter_map_ = target.sub_topic_and_filter_map_;
+ is_bound_consume_ = target.is_bound_consume_;
+ session_key_ = target.session_key_;
+ source_count_ = target.source_count_;
+ is_select_big_ = target.is_select_big_;
+ part_offset_map_ = target.part_offset_map_;
+ consume_position_ = target.consume_position_;
+ max_subinfo_report_intvl_ = target.max_subinfo_report_intvl_;
+ msg_notfound_wait_period_ms_ = target.msg_notfound_wait_period_ms_;
+ is_rollback_if_confirm_timout_ = target.is_rollback_if_confirm_timout_;
+ reb_confirm_wait_period_ms_ = target.reb_confirm_wait_period_ms_;
+ max_confirm_wait_period_ms_ = target.max_confirm_wait_period_ms_;
+ shutdown_reb_wait_period_ms_ = target.shutdown_reb_wait_period_ms_;
}
return *this;
}
@@ -285,9 +285,9 @@ bool ConsumerConfig::SetGroupConsumeTarget(string& err_info, const string& group
set<string> tmp_filters;
tmp_sub_map[topic_name] = tmp_filters;
}
- this->is_bound_consume_ = false;
- this->group_name_ = tgt_group_name;
- this->sub_topic_and_filter_map_ = tmp_sub_map;
+ is_bound_consume_ = false;
+ group_name_ = tgt_group_name;
+ sub_topic_and_filter_map_ = tmp_sub_map;
err_info = "Ok";
return true;
}
@@ -376,15 +376,16 @@ bool ConsumerConfig::setGroupConsumeTarget(
}
// check if bound consume
if (!is_bound_consume) {
- this->is_bound_consume_ = false;
- this->group_name_ = tgt_group_name;
- this->sub_topic_and_filter_map_ = tmp_sub_map;
+ is_bound_consume_ = false;
+ group_name_ = tgt_group_name;
+ sub_topic_and_filter_map_ = tmp_sub_map;
err_info = "Ok";
return true;
}
// check session_key
string tgt_session_key = Utils::Trim(session_key);
- if (tgt_session_key.length() == 0 || tgt_session_key.length() > tb_config::kSessionKeyMaxLength) {
+ if (tgt_session_key.length() == 0
+ || tgt_session_key.length() > tb_config::kSessionKeyMaxLength) {
if (tgt_session_key.length() == 0) {
err_info = "Illegal parameter: session_key is empty!";
} else {
@@ -410,7 +411,7 @@ bool ConsumerConfig::setGroupConsumeTarget(
err_info = ss.str();
return false;
}
- if (tmp_sub_map.find(result[1]) != tmp_sub_map.end()) {
+ if (tmp_sub_map.find(result[1]) == tmp_sub_map.end()) {
stringstream ss;
ss << "Illegal parameter: ";
ss << it_part->first;
@@ -443,77 +444,96 @@ bool ConsumerConfig::setGroupConsumeTarget(
tmp_parts_map[part_key] = it_part->second;
}
// set verified data
- this->is_bound_consume_ = true;
- this->group_name_ = tgt_group_name;
- this->sub_topic_and_filter_map_ = tmp_sub_map;
- this->session_key_ = tgt_session_key;
- this->source_count_ = source_count;
- this->is_select_big_ = is_select_big;
- this->part_offset_map_ = tmp_parts_map;
+ is_bound_consume_ = true;
+ group_name_ = tgt_group_name;
+ sub_topic_and_filter_map_ = tmp_sub_map;
+ session_key_ = tgt_session_key;
+ source_count_ = source_count;
+ is_select_big_ = is_select_big;
+ part_offset_map_ = tmp_parts_map;
err_info = "Ok";
return true;
}
-const string& ConsumerConfig::GetGroupName() const { return this->group_name_; }
+const string& ConsumerConfig::GetGroupName() const { return group_name_; }
const map<string, set<string> >& ConsumerConfig::GetSubTopicAndFilterMap() const {
- return this->sub_topic_and_filter_map_;
+ return sub_topic_and_filter_map_;
}
void ConsumerConfig::SetConsumePosition(ConsumePosition consume_from_where) {
- this->consume_position_ = consume_from_where;
+ consume_position_ = consume_from_where;
}
-const ConsumePosition ConsumerConfig::GetConsumePosition() const { return this->consume_position_; }
+const ConsumePosition ConsumerConfig::GetConsumePosition() const { return consume_position_; }
-const int ConsumerConfig::GetMsgNotFoundWaitPeriodMs() const {
- return this->msg_notfound_wait_period_ms_;
+const int32_t ConsumerConfig::GetMsgNotFoundWaitPeriodMs() const {
+ return msg_notfound_wait_period_ms_;
}
-void ConsumerConfig::SetMsgNotFoundWaitPeriodMs(int msg_notfound_wait_period_ms) {
- this->msg_notfound_wait_period_ms_ = msg_notfound_wait_period_ms;
+void ConsumerConfig::SetMsgNotFoundWaitPeriodMs(int32_t msg_notfound_wait_period_ms) {
+ msg_notfound_wait_period_ms_ = msg_notfound_wait_period_ms;
}
-const int ConsumerConfig::GetMaxSubinfoReportIntvl() const {
- return this->max_subinfo_report_intvl_;
+const uint32_t ConsumerConfig::GetPartCheckSliceMs() const {
+ return part_check_slice_ms_;
}
-void ConsumerConfig::SetMaxSubinfoReportIntvl(int max_subinfo_report_intvl) {
- this->max_subinfo_report_intvl_ = max_subinfo_report_intvl;
+void ConsumerConfig::SetPartCheckSliceMs(uint32_t part_check_slice_ms) {
+ if (part_check_slice_ms >= 0
+ && part_check_slice_ms <= 1000) {
+ part_check_slice_ms_ = part_check_slice_ms;
+ }
+}
+
+const int32_t ConsumerConfig::GetMaxPartCheckPeriodMs() const {
+ return max_part_check_period_ms_;
}
-bool ConsumerConfig::IsConfirmInLocal() { return this->is_confirm_in_local_; }
+void ConsumerConfig::SetMaxPartCheckPeriodMs(int32_t max_part_check_period_ms) {
+ max_part_check_period_ms_ = max_part_check_period_ms;
+}
+
+const int32_t ConsumerConfig::GetMaxSubinfoReportIntvl() const {
+ return max_subinfo_report_intvl_;
+}
-void ConsumerConfig::SetConfirmInLocal(bool confirm_in_local) {
- this->is_confirm_in_local_ = confirm_in_local;
+void ConsumerConfig::SetMaxSubinfoReportIntvl(int32_t max_subinfo_report_intvl) {
+ max_subinfo_report_intvl_ = max_subinfo_report_intvl;
}
-bool ConsumerConfig::IsRollbackIfConfirmTimeout() { return this->is_rollback_if_confirm_timout_; }
+bool ConsumerConfig::IsRollbackIfConfirmTimeout() {
+ return is_rollback_if_confirm_timout_; }
-void ConsumerConfig::setRollbackIfConfirmTimeout(bool is_rollback_if_confirm_timeout) {
- this->is_rollback_if_confirm_timout_ = is_rollback_if_confirm_timeout;
+void ConsumerConfig::setRollbackIfConfirmTimeout(
+ bool is_rollback_if_confirm_timeout) {
+ is_rollback_if_confirm_timout_ = is_rollback_if_confirm_timeout;
}
const int ConsumerConfig::GetWaitPeriodIfConfirmWaitRebalanceMs() const {
- return this->reb_confirm_wait_period_ms_;
+ return reb_confirm_wait_period_ms_;
}
-void ConsumerConfig::SetWaitPeriodIfConfirmWaitRebalanceMs(int reb_confirm_wait_period_ms) {
- this->reb_confirm_wait_period_ms_ = reb_confirm_wait_period_ms;
+void ConsumerConfig::SetWaitPeriodIfConfirmWaitRebalanceMs(
+ int32_t reb_confirm_wait_period_ms) {
+ reb_confirm_wait_period_ms_ = reb_confirm_wait_period_ms;
}
-const int ConsumerConfig::GetMaxConfirmWaitPeriodMs() const { return max_confirm_wait_period_ms_; }
+const int32_t ConsumerConfig::GetMaxConfirmWaitPeriodMs() const {
+ return max_confirm_wait_period_ms_; }
-void ConsumerConfig::SetMaxConfirmWaitPeriodMs(int max_confirm_wait_period_ms) {
- this->max_confirm_wait_period_ms_ = max_confirm_wait_period_ms;
+void ConsumerConfig::SetMaxConfirmWaitPeriodMs(
+ int32_t max_confirm_wait_period_ms) {
+ max_confirm_wait_period_ms_ = max_confirm_wait_period_ms;
}
-const int ConsumerConfig::GetShutdownRebWaitPeriodMs() const {
- return this->shutdown_reb_wait_period_ms_;
+const int32_t ConsumerConfig::GetShutdownRebWaitPeriodMs() const {
+ return shutdown_reb_wait_period_ms_;
}
-void ConsumerConfig::SetShutdownRebWaitPeriodMs(int wait_period_when_shutdown_ms) {
- this->shutdown_reb_wait_period_ms_ = wait_period_when_shutdown_ms;
+void ConsumerConfig::SetShutdownRebWaitPeriodMs(
+ int32_t wait_period_when_shutdown_ms) {
+ shutdown_reb_wait_period_ms_ = wait_period_when_shutdown_ms;
}
string ConsumerConfig::ToString() {
@@ -526,10 +546,10 @@ string ConsumerConfig::ToString() {
ss << "ConsumerConfig = {";
ss << BaseConfig::ToString();
ss << ", group_name_='";
- ss << this->group_name_;
+ ss << group_name_;
ss << "', sub_topic_and_filter_map_={";
- for (it_map = this->sub_topic_and_filter_map_.begin();
- it_map != this->sub_topic_and_filter_map_.end(); ++it_map) {
+ for (it_map = sub_topic_and_filter_map_.begin();
+ it_map != sub_topic_and_filter_map_.end(); ++it_map) {
if (i++ > 0) {
ss << ",";
}
@@ -549,16 +569,16 @@ string ConsumerConfig::ToString() {
ss << "]";
}
ss << "}, is_bound_consume_=";
- ss << this->is_bound_consume_;
+ ss << is_bound_consume_;
ss << ", session_key_='";
- ss << this->session_key_;
+ ss << session_key_;
ss << "', source_count_=";
- ss << this->source_count_;
+ ss << source_count_;
ss << ", is_select_big_=";
- ss << this->is_select_big_;
+ ss << is_select_big_;
ss << ", part_offset_map_={";
i = 0;
- for (it = this->part_offset_map_.begin(); it != this->part_offset_map_.end(); ++it) {
+ for (it = part_offset_map_.begin(); it != part_offset_map_.end(); ++it) {
if (i++ > 0) {
ss << ",";
}
@@ -568,21 +588,23 @@ string ConsumerConfig::ToString() {
ss << it->second;
}
ss << "}, consume_position_=";
- ss << this->consume_position_;
+ ss << consume_position_;
ss << ", max_subinfo_report_intvl_=";
- ss << this->max_subinfo_report_intvl_;
+ ss << max_subinfo_report_intvl_;
ss << ", msg_notfound_wait_period_ms_=";
- ss << this->msg_notfound_wait_period_ms_;
- ss << ", is_confirm_in_local_=";
- ss << this->is_confirm_in_local_;
+ ss << msg_notfound_wait_period_ms_;
+ ss << ", max_part_check_period_ms_=";
+ ss << max_part_check_period_ms_;
+ ss << ", part_check_slice_ms_=";
+ ss << part_check_slice_ms_;
ss << ", is_rollback_if_confirm_timout_=";
- ss << this->is_rollback_if_confirm_timout_;
+ ss << is_rollback_if_confirm_timout_;
ss << ", reb_confirm_wait_period_ms_=";
- ss << this->reb_confirm_wait_period_ms_;
+ ss << reb_confirm_wait_period_ms_;
ss << ", max_confirm_wait_period_ms_=";
- ss << this->max_confirm_wait_period_ms_;
+ ss << max_confirm_wait_period_ms_;
ss << ", shutdown_reb_wait_period_ms_=";
- ss << this->shutdown_reb_wait_period_ms_;
+ ss << shutdown_reb_wait_period_ms_;
ss << "}";
return ss.str();
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
index f914741..a12b199 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_message.cc
@@ -20,89 +20,88 @@
#include "tubemq/tubemq_message.h"
#include <string.h>
-
#include <sstream>
-#include "tubemq/const_config.h"
-#include "tubemq/utils.h"
+#include "const_config.h"
+#include "utils.h"
namespace tubemq {
using std::stringstream;
-
-// message flag's properties settings
-static const int32_t kMsgFlagIncProperties = 0x01;
-// reserved property key Filter Item
-static const char kRsvPropKeyFilterItem[] = "$msgType$";
-// reserved property key message send time
-static const char kRsvPropKeyMsgTime[] = "$msgTime$";
-
Message::Message() {
- this->topic_ = "";
- this->flag_ = 0;
- this->message_id_ = tb_config::kInvalidValue;
- this->data_ = NULL;
- this->datalen_ = 0;
- this->properties_.clear();
+ flag_ = 0;
+ message_id_ = tb_config::kInvalidValue;
+ data_ = NULL;
+ datalen_ = 0;
}
Message::Message(const Message& target) {
- this->topic_ = target.topic_;
- this->message_id_ = target.message_id_;
+ topic_ = target.topic_;
+ message_id_ = target.message_id_;
copyData(target.data_, target.datalen_);
copyProperties(target.properties_);
- this->flag_ = target.flag_;
+ flag_ = target.flag_;
}
Message::Message(const string& topic, const char* data, uint32_t datalen) {
- this->topic_ = topic;
- this->flag_ = 0;
- this->message_id_ = tb_config::kInvalidValue;
+ topic_ = topic;
+ flag_ = 0;
+ message_id_ = tb_config::kInvalidValue;
copyData(data, datalen);
- this->properties_.clear();
+ properties_.clear();
+}
+
+Message::Message(const string& topic, int32_t flag,
+ int64_t message_id, const char* data, uint32_t datalen,
+ const map<string, string>& properties) {
+ topic_ = topic;
+ flag_ = flag;
+ message_id_ = message_id;
+ copyData(data, datalen);
+ copyProperties(properties);
}
Message::~Message() { clearData(); }
Message& Message::operator=(const Message& target) {
if (this == &target) return *this;
- this->topic_ = target.topic_;
- this->message_id_ = target.message_id_;
+ topic_ = target.topic_;
+ message_id_ = target.message_id_;
clearData();
copyData(target.data_, target.datalen_);
copyProperties(target.properties_);
- this->flag_ = target.flag_;
+ flag_ = target.flag_;
return *this;
}
-const int64_t Message::GetMessageId() const { return this->message_id_; }
+const int64_t Message::GetMessageId() const { return message_id_; }
-void Message::SetMessageId(int64_t message_id) { this->message_id_ = message_id; }
+void Message::SetMessageId(int64_t message_id) { message_id_ = message_id; }
-const string& Message::GetTopic() const { return this->topic_; }
+const string& Message::GetTopic() const { return topic_; }
-void Message::SetTopic(const string& topic) { this->topic_ = topic; }
+void Message::SetTopic(const string& topic) { topic_ = topic; }
-const char* Message::GetData() const { return this->data_; }
+const char* Message::GetData() const { return data_; }
-uint32_t Message::GetDataLength() const { return this->datalen_; }
+uint32_t Message::GetDataLength() const { return datalen_; }
void Message::setData(const char* data, uint32_t datalen) {
clearData();
copyData(data, datalen);
}
-const int32_t Message::GetFlag() const { return this->flag_; }
+const int32_t Message::GetFlag() const { return flag_; }
-void Message::SetFlag(int32_t flag) { this->flag_ = flag; }
+void Message::SetFlag(int32_t flag) { flag_ = flag; }
-const map<string, string>& Message::GetProperties() const { return this->properties_; }
+const map<string, string>& Message::GetProperties() const { return properties_; }
int32_t Message::GetProperties(string& attribute) {
attribute.clear();
map<string, string>::iterator it_map;
- for (it_map = this->properties_.begin(); it_map != this->properties_.end(); ++it_map) {
+ for (it_map = properties_.begin(); it_map != properties_.end(); ++it_map) {
if (!attribute.empty()) {
attribute += delimiter::kDelimiterComma;
}
@@ -117,8 +116,8 @@ bool Message::HasProperty(const string& key) {
map<string, string>::iterator it_map;
string trimed_key = Utils::Trim(key);
if (!trimed_key.empty()) {
- it_map = this->properties_.find(trimed_key);
- if (it_map != this->properties_.end()) {
+ it_map = properties_.find(trimed_key);
+ if (it_map != properties_.end()) {
return true;
}
}
@@ -129,8 +128,8 @@ bool Message::GetProperty(const string& key, string& value) {
map<string, string>::iterator it_map;
string trimed_key = Utils::Trim(key);
if (!trimed_key.empty()) {
- it_map = this->properties_.find(trimed_key);
- if (it_map != this->properties_.end()) {
+ it_map = properties_.find(trimed_key);
+ if (it_map != properties_.end()) {
value = it_map->second;
return true;
}
@@ -138,7 +137,9 @@ bool Message::GetProperty(const string& key, string& value) {
return false;
}
-bool Message::GetFilterItem(string& value) { return GetProperty(kRsvPropKeyFilterItem, value); }
+bool Message::GetFilterItem(string& value) {
+ return GetProperty(tb_config::kRsvPropKeyFilterItem, value);
+}
bool Message::AddProperty(string& err_info, const string& key, const string& value) {
string trimed_key = Utils::Trim(key);
@@ -169,53 +170,54 @@ bool Message::AddProperty(string& err_info, const string& key, const string& val
err_info = ss.str();
return false;
}
- if (trimed_key == kRsvPropKeyFilterItem || trimed_key == kRsvPropKeyMsgTime) {
+ if (trimed_key == tb_config::kRsvPropKeyFilterItem
+ || trimed_key == tb_config::kRsvPropKeyMsgTime) {
stringstream ss;
ss << "Reserved token '";
- ss << kRsvPropKeyFilterItem;
+ ss << tb_config::kRsvPropKeyFilterItem;
ss << "' or '";
- ss << kRsvPropKeyMsgTime;
+ ss << tb_config::kRsvPropKeyMsgTime;
ss << "' must not be used in parmeter key!";
err_info = ss.str();
return false;
}
// add key and value
- this->properties_[trimed_key] = trimed_value;
- if (!this->properties_.empty()) {
- this->flag_ |= kMsgFlagIncProperties;
+ properties_[trimed_key] = trimed_value;
+ if (!properties_.empty()) {
+ flag_ |= tb_config::kMsgFlagIncProperties;
}
err_info = "Ok";
return true;
}
void Message::clearData() {
- if (this->data_ != NULL) {
- delete[] this->data_;
- this->data_ = NULL;
- this->datalen_ = 0;
+ if (data_ != NULL) {
+ delete[] data_;
+ data_ = NULL;
+ datalen_ = 0;
}
}
void Message::copyData(const char* data, uint32_t datalen) {
if (data == NULL) {
- this->data_ = NULL;
- this->datalen_ = 0;
+ data_ = NULL;
+ datalen_ = 0;
} else {
- this->datalen_ = datalen;
- this->data_ = new char[datalen];
- memset(this->data_, 0, datalen);
- memcpy(this->data_, data, datalen);
+ datalen_ = datalen;
+ data_ = new char[datalen];
+ memset(data_, 0, datalen);
+ memcpy(data_, data, datalen);
}
}
void Message::copyProperties(const map<string, string>& properties) {
- this->properties_.clear();
+ properties_.clear();
map<string, string>::const_iterator it_map;
for (it_map = properties.begin(); it_map != properties.end(); ++it_map) {
- this->properties_[it_map->first] = it_map->second;
+ properties_[it_map->first] = it_map->second;
}
- if (!this->properties_.empty()) {
- this->flag_ |= kMsgFlagIncProperties;
+ if (!properties_.empty()) {
+ flag_ |= tb_config::kMsgFlagIncProperties;
}
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
index 5298606..ef79440 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/tubemq_return.cc
@@ -18,7 +18,7 @@
*/
#include "tubemq/tubemq_return.h"
-#include "tubemq/const_config.h"
+#include "const_config.h"
@@ -32,27 +32,52 @@ PeerInfo::PeerInfo() {
curr_offset_ = tb_config::kInvalidValue;
}
-PeerInfo::PeerInfo(const Partition& partition, int64_t offset) {
- SetMsgSourceInfo(partition, offset);
+
+PeerInfo::PeerInfo(const string& broker_host, uint32_t partition_id,
+ const string& partiton_key, int64_t offset) {
+ broker_host_ = broker_host;
+ partition_id_ = partition_id;
+ partition_key_ = partiton_key;
+ curr_offset_ = offset;
}
PeerInfo& PeerInfo::operator=(const PeerInfo& target) {
if (this != &target) {
- this->partition_id_ = target.partition_id_;
- this->broker_host_ = target.broker_host_;
- this->partition_key_ = target.partition_key_;
- this->curr_offset_ = target.curr_offset_;
+ partition_id_ = target.partition_id_;
+ broker_host_ = target.broker_host_;
+ partition_key_ = target.partition_key_;
+ curr_offset_ = target.curr_offset_;
}
return *this;
}
-void PeerInfo::SetMsgSourceInfo(const Partition& partition, int64_t offset) {
- partition_id_ = partition.GetPartitionId();
- broker_host_ = partition.GetBrokerHost();
- partition_key_ = partition.GetPartitionKey();
- curr_offset_ = offset;
+ConsumeOffsetInfo::ConsumeOffsetInfo() {
+ partition_key_ = "";
+ curr_offset_ = tb_config::kInvalidValue;
+}
+
+ConsumeOffsetInfo::ConsumeOffsetInfo(
+ const string& part_key, int64_t curr_offset) {
+ partition_key_ = part_key;
+ curr_offset_ = curr_offset;
+}
+
+void ConsumeOffsetInfo::SetConsumeOffsetInfo(
+ const string& part_key, int64_t curr_offset) {
+ partition_key_ = part_key;
+ curr_offset_ = curr_offset;
}
+ConsumeOffsetInfo& ConsumeOffsetInfo::operator=(
+ const ConsumeOffsetInfo& target) {
+ if (this != &target) {
+ partition_key_ = target.partition_key_;
+ curr_offset_ = target.curr_offset_;
+ }
+ return *this;
+}
+
+
ConsumerResult::ConsumerResult() {
success_ = false;
err_code_ = tb_config::kInvalidValue;
@@ -62,25 +87,25 @@ ConsumerResult::ConsumerResult() {
}
ConsumerResult::ConsumerResult(const ConsumerResult& target) {
- this->success_ = target.success_;
- this->err_code_ = target.err_code_;
- this->err_msg_ = target.err_msg_;
- this->topic_name_ = target.topic_name_;
- this->peer_info_ = target.peer_info_;
- this->confirm_context_ = target.confirm_context_;
- this->message_list_ = target.message_list_;
+ success_ = target.success_;
+ err_code_ = target.err_code_;
+ err_msg_ = target.err_msg_;
+ topic_name_ = target.topic_name_;
+ peer_info_ = target.peer_info_;
+ confirm_context_ = target.confirm_context_;
+ message_list_ = target.message_list_;
}
-ConsumerResult::ConsumerResult(int32_t err_code, string err_msg) {
+ConsumerResult::ConsumerResult(int32_t error_code, string err_msg) {
success_ = false;
- err_code_ = err_code;
+ err_code_ = error_code;
err_msg_ = err_msg;
topic_name_ = "";
confirm_context_ = "";
}
ConsumerResult::~ConsumerResult() {
- this->message_list_.clear();
+ message_list_.clear();
success_ = false;
err_code_ = tb_config::kInvalidValue;
err_msg_ = "";
@@ -90,52 +115,64 @@ ConsumerResult::~ConsumerResult() {
ConsumerResult& ConsumerResult::operator=(const ConsumerResult& target) {
if (this != &target) {
- this->success_ = target.success_;
- this->err_code_ = target.err_code_;
- this->err_msg_ = target.err_msg_;
- this->topic_name_ = target.topic_name_;
- this->peer_info_ = target.peer_info_;
- this->confirm_context_ = target.confirm_context_;
- this->message_list_ = target.message_list_;
+ success_ = target.success_;
+ err_code_ = target.err_code_;
+ err_msg_ = target.err_msg_;
+ topic_name_ = target.topic_name_;
+ peer_info_ = target.peer_info_;
+ confirm_context_ = target.confirm_context_;
+ message_list_ = target.message_list_;
}
return *this;
}
-void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg) {
+void ConsumerResult::SetFailureResult(int32_t error_code, string err_msg) {
success_ = false;
- err_code_ = err_code;
+ err_code_ = error_code;
err_msg_ = err_msg;
}
-void ConsumerResult::SetFailureResult(int32_t err_code, string err_msg,
+void ConsumerResult::SetFailureResult(int32_t error_code, string err_msg,
const string& topic_name, const PeerInfo& peer_info) {
success_ = false;
- err_code_ = err_code;
+ err_code_ = error_code;
err_msg_ = err_msg;
topic_name_ = topic_name;
peer_info_ = peer_info;
}
-void ConsumerResult::SetSuccessResult(int32_t err_code,
+void ConsumerResult::SetSuccessResult(int32_t error_code,
+ const string& topic_name,
+ const PeerInfo& peer_info) {
+ success_ = true;
+ err_code_ = error_code;
+ err_msg_ = "Ok";
+ topic_name_ = topic_name;
+ peer_info_ = peer_info;
+ confirm_context_ = "";
+ message_list_.clear();
+}
+
+void ConsumerResult::SetSuccessResult(int32_t error_code,
const string& topic_name,
const PeerInfo& peer_info,
const string& confirm_context,
const list<Message>& message_list) {
- this->success_ = true;
- this->err_code_ = err_code;
- this->err_msg_ = "Ok";
- this->topic_name_ = topic_name;
- this->peer_info_ = peer_info;
- this->confirm_context_ = confirm_context;
- this->message_list_ = message_list;
+ success_ = true;
+ err_code_ = error_code;
+ err_msg_ = "Ok";
+ topic_name_ = topic_name;
+ peer_info_ = peer_info;
+ confirm_context_ = confirm_context;
+ message_list_ = message_list;
}
const string& ConsumerResult::GetPartitionKey() const {
- return this->peer_info_.GetPartitionKey();
+ return peer_info_.GetPartitionKey();
}
const int64_t ConsumerResult::GetCurrOffset() const {
- return this->peer_info_.GetCurrOffset();
+ return peer_info_.GetCurrOffset();
}
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/unique_seq_id.h b/tubemq-client-twins/tubemq-client-cpp/src/unique_seq_id.h
new file mode 100644
index 0000000..b24b627
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/unique_seq_id.h
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_UNIQUESEQID_H
+#define TUBEMQ_UNIQUESEQID_H
+
+#include <stdint.h>
+
+#include <atomic>
+
+namespace tubemq {
+
+class UniqueSeqId {
+ public:
+ UniqueSeqId() : id_(0) {}
+
+ uint32_t Next() { return id_.fetch_add(1, std::memory_order_relaxed); }
+
+ protected:
+ std::atomic<uint32_t> id_;
+};
+
+} // namespace tubemq
+
+#endif
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
index 75fcba5..81ef56e 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.cc
@@ -17,24 +17,27 @@
* under the License.
*/
-#include "tubemq/utils.h"
+#include "utils.h"
#include <arpa/inet.h>
+#include <ctype.h>
#include <linux/if.h>
+#include <netdb.h>
#include <netinet/in.h>
#include <regex.h>
-#include <stdlib.h>
#include <stdio.h>
+#include <stdlib.h>
#include <string.h>
#include <sys/ioctl.h>
#include <sys/socket.h>
#include <sys/time.h>
#include <unistd.h>
-
#include <sstream>
#include <vector>
+#include "const_config.h"
+#include "const_rpc.h"
+
-#include "tubemq/const_config.h"
namespace tubemq {
@@ -43,6 +46,64 @@ using std::stringstream;
static const char kWhitespaceCharSet[] = " \n\r\t\f\v";
+/*
+ * copy from https://web.mit.edu/freebsd/head/sys/libkern/crc32.c
+*/
+static const uint32_t crc32_tab[256] = {
+ 0x00000000, 0x77073096, 0xee0e612c, 0x990951ba, 0x076dc419, 0x706af48f,
+ 0xe963a535, 0x9e6495a3, 0x0edb8832, 0x79dcb8a4, 0xe0d5e91e, 0x97d2d988,
+ 0x09b64c2b, 0x7eb17cbd, 0xe7b82d07, 0x90bf1d91, 0x1db71064, 0x6ab020f2,
+ 0xf3b97148, 0x84be41de, 0x1adad47d, 0x6ddde4eb, 0xf4d4b551, 0x83d385c7,
+ 0x136c9856, 0x646ba8c0, 0xfd62f97a, 0x8a65c9ec, 0x14015c4f, 0x63066cd9,
+ 0xfa0f3d63, 0x8d080df5, 0x3b6e20c8, 0x4c69105e, 0xd56041e4, 0xa2677172,
+ 0x3c03e4d1, 0x4b04d447, 0xd20d85fd, 0xa50ab56b, 0x35b5a8fa, 0x42b2986c,
+ 0xdbbbc9d6, 0xacbcf940, 0x32d86ce3, 0x45df5c75, 0xdcd60dcf, 0xabd13d59,
+ 0x26d930ac, 0x51de003a, 0xc8d75180, 0xbfd06116, 0x21b4f4b5, 0x56b3c423,
+ 0xcfba9599, 0xb8bda50f, 0x2802b89e, 0x5f058808, 0xc60cd9b2, 0xb10be924,
+ 0x2f6f7c87, 0x58684c11, 0xc1611dab, 0xb6662d3d, 0x76dc4190, 0x01db7106,
+ 0x98d220bc, 0xefd5102a, 0x71b18589, 0x06b6b51f, 0x9fbfe4a5, 0xe8b8d433,
+ 0x7807c9a2, 0x0f00f934, 0x9609a88e, 0xe10e9818, 0x7f6a0dbb, 0x086d3d2d,
+ 0x91646c97, 0xe6635c01, 0x6b6b51f4, 0x1c6c6162, 0x856530d8, 0xf262004e,
+ 0x6c0695ed, 0x1b01a57b, 0x8208f4c1, 0xf50fc457, 0x65b0d9c6, 0x12b7e950,
+ 0x8bbeb8ea, 0xfcb9887c, 0x62dd1ddf, 0x15da2d49, 0x8cd37cf3, 0xfbd44c65,
+ 0x4db26158, 0x3ab551ce, 0xa3bc0074, 0xd4bb30e2, 0x4adfa541, 0x3dd895d7,
+ 0xa4d1c46d, 0xd3d6f4fb, 0x4369e96a, 0x346ed9fc, 0xad678846, 0xda60b8d0,
+ 0x44042d73, 0x33031de5, 0xaa0a4c5f, 0xdd0d7cc9, 0x5005713c, 0x270241aa,
+ 0xbe0b1010, 0xc90c2086, 0x5768b525, 0x206f85b3, 0xb966d409, 0xce61e49f,
+ 0x5edef90e, 0x29d9c998, 0xb0d09822, 0xc7d7a8b4, 0x59b33d17, 0x2eb40d81,
+ 0xb7bd5c3b, 0xc0ba6cad, 0xedb88320, 0x9abfb3b6, 0x03b6e20c, 0x74b1d29a,
+ 0xead54739, 0x9dd277af, 0x04db2615, 0x73dc1683, 0xe3630b12, 0x94643b84,
+ 0x0d6d6a3e, 0x7a6a5aa8, 0xe40ecf0b, 0x9309ff9d, 0x0a00ae27, 0x7d079eb1,
+ 0xf00f9344, 0x8708a3d2, 0x1e01f268, 0x6906c2fe, 0xf762575d, 0x806567cb,
+ 0x196c3671, 0x6e6b06e7, 0xfed41b76, 0x89d32be0, 0x10da7a5a, 0x67dd4acc,
+ 0xf9b9df6f, 0x8ebeeff9, 0x17b7be43, 0x60b08ed5, 0xd6d6a3e8, 0xa1d1937e,
+ 0x38d8c2c4, 0x4fdff252, 0xd1bb67f1, 0xa6bc5767, 0x3fb506dd, 0x48b2364b,
+ 0xd80d2bda, 0xaf0a1b4c, 0x36034af6, 0x41047a60, 0xdf60efc3, 0xa867df55,
+ 0x316e8eef, 0x4669be79, 0xcb61b38c, 0xbc66831a, 0x256fd2a0, 0x5268e236,
+ 0xcc0c7795, 0xbb0b4703, 0x220216b9, 0x5505262f, 0xc5ba3bbe, 0xb2bd0b28,
+ 0x2bb45a92, 0x5cb36a04, 0xc2d7ffa7, 0xb5d0cf31, 0x2cd99e8b, 0x5bdeae1d,
+ 0x9b64c2b0, 0xec63f226, 0x756aa39c, 0x026d930a, 0x9c0906a9, 0xeb0e363f,
+ 0x72076785, 0x05005713, 0x95bf4a82, 0xe2b87a14, 0x7bb12bae, 0x0cb61b38,
+ 0x92d28e9b, 0xe5d5be0d, 0x7cdcefb7, 0x0bdbdf21, 0x86d3d2d4, 0xf1d4e242,
+ 0x68ddb3f8, 0x1fda836e, 0x81be16cd, 0xf6b9265b, 0x6fb077e1, 0x18b74777,
+ 0x88085ae6, 0xff0f6a70, 0x66063bca, 0x11010b5c, 0x8f659eff, 0xf862ae69,
+ 0x616bffd3, 0x166ccf45, 0xa00ae278, 0xd70dd2ee, 0x4e048354, 0x3903b3c2,
+ 0xa7672661, 0xd06016f7, 0x4969474d, 0x3e6e77db, 0xaed16a4a, 0xd9d65adc,
+ 0x40df0b66, 0x37d83bf0, 0xa9bcae53, 0xdebb9ec5, 0x47b2cf7f, 0x30b5ffe9,
+ 0xbdbdf21c, 0xcabac28a, 0x53b39330, 0x24b4a3a6, 0xbad03605, 0xcdd70693,
+ 0x54de5729, 0x23d967bf, 0xb3667a2e, 0xc4614ab8, 0x5d681b02, 0x2a6f2b94,
+ 0xb40bbe37, 0xc30c8ea1, 0x5a05df1b, 0x2d02ef8d
+};
+
+int32_t Utils::Crc32(const string &buf) {
+ uint32_t crc = ~0U;
+ for (uint32_t i = 0; i < buf.size(); ++i) {
+ unsigned char c_data = buf[i];
+ crc = crc32_tab[(crc ^ c_data) & 0xFF] ^ (crc >> 8);
+ }
+ return ((~crc)& 0x7FFFFFFF);
+}
+
string Utils::Trim(const string& source) {
string target = source;
if (!target.empty()) {
@@ -132,6 +193,55 @@ void Utils::Split(const string& source, map<string, int32_t>& result, const stri
}
}
+void Utils::Split(const string& source, map<string, string>& result,
+ const string& delimiter_step1, const string& delimiter_step2) {
+ string item_str;
+ string key_str;
+ string val_str;
+ string::size_type pos1 = 0;
+ string::size_type pos2 = 0;
+ string::size_type pos3 = 0;
+ if (!source.empty()) {
+ pos1 = 0;
+ pos2 = source.find(delimiter_step1);
+ while (string::npos != pos2) {
+ item_str = source.substr(pos1, pos2 - pos1);
+ item_str = Utils::Trim(item_str);
+ pos1 = pos2 + delimiter_step1.length();
+ pos2 = source.find(delimiter_step1, pos1);
+ if (item_str.empty()) {
+ continue;
+ }
+ pos3 = item_str.find(delimiter_step2);
+ if (string::npos == pos3) {
+ continue;
+ }
+ key_str = item_str.substr(0, pos3);
+ val_str = item_str.substr(pos3 + delimiter_step2.length());
+ key_str = Utils::Trim(key_str);
+ val_str = Utils::Trim(val_str);
+ if (key_str.empty()) {
+ continue;
+ }
+ result[key_str] = val_str;
+ }
+ if (pos1 != source.length()) {
+ item_str = source.substr(pos1);
+ item_str = Utils::Trim(item_str);
+ pos3 = item_str.find(delimiter_step2);
+ if (string::npos != pos3) {
+ key_str = item_str.substr(0, pos3);
+ val_str = item_str.substr(pos3 + delimiter_step2.length());
+ key_str = Utils::Trim(key_str);
+ val_str = Utils::Trim(val_str);
+ if (!key_str.empty()) {
+ result[key_str] = val_str;
+ }
+ }
+ }
+ }
+}
+
void Utils::Join(const vector<string>& vec, const string& delimiter, string& target) {
vector<string>::const_iterator it;
target.clear();
@@ -143,6 +253,19 @@ void Utils::Join(const vector<string>& vec, const string& delimiter, string& tar
}
}
+void Utils::Join(const map<string, string>& source, string& target,
+ const string& delimiter_step1, const string& delimiter_step2) {
+ map<string, string>::const_iterator it;
+ target.clear();
+ int count = 0;
+ for (it = source.begin(); it != source.end(); ++it) {
+ if (count++ > 0) {
+ target += delimiter_step1;
+ }
+ target += it->first + delimiter_step2 + it->second;
+ }
+}
+
bool Utils::ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match,
bool check_max_length, unsigned int maxlen) {
if (source.empty()) {
@@ -202,7 +325,8 @@ bool Utils::ValidGroupName(string& err_info, const string& group_name, string& t
int cflags = REG_EXTENDED;
regex_t reg;
regmatch_t pmatch[1];
- const char* patRule = "^[a-zA-Z][\\w-]+$";
+ // const char* patRule = "^[a-zA-Z][\\w-]+$";
+ const char* patRule = "^[a-zA-Z]\\w+$";
regcomp(&reg, patRule, cflags);
int status = regexec(&reg, tgt_group_name.c_str(), 1, pmatch, 0);
regfree(&reg);
@@ -210,8 +334,9 @@ bool Utils::ValidGroupName(string& err_info, const string& group_name, string& t
stringstream ss;
ss << "Illegal parameter: ";
ss << group_name;
- ss << " must begin with a letter,can only contain ";
- ss << "characters,numbers,hyphen,and underscores";
+ ss << " must begin with a letter,can only contain characters,numbers,and underscores";
+ // ss << " must begin with a letter,can only contain ";
+ // ss << "characters,numbers,hyphen,and underscores";
err_info = ss.str();
return false;
}
@@ -281,13 +406,13 @@ int64_t Utils::GetCurrentTimeMillis() {
bool Utils::ValidConfigFile(string& err_info, const string& conf_file) {
FILE *fp = NULL;
-
+
if (conf_file.length() == 0) {
err_info = "Configure file is blank";
return false;
- }
- fp = fopen(conf_file.c_str(),"r");
- if(fp == NULL) {
+ }
+ fp = fopen(conf_file.c_str(), "r");
+ if (fp == NULL) {
err_info = "Open configure file Failed!";
return false;
}
@@ -321,7 +446,7 @@ bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
if (0 == strncmp(&ifreq->ifr_name[0], "lo", sizeof("lo"))) {
continue;
}
- memcpy(&if_flag.ifr_name[0],&ifreq->ifr_name[0],sizeof(ifreq->ifr_name));
+ memcpy(&if_flag.ifr_name[0], &ifreq->ifr_name[0], sizeof(ifreq->ifr_name));
if ((ioctl(sockfd, SIOCGIFFLAGS, (char *) &if_flag)) < 0) {
continue;
}
@@ -329,8 +454,8 @@ bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
|| !(if_flag.ifr_flags & IFF_UP)) {
continue;
}
-
- if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
+
+ if (!strncmp(inet_ntoa(((struct sockaddr_in*)&(ifreq->ifr_addr))->sin_addr),
"127.0.0.1", 7)) {
continue;
}
@@ -344,6 +469,91 @@ bool Utils::GetLocalIPV4Address(string& err_info, string& localhost) {
return false;
}
+int32_t Utils::GetServiceTypeByMethodId(int32_t method_id) {
+ switch (method_id) {
+ // broker write service
+ case rpc_config::kBrokerMethoddProducerRegister:
+ case rpc_config::kBrokerMethoddProducerHeatbeat:
+ case rpc_config::kBrokerMethoddProducerSendMsg:
+ case rpc_config::kBrokerMethoddProducerClose: {
+ return rpc_config::kBrokerWriteService;
+ }
+ // broker read service
+ case rpc_config::kBrokerMethoddConsumerRegister:
+ case rpc_config::kBrokerMethoddConsumerHeatbeat:
+ case rpc_config::kBrokerMethoddConsumerGetMsg:
+ case rpc_config::kBrokerMethoddConsumerCommit:
+ case rpc_config::kBrokerMethoddConsumerClose: {
+ return rpc_config::kBrokerReadService;
+ }
+ // master service
+ case rpc_config::kMasterMethoddProducerRegister:
+ case rpc_config::kMasterMethoddProducerHeatbeat:
+ case rpc_config::kMasterMethoddProducerClose:
+ case rpc_config::kMasterMethoddConsumerRegister:
+ case rpc_config::kMasterMethoddConsumerHeatbeat:
+ case rpc_config::kMasterMethoddConsumerClose:
+ default: {
+ return rpc_config::kMasterService;
+ }
+ }
+}
+
+void Utils::XfsAddrByDns(const map<string, int32_t>& orig_addr_map,
+ map<string, string>& target_addr_map) {
+ hostent* host = NULL;
+ map<string, int32_t>::const_iterator it;
+ for (it = orig_addr_map.begin(); it != orig_addr_map.end(); it++) {
+ char first_char = it->first.c_str()[0];
+ if (isalpha(first_char)) {
+ host = gethostbyname(it->first.c_str());
+ if (host != NULL) {
+ switch (host->h_addrtype) {
+ case AF_INET:
+ case AF_INET6: {
+ char **pptr = NULL;
+ unsigned int addr = 0;
+ char temp_str[32];
+ memset(temp_str, 0, 32);
+ pptr = host->h_addr_list;
+ addr = ((unsigned int *) host->h_addr_list[0])[0];
+ if ((addr & 0xffff) == 0x0a0a) {
+ pptr++;
+ addr = ((unsigned int *) host->h_addr_list[0])[1];
+ }
+ inet_ntop(host->h_addrtype, *pptr, temp_str, sizeof(temp_str));
+ string tempIpaddr = temp_str;
+ if (tempIpaddr.length() > 0) {
+ target_addr_map[it->first] = tempIpaddr;
+ }
+ }
+ break;
+
+ default:
+ break;
+ }
+ }
+ } else {
+ target_addr_map[it->first] = it->first;
+ }
+ }
+}
+
+bool Utils::NeedDnsXfs(const string& masteraddr) {
+ if (masteraddr.length() > 0) {
+ char first_char = masteraddr.c_str()[0];
+ if (isalpha(first_char)) {
+ return true;
+ }
+ }
+ return false;
+}
+
+string Utils::GenBrokerAuthenticateToken(const string& username,
+ const string& usrpassword) {
+ return "";
+}
+
} // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/utils.h b/tubemq-client-twins/tubemq-client-cpp/src/utils.h
new file mode 100644
index 0000000..c61043b
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/utils.h
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_UTILS_H_
+#define TUBEMQ_CLIENT_UTILS_H_
+
+#include <stdint.h>
+
+#include <map>
+#include <string>
+#include <vector>
+
+namespace tubemq {
+
+using std::map;
+using std::string;
+using std::vector;
+
+class Utils {
+ public:
+ // trim string info
+ static int32_t Crc32(const string &buf);
+ static string Trim(const string& source);
+ // split string to vector
+ static void Split(const string& source, vector<string>& result, const string& delimiter);
+ // split string to map<string, int>
+ static void Split(const string& source, map<string, int32_t>& result,
+ const string& delimiter_step1, const string& delimiter_step2);
+ static void Split(const string& source, map<string, string>& result,
+ const string& delimiter_step1, const string& delimiter_step2);
+ static void Join(const vector<string>& vec, const string& delimiter, string& target);
+ static void Join(const map<string, string>& source, string& target,
+ const string& delimiter_step1, const string& delimiter_step2);
+ static bool ValidString(string& err_info, const string& source, bool allow_empty, bool pat_match,
+ bool check_max_length, unsigned int maxlen);
+ static bool ValidGroupName(string& err_info, const string& group_name, string& tgt_group_name);
+ static bool ValidFilterItem(string& err_info, const string& src_filteritem,
+ string& tgt_filteritem);
+ static string Int2str(int32_t data);
+ static string Long2str(int64_t data);
+ static uint32_t IpToInt(const string& ipv4_addr);
+ static int64_t GetCurrentTimeMillis();
+ static bool ValidConfigFile(string& err_info, const string& conf_file);
+ static bool GetLocalIPV4Address(string& err_info, string& localhost);
+ static int32_t GetServiceTypeByMethodId(int32_t method_id);
+ static void XfsAddrByDns(const map<string, int32_t>& orig_addr_map,
+ map<string, string>& target_addr_map);
+ static bool NeedDnsXfs(const string& masteraddr);
+ static string GenBrokerAuthenticateToken(const string& username, const string& usrpassword);
+};
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_UTILS_H_
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/version.h b/tubemq-client-twins/tubemq-client-cpp/src/version.h
new file mode 100644
index 0000000..c479ede
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/version.h
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef TUBEMQ_CLIENT_VERSION_H_
+#define TUBEMQ_CLIENT_VERSION_H_
+
+#include <string>
+
+namespace tubemq {
+
+using std::string;
+
+static const char kTubeMQClientVersion[] = "0.1.0-0.5.0";
+
+} // namespace tubemq
+
+#endif // TUBEMQ_CLIENT_VERSION_H_