You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/04/06 14:07:50 UTC
[rocketmq-client-cpp] 01/01: refactor: return std::unique_ptr not
pointer
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
commit f3c827806fab17356966b69ed086be7af5dd67f8
Author: James Yin <yw...@hotmail.com>
AuthorDate: Tue Apr 6 21:17:42 2021 +0800
refactor: return std::unique_ptr not pointer
---
include/RemotingCommand.h | 8 +--
src/ClientRemotingProcessor.cpp | 19 ++---
src/ClientRemotingProcessor.h | 12 ++--
src/MQClientAPIImpl.cpp | 84 ++++++++++++-----------
src/MQClientAPIImpl.h | 70 ++++++++++---------
src/MQClientInstance.cpp | 35 +++++-----
src/MQClientInstance.h | 16 +++--
src/common/FilterAPI.hpp | 7 +-
src/consumer/DefaultLitePullConsumerImpl.cpp | 41 ++++++-----
src/consumer/DefaultLitePullConsumerImpl.h | 37 +++++-----
src/consumer/DefaultMQPushConsumerImpl.cpp | 14 ++--
src/consumer/DefaultMQPushConsumerImpl.h | 2 +-
src/consumer/MQConsumerInner.h | 3 +-
src/consumer/PullAPIWrapper.cpp | 45 ++++++------
src/consumer/PullAPIWrapper.h | 30 ++++----
src/consumer/RebalanceImpl.cpp | 19 ++---
src/consumer/RebalanceImpl.h | 4 +-
src/consumer/RebalanceLitePullImpl.h | 3 -
src/consumer/RemoteBrokerOffsetStore.cpp | 4 +-
src/io/ByteBuffer.cpp | 8 +--
src/io/ByteBuffer.hpp | 14 ++--
src/io/DefaultByteBuffer.hpp | 15 ++--
src/message/MessageClientIDSetter.cpp | 4 +-
src/producer/DefaultMQProducerImpl.cpp | 49 ++++++-------
src/producer/DefaultMQProducerImpl.h | 36 +++++-----
src/protocol/RemotingCommand.cpp | 8 +--
src/protocol/TopicList.h | 2 +-
src/protocol/body/LockBatchResponseBody.hpp | 4 +-
src/protocol/body/ResetOffsetBody.hpp | 4 +-
src/protocol/body/TopicRouteData.hpp | 4 +-
src/protocol/header/CommandHeader.cpp | 60 +++++++++-------
src/protocol/header/CommandHeader.h | 27 ++++----
src/protocol/header/ReplyMessageRequestHeader.hpp | 4 +-
src/transport/RequestProcessor.h | 2 +-
src/transport/TcpRemotingClient.cpp | 4 +-
test/src/protocol/RemotingCommandTest.cpp | 2 +-
36 files changed, 359 insertions(+), 341 deletions(-)
diff --git a/include/RemotingCommand.h b/include/RemotingCommand.h
index 68847b3..af67c87 100644
--- a/include/RemotingCommand.h
+++ b/include/RemotingCommand.h
@@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API RemotingCommand {
template <class H>
H* decodeCommandCustomHeader(bool useCache = true);
- static RemotingCommand* Decode(ByteArrayRef array, bool hasPackageLength = false);
+ static std::unique_ptr<RemotingCommand> Decode(ByteArrayRef array, bool hasPackageLength = false);
std::string toString() const;
@@ -116,9 +116,9 @@ H* RemotingCommand::decodeCommandCustomHeader(bool useCache) {
}
try {
- H* header = H::Decode(ext_fields_);
- custom_header_.reset(header);
- return header;
+ std::unique_ptr<H> header = H::Decode(ext_fields_);
+ custom_header_ = std::move(header);
+ return static_cast<H*>(custom_header_.get());
} catch (std::exception& e) {
THROW_MQEXCEPTION(RemotingCommandException, e.what(), -1);
}
diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index d4fc34c..329d0d4 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -35,7 +35,8 @@ ClientRemotingProcessor::ClientRemotingProcessor(MQClientInstance* clientInstanc
ClientRemotingProcessor::~ClientRemotingProcessor() = default;
-RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransportPtr channel, RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::processRequest(TcpTransportPtr channel,
+ RemotingCommand* request) {
const auto& addr = channel->getPeerAddrAndPort();
LOG_DEBUG_NEW("processRequest, code:{}, addr:{}", request->code(), addr);
switch (request->code()) {
@@ -61,7 +62,8 @@ RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransportPtr channel
return nullptr;
}
-RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::string& addr, RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::checkTransactionState(const std::string& addr,
+ RemotingCommand* request) {
auto* requestHeader = request->decodeCommandCustomHeader<CheckTransactionStateRequestHeader>();
assert(requestHeader != nullptr);
@@ -95,14 +97,14 @@ RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::strin
return nullptr;
}
-RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingCommand* request) {
auto* requestHeader = request->decodeCommandCustomHeader<NotifyConsumerIdsChangedRequestHeader>();
LOG_INFO_NEW("notifyConsumerIdsChanged, group:{}", requestHeader->getConsumerGroup());
client_instance_->rebalanceImmediately();
return nullptr;
}
-RemotingCommand* ClientRemotingProcessor::resetOffset(RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::resetOffset(RemotingCommand* request) {
auto* responseHeader = request->decodeCommandCustomHeader<ResetOffsetRequestHeader>();
auto requestBody = request->body();
if (requestBody != nullptr && requestBody->size() > 0) {
@@ -116,7 +118,8 @@ RemotingCommand* ClientRemotingProcessor::resetOffset(RemotingCommand* request)
return nullptr; // as resetOffset is oneWayRPC, do not need return any response
}
-RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(const std::string& addr, RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::getConsumerRunningInfo(const std::string& addr,
+ RemotingCommand* request) {
auto* requestHeader = request->decodeCommandCustomHeader<GetConsumerRunningInfoRequestHeader>();
LOG_INFO_NEW("getConsumerRunningInfo, group:{}", requestHeader->getConsumerGroup());
@@ -137,10 +140,10 @@ RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(const std::stri
response->set_remark("The Consumer Group not exist in this consumer");
}
- return response.release();
+ return response;
}
-RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* request) {
std::unique_ptr<RemotingCommand> response(
new RemotingCommand(MQResponseCode::SYSTEM_ERROR, "not set any response code"));
@@ -192,7 +195,7 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
response->set_remark("process reply message fail");
}
- return response.release();
+ return response;
}
void ClientRemotingProcessor::processReplyMessage(std::unique_ptr<MQMessageExt> replyMsg) {
diff --git a/src/ClientRemotingProcessor.h b/src/ClientRemotingProcessor.h
index ca28297..7812d88 100644
--- a/src/ClientRemotingProcessor.h
+++ b/src/ClientRemotingProcessor.h
@@ -28,13 +28,13 @@ class ClientRemotingProcessor : public RequestProcessor {
ClientRemotingProcessor(MQClientInstance* clientInstance);
virtual ~ClientRemotingProcessor();
- RemotingCommand* processRequest(TcpTransportPtr channel, RemotingCommand* request) override;
+ std::unique_ptr<RemotingCommand> processRequest(TcpTransportPtr channel, RemotingCommand* request) override;
- RemotingCommand* checkTransactionState(const std::string& addr, RemotingCommand* request);
- RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request);
- RemotingCommand* resetOffset(RemotingCommand* request);
- RemotingCommand* getConsumerRunningInfo(const std::string& addr, RemotingCommand* request);
- RemotingCommand* receiveReplyMessage(RemotingCommand* request);
+ std::unique_ptr<RemotingCommand> checkTransactionState(const std::string& addr, RemotingCommand* request);
+ std::unique_ptr<RemotingCommand> notifyConsumerIdsChanged(RemotingCommand* request);
+ std::unique_ptr<RemotingCommand> resetOffset(RemotingCommand* request);
+ std::unique_ptr<RemotingCommand> getConsumerRunningInfo(const std::string& addr, RemotingCommand* request);
+ std::unique_ptr<RemotingCommand> receiveReplyMessage(RemotingCommand* request);
private:
void processReplyMessage(std::unique_ptr<MQMessageExt> replyMsg);
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index fe76f13..176da3b 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -88,28 +88,28 @@ void MQClientAPIImpl::createTopic(const std::string& addr, const std::string& de
THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
}
-SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
- const std::string& brokerName,
- const MessagePtr msg,
- std::unique_ptr<SendMessageRequestHeader> requestHeader,
- int timeoutMillis,
- CommunicationMode communicationMode,
- DefaultMQProducerImplPtr producer) {
+std::unique_ptr<SendResult> MQClientAPIImpl::sendMessage(const std::string& addr,
+ const std::string& brokerName,
+ const MessagePtr msg,
+ std::unique_ptr<SendMessageRequestHeader> requestHeader,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ DefaultMQProducerImplPtr producer) {
return sendMessage(addr, brokerName, msg, std::move(requestHeader), timeoutMillis, communicationMode, nullptr,
nullptr, nullptr, 0, producer);
}
-SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
- const std::string& brokerName,
- const MessagePtr msg,
- std::unique_ptr<SendMessageRequestHeader> requestHeader,
- int timeoutMillis,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- TopicPublishInfoPtr topicPublishInfo,
- MQClientInstancePtr instance,
- int retryTimesWhenSendFailed,
- DefaultMQProducerImplPtr producer) {
+std::unique_ptr<SendResult> MQClientAPIImpl::sendMessage(const std::string& addr,
+ const std::string& brokerName,
+ const MessagePtr msg,
+ std::unique_ptr<SendMessageRequestHeader> requestHeader,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ TopicPublishInfoPtr topicPublishInfo,
+ MQClientInstancePtr instance,
+ int retryTimesWhenSendFailed,
+ DefaultMQProducerImplPtr producer) {
int code = SEND_MESSAGE;
std::unique_ptr<CommandCustomHeader> header;
@@ -124,7 +124,7 @@ SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
}
if (code != SEND_MESSAGE && code != SEND_REPLY_MESSAGE) {
- header.reset(SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(requestHeader.get()));
+ header = SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(requestHeader.get());
} else {
header = std::move(requestHeader);
}
@@ -173,20 +173,20 @@ void MQClientAPIImpl::sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw,
remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
}
-SendResult* MQClientAPIImpl::sendMessageSync(const std::string& addr,
- const std::string& brokerName,
- const MessagePtr msg,
- RemotingCommand& request,
- int timeoutMillis) {
+std::unique_ptr<SendResult> MQClientAPIImpl::sendMessageSync(const std::string& addr,
+ const std::string& brokerName,
+ const MessagePtr msg,
+ RemotingCommand& request,
+ int timeoutMillis) {
// block until response
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
return processSendResponse(brokerName, msg, response.get());
}
-SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName,
- const MessagePtr msg,
- RemotingCommand* response) {
+std::unique_ptr<SendResult> MQClientAPIImpl::processSendResponse(const std::string& brokerName,
+ const MessagePtr msg,
+ RemotingCommand* response) {
SendStatus sendStatus = SEND_OK;
switch (response->code()) {
case FLUSH_DISK_TIMEOUT:
@@ -227,18 +227,18 @@ SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName,
}
}
- SendResult* sendResult =
- new SendResult(sendStatus, uniqMsgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset);
+ std::unique_ptr<SendResult> sendResult(
+ new SendResult(sendStatus, uniqMsgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset));
sendResult->set_transaction_id(responseHeader->transactionId);
return sendResult;
}
-PullResult* MQClientAPIImpl::pullMessage(const std::string& addr,
- PullMessageRequestHeader* requestHeader,
- int timeoutMillis,
- CommunicationMode communicationMode,
- PullCallback* pullCallback) {
+std::unique_ptr<PullResult> MQClientAPIImpl::pullMessage(const std::string& addr,
+ PullMessageRequestHeader* requestHeader,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ PullCallback* pullCallback) {
RemotingCommand request(PULL_MESSAGE, requestHeader);
switch (communicationMode) {
@@ -261,13 +261,15 @@ void MQClientAPIImpl::pullMessageAsync(const std::string& addr,
remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
}
-PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis) {
+std::unique_ptr<PullResult> MQClientAPIImpl::pullMessageSync(const std::string& addr,
+ RemotingCommand& request,
+ int timeoutMillis) {
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
assert(response != nullptr);
return processPullResponse(response.get());
}
-PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
+std::unique_ptr<PullResult> MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
PullStatus pullStatus = NO_NEW_MSG;
switch (response->code()) {
case SUCCESS:
@@ -294,8 +296,9 @@ PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
auto* responseHeader = response->decodeCommandCustomHeader<PullMessageResponseHeader>();
assert(responseHeader != nullptr);
- return new PullResultExt(pullStatus, responseHeader->nextBeginOffset, responseHeader->minOffset,
- responseHeader->maxOffset, (int)responseHeader->suggestWhichBrokerId, response->body());
+ return std::unique_ptr<PullResult>(new PullResultExt(pullStatus, responseHeader->nextBeginOffset,
+ responseHeader->minOffset, responseHeader->maxOffset,
+ (int)responseHeader->suggestWhichBrokerId, response->body()));
}
MQMessageExt MQClientAPIImpl::viewMessage(const std::string& addr, int64_t phyoffset, int timeoutMillis) {
@@ -622,7 +625,8 @@ void MQClientAPIImpl::unlockBatchMQ(const std::string& addr,
}
}
-TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis) {
+std::unique_ptr<TopicRouteData> MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic,
+ int timeoutMillis) {
RemotingCommand request(GET_ROUTEINFO_BY_TOPIC, new GetRouteInfoRequestHeader(topic));
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(null, request, timeoutMillis));
@@ -642,7 +646,7 @@ TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::stri
THROW_MQEXCEPTION(MQClientException, response->remark(), response->code());
}
-TopicList* MQClientAPIImpl::getTopicListFromNameServer() {
+std::unique_ptr<TopicList> MQClientAPIImpl::getTopicListFromNameServer() {
RemotingCommand request(GET_ALL_TOPIC_LIST_FROM_NAMESERVER, nullptr);
std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(null, request));
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 535f33d..5bebeff 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -59,32 +59,34 @@ class MQClientAPIImpl {
void createTopic(const std::string& addr, const std::string& defaultTopic, TopicConfig topicConfig);
- SendResult* sendMessage(const std::string& addr,
- const std::string& brokerName,
- const MessagePtr msg,
- std::unique_ptr<SendMessageRequestHeader> requestHeader,
- int timeoutMillis,
- CommunicationMode communicationMode,
- DefaultMQProducerImplPtr producer);
- SendResult* sendMessage(const std::string& addr,
- const std::string& brokerName,
- const MessagePtr msg,
- std::unique_ptr<SendMessageRequestHeader> requestHeader,
- int timeoutMillis,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- TopicPublishInfoPtr topicPublishInfo,
- MQClientInstancePtr instance,
- int retryTimesWhenSendFailed,
- DefaultMQProducerImplPtr producer);
- SendResult* processSendResponse(const std::string& brokerName, const MessagePtr msg, RemotingCommand* pResponse);
-
- PullResult* pullMessage(const std::string& addr,
- PullMessageRequestHeader* requestHeader,
- int timeoutMillis,
- CommunicationMode communicationMode,
- PullCallback* pullCallback);
- PullResult* processPullResponse(RemotingCommand* pResponse);
+ std::unique_ptr<SendResult> sendMessage(const std::string& addr,
+ const std::string& brokerName,
+ const MessagePtr msg,
+ std::unique_ptr<SendMessageRequestHeader> requestHeader,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ DefaultMQProducerImplPtr producer);
+ std::unique_ptr<SendResult> sendMessage(const std::string& addr,
+ const std::string& brokerName,
+ const MessagePtr msg,
+ std::unique_ptr<SendMessageRequestHeader> requestHeader,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ TopicPublishInfoPtr topicPublishInfo,
+ MQClientInstancePtr instance,
+ int retryTimesWhenSendFailed,
+ DefaultMQProducerImplPtr producer);
+ std::unique_ptr<SendResult> processSendResponse(const std::string& brokerName,
+ const MessagePtr msg,
+ RemotingCommand* pResponse);
+
+ std::unique_ptr<PullResult> pullMessage(const std::string& addr,
+ PullMessageRequestHeader* requestHeader,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ PullCallback* pullCallback);
+ std::unique_ptr<PullResult> processPullResponse(RemotingCommand* pResponse);
MQMessageExt viewMessage(const std::string& addr, int64_t phyoffset, int timeoutMillis);
@@ -141,9 +143,9 @@ class MQClientAPIImpl {
int timeoutMillis,
bool oneway = false);
- TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
+ std::unique_ptr<TopicRouteData> getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
- TopicList* getTopicListFromNameServer();
+ std::unique_ptr<TopicList> getTopicListFromNameServer();
int wipeWritePermOfBroker(const std::string& namesrvAddr, const std::string& brokerName, int timeoutMillis);
@@ -165,11 +167,11 @@ class MQClientAPIImpl {
private:
friend class SendCallbackWrap;
- SendResult* sendMessageSync(const std::string& addr,
- const std::string& brokerName,
- const MessagePtr msg,
- RemotingCommand& request,
- int timeoutMillis);
+ std::unique_ptr<SendResult> sendMessageSync(const std::string& addr,
+ const std::string& brokerName,
+ const MessagePtr msg,
+ RemotingCommand& request,
+ int timeoutMillis);
void sendMessageAsync(const std::string& addr,
const std::string& brokerName,
@@ -184,7 +186,7 @@ class MQClientAPIImpl {
void sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis);
- PullResult* pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis);
+ std::unique_ptr<PullResult> pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis);
void pullMessageAsync(const std::string& addr,
RemotingCommand& request,
diff --git a/src/MQClientInstance.cpp b/src/MQClientInstance.cpp
index 443b7c4..d9ebc4e 100644
--- a/src/MQClientInstance.cpp
+++ b/src/MQClientInstance.cpp
@@ -19,7 +19,6 @@
#include <typeindex>
#include "ClientRemotingProcessor.h"
-#include "protocol/body/ConsumerRunningInfo.h"
#include "Logging.h"
#include "MQAdminImpl.h"
#include "MQClientAPIImpl.h"
@@ -33,6 +32,7 @@
#include "TcpRemotingClient.h"
#include "TopicPublishInfo.hpp"
#include "UtilAll.h"
+#include "protocol/body/ConsumerRunningInfo.h"
namespace rocketmq {
@@ -415,8 +415,7 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
try {
TopicRouteDataPtr topicRouteData;
if (isDefault) {
- topicRouteData.reset(
- mq_client_api_impl_->getTopicRouteInfoFromNameServer(AUTO_CREATE_TOPIC_KEY_TOPIC, 1000 * 3));
+ topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(AUTO_CREATE_TOPIC_KEY_TOPIC, 1000 * 3);
if (topicRouteData != nullptr) {
auto& queueDatas = topicRouteData->queue_datas();
for (auto& qd : queueDatas) {
@@ -427,7 +426,7 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
}
LOG_DEBUG_NEW("getTopicRouteInfoFromNameServer is null for topic: {}", topic);
} else {
- topicRouteData.reset(mq_client_api_impl_->getTopicRouteInfoFromNameServer(topic, 1000 * 3));
+ topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
}
if (topicRouteData != nullptr) {
LOG_INFO_NEW("updateTopicRouteInfoFromNameServer has data");
@@ -477,19 +476,19 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
return false;
}
-HeartbeatData* MQClientInstance::prepareHeartbeatData() {
- HeartbeatData* pHeartbeatData = new HeartbeatData();
+std::unique_ptr<HeartbeatData> MQClientInstance::prepareHeartbeatData() {
+ std::unique_ptr<HeartbeatData> heartbeat_data(new HeartbeatData());
// clientID
- pHeartbeatData->set_client_id(client_id_);
+ heartbeat_data->set_client_id(client_id_);
// Consumer
- insertConsumerInfoToHeartBeatData(pHeartbeatData);
+ insertConsumerInfoToHeartBeatData(heartbeat_data.get());
// Producer
- insertProducerInfoToHeartBeatData(pHeartbeatData);
+ insertProducerInfoToHeartBeatData(heartbeat_data.get());
- return pHeartbeatData;
+ return heartbeat_data;
}
void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
@@ -769,7 +768,7 @@ TopicPublishInfoPtr MQClientInstance::tryToFindTopicPublishInfo(const std::strin
}
}
-FindBrokerResult* MQClientInstance::findBrokerAddressInAdmin(const std::string& brokerName) {
+std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInAdmin(const std::string& brokerName) {
BrokerAddrMAP brokerTable(getBrokerAddrTable());
bool found = false;
bool slave = false;
@@ -788,7 +787,7 @@ FindBrokerResult* MQClientInstance::findBrokerAddressInAdmin(const std::string&
brokerTable.clear();
if (found) {
- return new FindBrokerResult(brokerAddr, slave);
+ return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave));
}
return nullptr;
@@ -817,9 +816,9 @@ std::string MQClientInstance::findBrokerAddressInPublish(const std::string& brok
return null;
}
-FindBrokerResult* MQClientInstance::findBrokerAddressInSubscribe(const std::string& brokerName,
- int brokerId,
- bool onlyThisBroker) {
+std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInSubscribe(const std::string& brokerName,
+ int brokerId,
+ bool onlyThisBroker) {
std::string brokerAddr;
bool slave = false;
bool found = false;
@@ -846,7 +845,7 @@ FindBrokerResult* MQClientInstance::findBrokerAddressInSubscribe(const std::stri
brokerTable.clear();
if (found) {
- return new FindBrokerResult(brokerAddr, slave);
+ return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave));
}
return nullptr;
@@ -950,7 +949,7 @@ void MQClientInstance::resetOffset(const std::string& group,
}
}
-ConsumerRunningInfo* MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) {
+std::unique_ptr<ConsumerRunningInfo> MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) {
auto* consumer = selectConsumer(consumerGroup);
if (consumer != nullptr) {
std::unique_ptr<ConsumerRunningInfo> runningInfo(consumer->consumerRunningInfo());
@@ -967,7 +966,7 @@ ConsumerRunningInfo* MQClientInstance::consumerRunningInfo(const std::string& co
runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION,
MQVersion::GetVersionDesc(MQVersion::CURRENT_VERSION));
- return runningInfo.release();
+ return runningInfo;
}
}
diff --git a/src/MQClientInstance.h b/src/MQClientInstance.h
index 5a589aa..bf606b1 100644
--- a/src/MQClientInstance.h
+++ b/src/MQClientInstance.h
@@ -22,18 +22,18 @@
#include <set>
#include <utility>
-#include "protocol/body/ConsumerRunningInfo.h"
#include "FindBrokerResult.hpp"
#include "MQClientConfig.h"
-#include "MQException.h"
#include "MQConsumerInner.h"
+#include "MQException.h"
#include "MQMessageQueue.h"
#include "MQProducerInner.h"
#include "ServiceState.h"
#include "TopicPublishInfo.hpp"
+#include "concurrent/executor.hpp"
+#include "protocol/body/ConsumerRunningInfo.h"
#include "protocol/body/TopicRouteData.hpp"
#include "protocol/heartbeat/HeartbeatData.hpp"
-#include "concurrent/executor.hpp"
namespace rocketmq {
@@ -83,9 +83,11 @@ class MQClientInstance {
MQProducerInner* selectProducer(const std::string& group);
MQConsumerInner* selectConsumer(const std::string& group);
- FindBrokerResult* findBrokerAddressInAdmin(const std::string& brokerName);
+ std::unique_ptr<FindBrokerResult> findBrokerAddressInAdmin(const std::string& brokerName);
std::string findBrokerAddressInPublish(const std::string& brokerName);
- FindBrokerResult* findBrokerAddressInSubscribe(const std::string& brokerName, int brokerId, bool onlyThisBroker);
+ std::unique_ptr<FindBrokerResult> findBrokerAddressInSubscribe(const std::string& brokerName,
+ int brokerId,
+ bool onlyThisBroker);
void findConsumerIds(const std::string& topic, const std::string& group, std::vector<std::string>& cids);
@@ -95,7 +97,7 @@ class MQClientInstance {
const std::string& topic,
const std::map<MQMessageQueue, int64_t>& offsetTable);
- ConsumerRunningInfo* consumerRunningInfo(const std::string& consumerGroup);
+ std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo(const std::string& consumerGroup);
public:
TopicPublishInfoPtr tryToFindTopicPublishInfo(const std::string& topic);
@@ -133,7 +135,7 @@ class MQClientInstance {
// heartbeat
void sendHeartbeatToAllBroker();
- HeartbeatData* prepareHeartbeatData();
+ std::unique_ptr<HeartbeatData> prepareHeartbeatData();
void insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
diff --git a/src/common/FilterAPI.hpp b/src/common/FilterAPI.hpp
index 9bb18bb..76096cf 100644
--- a/src/common/FilterAPI.hpp
+++ b/src/common/FilterAPI.hpp
@@ -20,14 +20,15 @@
#include <string> // std::string
#include "MQException.h"
-#include "protocol/heartbeat/SubscriptionData.hpp"
#include "UtilAll.h"
+#include "protocol/heartbeat/SubscriptionData.hpp"
namespace rocketmq {
class FilterAPI {
public:
- static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& sub_string) {
+ static std::unique_ptr<SubscriptionData> buildSubscriptionData(const std::string& topic,
+ const std::string& sub_string) {
// delete in Rebalance
std::unique_ptr<SubscriptionData> subscription_data(new SubscriptionData(topic, sub_string));
@@ -52,7 +53,7 @@ class FilterAPI {
}
}
- return subscription_data.release();
+ return subscription_data;
}
};
diff --git a/src/consumer/DefaultLitePullConsumerImpl.cpp b/src/consumer/DefaultLitePullConsumerImpl.cpp
index 3a5fc81..f53f7ed 100644
--- a/src/consumer/DefaultLitePullConsumerImpl.cpp
+++ b/src/consumer/DefaultLitePullConsumerImpl.cpp
@@ -22,11 +22,11 @@
#include "AssignedMessageQueue.hpp"
#include "FilterAPI.hpp"
+#include "LocalFileOffsetStore.h"
#include "MQAdminImpl.h"
#include "MQClientAPIImpl.h"
#include "MQClientInstance.h"
#include "NamespaceUtil.h"
-#include "LocalFileOffsetStore.h"
#include "PullAPIWrapper.h"
#include "PullSysFlag.h"
#include "RebalanceLitePullImpl.h"
@@ -186,7 +186,7 @@ class DefaultLitePullConsumerImpl::PullTaskImpl : public std::enable_shared_from
if (consumer->subscription_type_ == SubscriptionType::SUBSCRIBE) {
subscription_data = consumer->rebalance_impl_->getSubscriptionData(message_queue_.topic());
} else {
- subscription_data = FilterAPI::buildSubscriptionData(message_queue_.topic(), SUB_ALL);
+ subscription_data = FilterAPI::buildSubscriptionData(message_queue_.topic(), SUB_ALL).release();
}
std::unique_ptr<PullResult> pull_result(
@@ -502,8 +502,7 @@ void DefaultLitePullConsumerImpl::subscribe(const std::string& topic, const std:
THROW_MQEXCEPTION(MQClientException, "Topic can not be null or empty.", -1);
}
set_subscription_type(SubscriptionType::SUBSCRIBE);
- auto* subscription_data = FilterAPI::buildSubscriptionData(topic, subExpression);
- rebalance_impl_->setSubscriptionData(topic, subscription_data);
+ rebalance_impl_->setSubscriptionData(topic, FilterAPI::buildSubscriptionData(topic, subExpression));
message_queue_listener_.reset(new MessageQueueListenerImpl(shared_from_this()));
assigned_message_queue_->set_rebalance_impl(rebalance_impl_.get());
@@ -600,29 +599,29 @@ int64_t DefaultLitePullConsumerImpl::fetchConsumeOffset(const MQMessageQueue& me
return rebalance_impl_->computePullFromWhere(messageQueue);
}
-PullResult* DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
- SubscriptionData* subscription_data,
- int64_t offset,
- int max_nums) {
+std::unique_ptr<PullResult> DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
+ SubscriptionData* subscription_data,
+ int64_t offset,
+ int max_nums) {
return pull(mq, subscription_data, offset, max_nums,
getDefaultLitePullConsumerConfig()->consumer_pull_timeout_millis());
}
-PullResult* DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
- SubscriptionData* subscription_data,
- int64_t offset,
- int max_nums,
- long timeout) {
+std::unique_ptr<PullResult> DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
+ SubscriptionData* subscription_data,
+ int64_t offset,
+ int max_nums,
+ long timeout) {
return pullSyncImpl(mq, subscription_data, offset, max_nums,
getDefaultLitePullConsumerConfig()->long_polling_enable(), timeout);
}
-PullResult* DefaultLitePullConsumerImpl::pullSyncImpl(const MQMessageQueue& mq,
- SubscriptionData* subscription_data,
- int64_t offset,
- int max_nums,
- bool block,
- long timeout) {
+std::unique_ptr<PullResult> DefaultLitePullConsumerImpl::pullSyncImpl(const MQMessageQueue& mq,
+ SubscriptionData* subscription_data,
+ int64_t offset,
+ int max_nums,
+ bool block,
+ long timeout) {
if (offset < 0) {
THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
}
@@ -838,8 +837,8 @@ void DefaultLitePullConsumerImpl::registerTopicMessageQueueChangeListener(
}
}
-ConsumerRunningInfo* DefaultLitePullConsumerImpl::consumerRunningInfo() {
- auto* info = new ConsumerRunningInfo();
+std::unique_ptr<ConsumerRunningInfo> DefaultLitePullConsumerImpl::consumerRunningInfo() {
+ std::unique_ptr<ConsumerRunningInfo> info(new ConsumerRunningInfo());
info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(start_time_));
diff --git a/src/consumer/DefaultLitePullConsumerImpl.h b/src/consumer/DefaultLitePullConsumerImpl.h
index 6923799..1f727d9 100755
--- a/src/consumer/DefaultLitePullConsumerImpl.h
+++ b/src/consumer/DefaultLitePullConsumerImpl.h
@@ -21,14 +21,14 @@
#include <mutex> // std::mutex
#include <string> // std::string
-#include "concurrent/blocking_queue.hpp"
-#include "concurrent/executor.hpp"
#include "DefaultLitePullConsumer.h"
-#include "MessageQueueListener.h"
-#include "MessageQueueLock.hpp"
#include "MQClientImpl.h"
#include "MQConsumerInner.h"
+#include "MessageQueueListener.h"
+#include "MessageQueueLock.hpp"
#include "TopicMessageQueueChangeListener.h"
+#include "concurrent/blocking_queue.hpp"
+#include "concurrent/executor.hpp"
namespace rocketmq {
@@ -125,7 +125,7 @@ class DefaultLitePullConsumerImpl : public std::enable_shared_from_this<DefaultL
// offset persistence
void persistConsumerOffset() override;
- ConsumerRunningInfo* consumerRunningInfo() override;
+ std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo() override;
private:
void checkConfig();
@@ -149,18 +149,21 @@ class DefaultLitePullConsumerImpl : public std::enable_shared_from_this<DefaultL
int64_t nextPullOffset(const MQMessageQueue& messageQueue);
int64_t fetchConsumeOffset(const MQMessageQueue& messageQueue);
- PullResult* pull(const MQMessageQueue& mq, SubscriptionData* subscription_data, int64_t offset, int max_nums);
- PullResult* pull(const MQMessageQueue& mq,
- SubscriptionData* subscription_data,
- int64_t offset,
- int max_nums,
- long timeout);
- PullResult* pullSyncImpl(const MQMessageQueue& mq,
- SubscriptionData* subscription_data,
- int64_t offset,
- int max_nums,
- bool block,
- long timeout);
+ std::unique_ptr<PullResult> pull(const MQMessageQueue& mq,
+ SubscriptionData* subscription_data,
+ int64_t offset,
+ int max_nums);
+ std::unique_ptr<PullResult> pull(const MQMessageQueue& mq,
+ SubscriptionData* subscription_data,
+ int64_t offset,
+ int max_nums,
+ long timeout);
+ std::unique_ptr<PullResult> pullSyncImpl(const MQMessageQueue& mq,
+ SubscriptionData* subscription_data,
+ int64_t offset,
+ int max_nums,
+ bool block,
+ long timeout);
void submitConsumeRequest(ConsumeRequest* consume_request);
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 77ba017..1c8a763 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -62,8 +62,8 @@ class DefaultMQPushConsumerImpl::AsyncPullCallback : public AutoDeletePullCallba
return;
}
- pull_result.reset(consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(),
- std::move(pull_result), subscription_data_));
+ pull_result = consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(), std::move(pull_result),
+ subscription_data_);
switch (pull_result->pull_status()) {
case FOUND: {
int64_t prev_request_offset = pull_request_->next_offset();
@@ -298,8 +298,7 @@ void DefaultMQPushConsumerImpl::checkConfig() {
void DefaultMQPushConsumerImpl::copySubscription() {
for (const auto& it : subscription_) {
LOG_INFO_NEW("buildSubscriptionData: {}, {}", it.first, it.second);
- SubscriptionData* subscriptionData = FilterAPI::buildSubscriptionData(it.first, it.second);
- rebalance_impl_->setSubscriptionData(it.first, subscriptionData);
+ rebalance_impl_->setSubscriptionData(it.first, FilterAPI::buildSubscriptionData(it.first, it.second));
}
switch (getDefaultMQPushConsumerConfig()->message_model()) {
@@ -308,8 +307,7 @@ void DefaultMQPushConsumerImpl::copySubscription() {
case CLUSTERING: {
// auto subscript retry topic
std::string retryTopic = UtilAll::getRetryTopic(client_config_->group_name());
- SubscriptionData* subscriptionData = FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL);
- rebalance_impl_->setSubscriptionData(retryTopic, subscriptionData);
+ rebalance_impl_->setSubscriptionData(retryTopic, FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
break;
}
default:
@@ -572,8 +570,8 @@ void DefaultMQPushConsumerImpl::updateConsumeOffset(const MQMessageQueue& mq, in
}
}
-ConsumerRunningInfo* DefaultMQPushConsumerImpl::consumerRunningInfo() {
- auto* info = new ConsumerRunningInfo();
+std::unique_ptr<ConsumerRunningInfo> DefaultMQPushConsumerImpl::consumerRunningInfo() {
+ std::unique_ptr<ConsumerRunningInfo> info(new ConsumerRunningInfo());
info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, UtilAll::to_string(consume_orderly_));
info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index ecac586..28fe385 100755
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -100,7 +100,7 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
// offset persistence
void persistConsumerOffset() override;
- ConsumerRunningInfo* consumerRunningInfo() override;
+ std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo() override;
public:
void executePullRequestLater(PullRequestPtr pullRequest, long timeDelay);
diff --git a/src/consumer/MQConsumerInner.h b/src/consumer/MQConsumerInner.h
index d38e839..3daa1b8 100644
--- a/src/consumer/MQConsumerInner.h
+++ b/src/consumer/MQConsumerInner.h
@@ -21,6 +21,7 @@
#include <vector>
#include "ConsumeType.h"
+#include "MQMessageQueue.h"
#include "protocol/heartbeat/SubscriptionData.hpp"
namespace rocketmq {
@@ -48,7 +49,7 @@ class MQConsumerInner {
// offset persistence
virtual void persistConsumerOffset() = 0;
- virtual ConsumerRunningInfo* consumerRunningInfo() = 0;
+ virtual std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo() = 0;
};
} // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index e25f423..16049f2 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -16,11 +16,13 @@
*/
#include "PullAPIWrapper.h"
+#include <memory>
+
#include "ByteBuffer.hpp"
#include "MQClientAPIImpl.h"
#include "MQClientInstance.h"
-#include "MessageDecoder.h"
#include "MessageAccessor.hpp"
+#include "MessageDecoder.h"
#include "PullResultExt.hpp"
#include "PullSysFlag.h"
@@ -50,12 +52,12 @@ int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
return MASTER_ID;
}
-PullResult* PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
- std::unique_ptr<PullResult> pull_result,
- SubscriptionData* subscription_data) {
+std::unique_ptr<PullResult> PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
+ std::unique_ptr<PullResult> pull_result,
+ SubscriptionData* subscription_data) {
auto* pull_result_ext = dynamic_cast<PullResultExt*>(pull_result.get());
if (pull_result_ext == nullptr) {
- return pull_result.release();
+ return pull_result;
}
// update node
@@ -94,28 +96,29 @@ PullResult* PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
}
}
- return new PullResult(pull_result_ext->pull_status(), pull_result_ext->next_begin_offset(),
- pull_result_ext->min_offset(), pull_result_ext->max_offset(), std::move(msg_list_filter_again));
+ return std::unique_ptr<PullResult>(new PullResult(pull_result_ext->pull_status(),
+ pull_result_ext->next_begin_offset(), pull_result_ext->min_offset(),
+ pull_result_ext->max_offset(), std::move(msg_list_filter_again)));
}
-PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
- const std::string& subExpression,
- const std::string& expressionType,
- int64_t subVersion,
- int64_t offset,
- int maxNums,
- int sysFlag,
- int64_t commitOffset,
- int brokerSuspendMaxTimeMillis,
- int timeoutMillis,
- CommunicationMode communicationMode,
- PullCallback* pullCallback) {
+std::unique_ptr<PullResult> PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
+ const std::string& subExpression,
+ const std::string& expressionType,
+ int64_t subVersion,
+ int64_t offset,
+ int maxNums,
+ int sysFlag,
+ int64_t commitOffset,
+ int brokerSuspendMaxTimeMillis,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ PullCallback* pullCallback) {
std::unique_ptr<FindBrokerResult> findBrokerResult(
client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false));
if (findBrokerResult == nullptr) {
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
- findBrokerResult.reset(
- client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false));
+ findBrokerResult =
+ client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false);
}
if (findBrokerResult != nullptr) {
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 7bdbf5b..01727d3 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -32,22 +32,22 @@ class PullAPIWrapper {
PullAPIWrapper(MQClientInstance* instance, const std::string& consumerGroup);
~PullAPIWrapper();
- PullResult* processPullResult(const MQMessageQueue& mq,
- std::unique_ptr<PullResult> pull_result,
- SubscriptionData* subscriptionData);
+ std::unique_ptr<PullResult> processPullResult(const MQMessageQueue& mq,
+ std::unique_ptr<PullResult> pull_result,
+ SubscriptionData* subscriptionData);
- PullResult* pullKernelImpl(const MQMessageQueue& mq,
- const std::string& subExpression,
- const std::string& expressionType,
- int64_t subVersion,
- int64_t offset,
- int maxNums,
- int sysFlag,
- int64_t commitOffset,
- int brokerSuspendMaxTimeMillis,
- int timeoutMillis,
- CommunicationMode communicationMode,
- PullCallback* pullCallback);
+ std::unique_ptr<PullResult> pullKernelImpl(const MQMessageQueue& mq,
+ const std::string& subExpression,
+ const std::string& expressionType,
+ int64_t subVersion,
+ int64_t offset,
+ int maxNums,
+ int sysFlag,
+ int64_t commitOffset,
+ int brokerSuspendMaxTimeMillis,
+ int timeoutMillis,
+ CommunicationMode communicationMode,
+ PullCallback* pullCallback);
private:
void updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId);
diff --git a/src/consumer/RebalanceImpl.cpp b/src/consumer/RebalanceImpl.cpp
index f890ea9..f97bc16 100644
--- a/src/consumer/RebalanceImpl.cpp
+++ b/src/consumer/RebalanceImpl.cpp
@@ -30,11 +30,7 @@ RebalanceImpl::RebalanceImpl(const std::string& consumerGroup,
allocate_mq_strategy_(allocateMqStrategy),
client_instance_(instance) {}
-RebalanceImpl::~RebalanceImpl() {
- for (auto& it : subscription_inner_) {
- deleteAndZero(it.second);
- }
-}
+RebalanceImpl::~RebalanceImpl() = default;
void RebalanceImpl::unlock(const MQMessageQueue& mq, const bool oneway) {
std::unique_ptr<FindBrokerResult> findBrokerResult(
@@ -474,18 +470,15 @@ TOPIC2SD& RebalanceImpl::getSubscriptionInner() {
SubscriptionData* RebalanceImpl::getSubscriptionData(const std::string& topic) {
const auto& it = subscription_inner_.find(topic);
if (it != subscription_inner_.end()) {
- return it->second;
+ return it->second.get();
}
return nullptr;
}
-void RebalanceImpl::setSubscriptionData(const std::string& topic, SubscriptionData* subscriptionData) noexcept {
- if (subscriptionData != nullptr) {
- const auto& it = subscription_inner_.find(topic);
- if (it != subscription_inner_.end()) {
- deleteAndZero(it->second);
- }
- subscription_inner_[topic] = subscriptionData;
+void RebalanceImpl::setSubscriptionData(const std::string& topic,
+ std::unique_ptr<SubscriptionData> subscription_data) noexcept {
+ if (subscription_data != nullptr) {
+ subscription_inner_[topic] = std::move(subscription_data);
}
}
diff --git a/src/consumer/RebalanceImpl.h b/src/consumer/RebalanceImpl.h
index a44784e..55b33fd 100755
--- a/src/consumer/RebalanceImpl.h
+++ b/src/consumer/RebalanceImpl.h
@@ -32,7 +32,7 @@ namespace rocketmq {
typedef std::map<MQMessageQueue, ProcessQueuePtr> MQ2PQ;
typedef std::map<std::string, std::vector<MQMessageQueue>> TOPIC2MQS;
-typedef std::map<std::string, SubscriptionData*> TOPIC2SD;
+typedef std::map<std::string, std::unique_ptr<SubscriptionData>> TOPIC2SD;
typedef std::map<std::string, std::vector<MQMessageQueue>> BROKER2MQS;
class RebalanceImpl {
@@ -75,7 +75,7 @@ class RebalanceImpl {
public:
TOPIC2SD& getSubscriptionInner();
SubscriptionData* getSubscriptionData(const std::string& topic);
- void setSubscriptionData(const std::string& topic, SubscriptionData* sd) noexcept;
+ void setSubscriptionData(const std::string& topic, std::unique_ptr<SubscriptionData> sd) noexcept;
bool getTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& mqs);
void setTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& mqs);
diff --git a/src/consumer/RebalanceLitePullImpl.h b/src/consumer/RebalanceLitePullImpl.h
index 56b6189..90bcf8e 100755
--- a/src/consumer/RebalanceLitePullImpl.h
+++ b/src/consumer/RebalanceLitePullImpl.h
@@ -23,9 +23,6 @@
namespace rocketmq {
typedef std::map<MQMessageQueue, ProcessQueuePtr> MQ2PQ;
-typedef std::map<std::string, std::vector<MQMessageQueue>> TOPIC2MQS;
-typedef std::map<std::string, SubscriptionData*> TOPIC2SD;
-typedef std::map<std::string, std::vector<MQMessageQueue>> BROKER2MQS;
class RebalanceLitePullImpl : public RebalanceImpl {
public:
diff --git a/src/consumer/RemoteBrokerOffsetStore.cpp b/src/consumer/RemoteBrokerOffsetStore.cpp
index ad2e210..713bf17 100644
--- a/src/consumer/RemoteBrokerOffsetStore.cpp
+++ b/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -151,7 +151,7 @@ void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MQMessageQueue&
if (findBrokerResult == nullptr) {
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
- findBrokerResult.reset(client_instance_->findBrokerAddressInAdmin(mq.broker_name()));
+ findBrokerResult = client_instance_->findBrokerAddressInAdmin(mq.broker_name());
}
if (findBrokerResult != nullptr) {
@@ -177,7 +177,7 @@ int64_t RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MQMessageQue
if (findBrokerResult == nullptr) {
client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
- findBrokerResult.reset(client_instance_->findBrokerAddressInAdmin(mq.broker_name()));
+ findBrokerResult = client_instance_->findBrokerAddressInAdmin(mq.broker_name());
}
if (findBrokerResult != nullptr) {
diff --git a/src/io/ByteBuffer.cpp b/src/io/ByteBuffer.cpp
index f12bd72..0121656 100644
--- a/src/io/ByteBuffer.cpp
+++ b/src/io/ByteBuffer.cpp
@@ -22,16 +22,16 @@
namespace rocketmq {
-ByteBuffer* ByteBuffer::allocate(int32_t capacity) {
+std::unique_ptr<ByteBuffer> ByteBuffer::allocate(int32_t capacity) {
if (capacity < 0) {
throw std::invalid_argument("");
}
- return new DefaultByteBuffer(capacity, capacity);
+ return std::unique_ptr<ByteBuffer>(new DefaultByteBuffer(capacity, capacity));
}
-ByteBuffer* ByteBuffer::wrap(ByteArrayRef array, int32_t offset, int32_t length) {
+std::unique_ptr<ByteBuffer> ByteBuffer::wrap(ByteArrayRef array, int32_t offset, int32_t length) {
try {
- return new DefaultByteBuffer(std::move(array), offset, length);
+ return std::unique_ptr<ByteBuffer>(new DefaultByteBuffer(std::move(array), offset, length));
} catch (const std::exception& x) {
throw std::runtime_error("IndexOutOfBoundsException");
}
diff --git a/src/io/ByteBuffer.hpp b/src/io/ByteBuffer.hpp
index 5a30250..4e3ea04 100644
--- a/src/io/ByteBuffer.hpp
+++ b/src/io/ByteBuffer.hpp
@@ -19,18 +19,18 @@
#include <sstream> // std::stringstream
-#include "ByteArray.h"
#include "Buffer.hpp"
+#include "ByteArray.h"
#include "ByteOrder.h"
namespace rocketmq {
class ByteBuffer : public Buffer<char> {
public:
- static ByteBuffer* allocate(int32_t capacity);
+ static std::unique_ptr<ByteBuffer> allocate(int32_t capacity);
- static ByteBuffer* wrap(ByteArrayRef array, int32_t offset, int32_t length);
- static ByteBuffer* wrap(ByteArrayRef array) { return wrap(array, 0, array->size()); }
+ static std::unique_ptr<ByteBuffer> wrap(ByteArrayRef array, int32_t offset, int32_t length);
+ static std::unique_ptr<ByteBuffer> wrap(ByteArrayRef array) { return wrap(array, 0, array->size()); }
protected:
ByteBuffer(int32_t mark, int32_t pos, int32_t lim, int32_t cap, ByteArrayRef byte_array, int32_t offset)
@@ -46,9 +46,9 @@ class ByteBuffer : public Buffer<char> {
virtual ~ByteBuffer() = default;
public:
- virtual ByteBuffer* slice() = 0;
- virtual ByteBuffer* duplicate() = 0;
- virtual ByteBuffer* asReadOnlyBuffer() = 0;
+ virtual std::unique_ptr<ByteBuffer> slice() = 0;
+ virtual std::unique_ptr<ByteBuffer> duplicate() = 0;
+ virtual std::unique_ptr<ByteBuffer> asReadOnlyBuffer() = 0;
// get/put routine
// ======================
diff --git a/src/io/DefaultByteBuffer.hpp b/src/io/DefaultByteBuffer.hpp
index ca740ba..55b225d 100644
--- a/src/io/DefaultByteBuffer.hpp
+++ b/src/io/DefaultByteBuffer.hpp
@@ -20,6 +20,7 @@
#include <cstdlib> // std::memcpy
#include <algorithm> // std::move
+#include <memory> // std::unique_ptr
#include <typeindex> // std::type_index
#include "ByteBuffer.hpp"
@@ -38,17 +39,19 @@ class DefaultByteBuffer : public ByteBuffer {
: ByteBuffer(mark, pos, lim, cap, std::move(buf), off) {}
public:
- ByteBuffer* slice() override {
- return new DefaultByteBuffer(byte_array_, -1, 0, remaining(), remaining(), position() + offset_);
+ std::unique_ptr<ByteBuffer> slice() override {
+ return std::unique_ptr<ByteBuffer>(
+ new DefaultByteBuffer(byte_array_, -1, 0, remaining(), remaining(), position() + offset_));
}
- ByteBuffer* duplicate() override {
- return new DefaultByteBuffer(byte_array_, mark_value(), position(), limit(), capacity(), offset_);
+ std::unique_ptr<ByteBuffer> duplicate() override {
+ return std::unique_ptr<ByteBuffer>(
+ new DefaultByteBuffer(byte_array_, mark_value(), position(), limit(), capacity(), offset_));
}
- ByteBuffer* asReadOnlyBuffer() override {
+ std::unique_ptr<ByteBuffer> asReadOnlyBuffer() override {
// return new HeapByteBufferR(byte_array_, mark_value(), position(), limit(), capacity(), offset_);
- return nullptr;
+ return std::unique_ptr<ByteBuffer>();
}
char get() override { return (*byte_array_)[ix(nextGetIndex())]; }
diff --git a/src/message/MessageClientIDSetter.cpp b/src/message/MessageClientIDSetter.cpp
index 5560ba1..5dfba1c 100644
--- a/src/message/MessageClientIDSetter.cpp
+++ b/src/message/MessageClientIDSetter.cpp
@@ -37,7 +37,7 @@ MessageClientIDSetter::MessageClientIDSetter() {
std::unique_ptr<ByteBuffer> buffer;
sockaddr* addr = GetSelfIP();
if (addr != nullptr) {
- buffer.reset(ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4));
+ buffer = ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4);
if (addr->sa_family == AF_INET) {
auto* sin = (struct sockaddr_in*)addr;
buffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
@@ -49,7 +49,7 @@ MessageClientIDSetter::MessageClientIDSetter() {
}
}
if (buffer == nullptr) {
- buffer.reset(ByteBuffer::allocate(4 + 2 + 4));
+ buffer = ByteBuffer::allocate(4 + 2 + 4);
buffer->putInt(UtilAll::currentTimeMillis());
}
buffer->putShort(UtilAll::getProcessId());
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index 0105bd6..9e9ecc2 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -612,10 +612,10 @@ void DefaultMQProducerImpl::prepareSendRequest(Message& msg, long timeout) {
}
}
-SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- long timeout) {
+std::unique_ptr<SendResult> DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ long timeout) {
Validators::checkMessage(*msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
uint64_t beginTimestampFirst = UtilAll::currentTimeMillis();
@@ -647,8 +647,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
break;
}
- sendResult.reset(
- sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime));
+ sendResult = sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
endTimestamp = UtilAll::currentTimeMillis();
updateFaultItem(mq.broker_name(), endTimestamp - beginTimestampPrev, false);
switch (communicationMode) {
@@ -664,7 +663,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
}
}
- return sendResult.release();
+ return sendResult;
default:
break;
}
@@ -679,7 +678,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
} // end of for
if (sendResult != nullptr) {
- return sendResult.release();
+ return sendResult;
}
std::string info = "Send [" + UtilAll::to_string(times) + "] times, still failed, cost [" +
@@ -700,12 +699,12 @@ void DefaultMQProducerImpl::updateFaultItem(const std::string& brokerName, const
mq_fault_strategy_->updateFaultItem(brokerName, currentLatency, isolation);
}
-SendResult* DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
- const MQMessageQueue& mq,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- TopicPublishInfoPtr topicPublishInfo,
- long timeout) {
+std::unique_ptr<SendResult> DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
+ const MQMessageQueue& mq,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ TopicPublishInfoPtr topicPublishInfo,
+ long timeout) {
uint64_t beginStartTime = UtilAll::currentTimeMillis();
std::string brokerAddr = client_instance_->findBrokerAddressInPublish(mq.broker_name());
if (brokerAddr.empty()) {
@@ -763,7 +762,7 @@ SendResult* DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
}
}
- SendResult* sendResult = nullptr;
+ std::unique_ptr<SendResult> sendResult;
switch (communicationMode) {
case ASYNC: {
long costTimeAsync = UtilAll::currentTimeMillis() - beginStartTime;
@@ -825,12 +824,12 @@ bool DefaultMQProducerImpl::tryToCompressMessage(Message& msg) {
return false;
}
-SendResult* DefaultMQProducerImpl::sendSelectImpl(MessagePtr msg,
- MessageQueueSelector* selector,
- void* arg,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- long timeout) {
+std::unique_ptr<SendResult> DefaultMQProducerImpl::sendSelectImpl(MessagePtr msg,
+ MessageQueueSelector* selector,
+ void* arg,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ long timeout) {
auto beginStartTime = UtilAll::currentTimeMillis();
Validators::checkMessage(*msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
@@ -869,7 +868,9 @@ TransactionListener* DefaultMQProducerImpl::getCheckListener() {
return nullptr;
};
-TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(MessagePtr msg, void* arg, long timeout) {
+std::unique_ptr<TransactionSendResult> DefaultMQProducerImpl::sendMessageInTransactionImpl(MessagePtr msg,
+ void* arg,
+ long timeout) {
auto* transactionListener = getCheckListener();
if (nullptr == transactionListener) {
THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
@@ -879,7 +880,7 @@ TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(Messa
MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_TRANSACTION_PREPARED, "true");
MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_PRODUCER_GROUP, client_config_->group_name());
try {
- sendResult.reset(sendDefaultImpl(msg, SYNC, nullptr, timeout));
+ sendResult = sendDefaultImpl(msg, SYNC, nullptr, timeout);
} catch (MQException& e) {
THROW_MQEXCEPTION(MQClientException, "send message Exception", -1);
}
@@ -923,7 +924,7 @@ TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(Messa
}
// FIXME: setTransactionId will cause OOM?
- TransactionSendResult* transactionSendResult = new TransactionSendResult(*sendResult.get());
+ std::unique_ptr<TransactionSendResult> transactionSendResult(new TransactionSendResult(*sendResult));
transactionSendResult->set_transaction_id(msg->transaction_id());
transactionSendResult->set_local_transaction_state(localTransactionState);
return transactionSendResult;
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index 57eef51..3a3073c 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -127,28 +127,28 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
CheckTransactionStateRequestHeader* checkRequestHeader) override;
private:
- SendResult* sendDefaultImpl(MessagePtr msg,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- long timeout);
-
- SendResult* sendKernelImpl(MessagePtr msg,
- const MQMessageQueue& mq,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- std::shared_ptr<const TopicPublishInfo> topicPublishInfo,
- long timeout);
+ std::unique_ptr<SendResult> sendDefaultImpl(MessagePtr msg,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ long timeout);
+
+ std::unique_ptr<SendResult> sendKernelImpl(MessagePtr msg,
+ const MQMessageQueue& mq,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ std::shared_ptr<const TopicPublishInfo> topicPublishInfo,
+ long timeout);
bool tryToCompressMessage(Message& msg);
- SendResult* sendSelectImpl(MessagePtr msg,
- MessageQueueSelector* selector,
- void* arg,
- CommunicationMode communicationMode,
- SendCallback* sendCallback,
- long timeout);
+ std::unique_ptr<SendResult> sendSelectImpl(MessagePtr msg,
+ MessageQueueSelector* selector,
+ void* arg,
+ CommunicationMode communicationMode,
+ SendCallback* sendCallback,
+ long timeout);
- TransactionSendResult* sendMessageInTransactionImpl(MessagePtr msg, void* arg, long timeout);
+ std::unique_ptr<TransactionSendResult> sendMessageInTransactionImpl(MessagePtr msg, void* arg, long timeout);
void endTransaction(SendResult& sendResult,
LocalTransactionState localTransactionState,
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index 4e6284f..02038fe 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -22,8 +22,8 @@
#include <atomic> // std::atomic
#include <limits> // std::numeric_limits
-#include "ByteOrder.h"
#include "ByteBuffer.hpp"
+#include "ByteOrder.h"
#include "Logging.h"
#include "MQVersion.h"
#include "RemotingSerializable.h"
@@ -133,7 +133,7 @@ static inline int32_t getHeaderLength(int32_t length) {
return length & 0x00FFFFFF;
}
-static RemotingCommand* Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
+static std::unique_ptr<RemotingCommand> Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
// decode package: [4 bytes(packageLength) +] 4 bytes(headerLength) + header + body
int32_t length = byteBuffer.limit();
@@ -196,10 +196,10 @@ static RemotingCommand* Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
LOG_DEBUG_NEW("code:{}, language:{}, version:{}, opaque:{}, flag:{}, remark:{}, headLen:{}, bodyLen:{}", code,
language, version, opaque, flag, remark, headerLength, bodyLength);
- return cmd.release();
+ return cmd;
}
-RemotingCommand* RemotingCommand::Decode(ByteArrayRef array, bool hasPackageLength) {
+std::unique_ptr<RemotingCommand> RemotingCommand::Decode(ByteArrayRef array, bool hasPackageLength) {
std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::wrap(std::move(array)));
return rocketmq::Decode(*byteBuffer, hasPackageLength);
}
diff --git a/src/protocol/TopicList.h b/src/protocol/TopicList.h
index bbcb7cc..af004c0 100644
--- a/src/protocol/TopicList.h
+++ b/src/protocol/TopicList.h
@@ -26,7 +26,7 @@ namespace rocketmq {
class TopicList {
public:
- static TopicList* Decode(const ByteArray& mem) { return new TopicList(); }
+ static std::unique_ptr<TopicList> Decode(const ByteArray& mem) { return std::unique_ptr<TopicList>(new TopicList()); }
private:
std::vector<std::string> topic_list_;
diff --git a/src/protocol/body/LockBatchResponseBody.hpp b/src/protocol/body/LockBatchResponseBody.hpp
index af16bf2..f443f88 100644
--- a/src/protocol/body/LockBatchResponseBody.hpp
+++ b/src/protocol/body/LockBatchResponseBody.hpp
@@ -28,7 +28,7 @@ namespace rocketmq {
class LockBatchResponseBody {
public:
- static LockBatchResponseBody* Decode(const ByteArray& bodyData) {
+ static std::unique_ptr<LockBatchResponseBody> Decode(const ByteArray& bodyData) {
Json::Value root = RemotingSerializable::fromJson(bodyData);
auto& mqs = root["lockOKMQSet"];
std::unique_ptr<LockBatchResponseBody> body(new LockBatchResponseBody());
@@ -37,7 +37,7 @@ class LockBatchResponseBody {
LOG_INFO_NEW("LockBatchResponseBody MQ:{}", mq.toString());
body->lock_ok_mq_set().push_back(std::move(mq));
}
- return body.release();
+ return body;
}
public:
diff --git a/src/protocol/body/ResetOffsetBody.hpp b/src/protocol/body/ResetOffsetBody.hpp
index 157e567..f5a5bfc 100644
--- a/src/protocol/body/ResetOffsetBody.hpp
+++ b/src/protocol/body/ResetOffsetBody.hpp
@@ -26,7 +26,7 @@ namespace rocketmq {
class ResetOffsetBody {
public:
- static ResetOffsetBody* Decode(const ByteArray& bodyData) {
+ static std::unique_ptr<ResetOffsetBody> Decode(const ByteArray& bodyData) {
// FIXME: object as key
Json::Value root = RemotingSerializable::fromJson(bodyData);
auto& qds = root["offsetTable"];
@@ -38,7 +38,7 @@ class ResetOffsetBody {
int64_t offset = qds[member].asInt64();
body->offset_table_.emplace(std::move(mq), offset);
}
- return body.release();
+ return body;
}
public:
diff --git a/src/protocol/body/TopicRouteData.hpp b/src/protocol/body/TopicRouteData.hpp
index 38e1889..887bcc3 100644
--- a/src/protocol/body/TopicRouteData.hpp
+++ b/src/protocol/body/TopicRouteData.hpp
@@ -106,7 +106,7 @@ typedef std::shared_ptr<TopicRouteData> TopicRouteDataPtr;
class TopicRouteData {
public:
- static TopicRouteData* Decode(const ByteArray& bodyData) {
+ static std::unique_ptr<TopicRouteData> Decode(const ByteArray& bodyData) {
Json::Value root = RemotingSerializable::fromJson(bodyData);
std::unique_ptr<TopicRouteData> trd(new TopicRouteData());
@@ -136,7 +136,7 @@ class TopicRouteData {
}
sort(trd->broker_datas().begin(), trd->broker_datas().end());
- return trd.release();
+ return trd;
}
/**
diff --git a/src/protocol/header/CommandHeader.cpp b/src/protocol/header/CommandHeader.cpp
index 4a7e7ca..b75757c 100644
--- a/src/protocol/header/CommandHeader.cpp
+++ b/src/protocol/header/CommandHeader.cpp
@@ -95,7 +95,7 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::str
// CheckTransactionStateRequestHeader
//######################################
-CheckTransactionStateRequestHeader* CheckTransactionStateRequestHeader::Decode(
+std::unique_ptr<CheckTransactionStateRequestHeader> CheckTransactionStateRequestHeader::Decode(
std::map<std::string, std::string>& extFields) {
std::unique_ptr<CheckTransactionStateRequestHeader> header(new CheckTransactionStateRequestHeader());
header->tranStateTableOffset = std::stoll(extFields.at("tranStateTableOffset"));
@@ -116,7 +116,7 @@ CheckTransactionStateRequestHeader* CheckTransactionStateRequestHeader::Decode(
header->offsetMsgId = it->second;
}
- return header.release();
+ return header;
}
void CheckTransactionStateRequestHeader::SetDeclaredFieldOfCommandHeader(
@@ -246,8 +246,9 @@ void SendMessageRequestHeader::setReconsumeTimes(int _reconsumeTimes) {
// SendMessageRequestHeaderV2
//######################################
-SendMessageRequestHeaderV2* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(SendMessageRequestHeader* v1) {
- SendMessageRequestHeaderV2* v2 = new SendMessageRequestHeaderV2();
+std::unique_ptr<SendMessageRequestHeaderV2> SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(
+ SendMessageRequestHeader* v1) {
+ std::unique_ptr<SendMessageRequestHeaderV2> v2(new SendMessageRequestHeaderV2());
v2->a = v1->producerGroup;
v2->b = v1->topic;
v2->c = v1->defaultTopic;
@@ -312,7 +313,8 @@ void SendMessageRequestHeaderV2::SetDeclaredFieldOfCommandHeader(std::map<std::s
// SendMessageResponseHeader
//######################################
-SendMessageResponseHeader* SendMessageResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<SendMessageResponseHeader> SendMessageResponseHeader::Decode(
+ std::map<std::string, std::string>& extFields) {
std::unique_ptr<SendMessageResponseHeader> header(new SendMessageResponseHeader());
header->msgId = extFields.at("msgId");
header->queueId = std::stoi(extFields.at("queueId"));
@@ -323,7 +325,7 @@ SendMessageResponseHeader* SendMessageResponseHeader::Decode(std::map<std::strin
header->transactionId = it->second;
}
- return header.release();
+ return header;
}
void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -378,13 +380,14 @@ void PullMessageRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::str
// PullMessageResponseHeader
//######################################
-PullMessageResponseHeader* PullMessageResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<PullMessageResponseHeader> PullMessageResponseHeader::Decode(
+ std::map<std::string, std::string>& extFields) {
std::unique_ptr<PullMessageResponseHeader> header(new PullMessageResponseHeader());
header->suggestWhichBrokerId = std::stoll(extFields.at("suggestWhichBrokerId"));
header->nextBeginOffset = std::stoll(extFields.at("nextBeginOffset"));
header->minOffset = std::stoll(extFields.at("minOffset"));
header->maxOffset = std::stoll(extFields.at("maxOffset"));
- return header.release();
+ return header;
}
void PullMessageResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -422,10 +425,11 @@ void GetMinOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::st
// GetMinOffsetResponseHeader
//######################################
-GetMinOffsetResponseHeader* GetMinOffsetResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<GetMinOffsetResponseHeader> GetMinOffsetResponseHeader::Decode(
+ std::map<std::string, std::string>& extFields) {
std::unique_ptr<GetMinOffsetResponseHeader> header(new GetMinOffsetResponseHeader());
header->offset = std::stoll(extFields.at("offset"));
- return header.release();
+ return header;
}
void GetMinOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -451,10 +455,11 @@ void GetMaxOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::st
// GetMaxOffsetResponseHeader
//######################################
-GetMaxOffsetResponseHeader* GetMaxOffsetResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<GetMaxOffsetResponseHeader> GetMaxOffsetResponseHeader::Decode(
+ std::map<std::string, std::string>& extFields) {
std::unique_ptr<GetMaxOffsetResponseHeader> header(new GetMaxOffsetResponseHeader());
header->offset = std::stoll(extFields.at("offset"));
- return header.release();
+ return header;
}
void GetMaxOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -482,10 +487,11 @@ void SearchOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::st
// SearchOffsetResponseHeader
//######################################
-SearchOffsetResponseHeader* SearchOffsetResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<SearchOffsetResponseHeader> SearchOffsetResponseHeader::Decode(
+ std::map<std::string, std::string>& extFields) {
std::unique_ptr<SearchOffsetResponseHeader> header(new SearchOffsetResponseHeader());
header->offset = std::stoll(extFields.at("offset"));
- return header.release();
+ return header;
}
void SearchOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -524,11 +530,11 @@ void GetEarliestMsgStoretimeRequestHeader::SetDeclaredFieldOfCommandHeader(
// GetEarliestMsgStoretimeResponseHeader
//######################################
-GetEarliestMsgStoretimeResponseHeader* GetEarliestMsgStoretimeResponseHeader::Decode(
+std::unique_ptr<GetEarliestMsgStoretimeResponseHeader> GetEarliestMsgStoretimeResponseHeader::Decode(
std::map<std::string, std::string>& extFields) {
std::unique_ptr<GetEarliestMsgStoretimeResponseHeader> header(new GetEarliestMsgStoretimeResponseHeader());
header->timestamp = std::stoll(extFields.at("timestamp"));
- return header.release();
+ return header;
}
void GetEarliestMsgStoretimeResponseHeader::SetDeclaredFieldOfCommandHeader(
@@ -570,11 +576,11 @@ void QueryConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<
// QueryConsumerOffsetResponseHeader
//######################################
-QueryConsumerOffsetResponseHeader* QueryConsumerOffsetResponseHeader::Decode(
+std::unique_ptr<QueryConsumerOffsetResponseHeader> QueryConsumerOffsetResponseHeader::Decode(
std::map<std::string, std::string>& extFields) {
std::unique_ptr<QueryConsumerOffsetResponseHeader> header(new QueryConsumerOffsetResponseHeader());
header->offset = std::stoll(extFields.at("offset"));
- return header.release();
+ return header;
}
void QueryConsumerOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
@@ -642,14 +648,15 @@ void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<
// GetConsumerListByGroupResponseBody
//######################################
-GetConsumerListByGroupResponseBody* GetConsumerListByGroupResponseBody::Decode(const ByteArray& bodyData) {
+std::unique_ptr<GetConsumerListByGroupResponseBody> GetConsumerListByGroupResponseBody::Decode(
+ const ByteArray& bodyData) {
Json::Value root = RemotingSerializable::fromJson(bodyData);
auto& ids = root["consumerIdList"];
std::unique_ptr<GetConsumerListByGroupResponseBody> body(new GetConsumerListByGroupResponseBody());
for (unsigned int i = 0; i < ids.size(); i++) {
body->consumerIdList.push_back(ids[i].asString());
}
- return body.release();
+ return body;
}
void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader(
@@ -659,7 +666,8 @@ void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader(
// ResetOffsetRequestHeader
//######################################
-ResetOffsetRequestHeader* ResetOffsetRequestHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<ResetOffsetRequestHeader> ResetOffsetRequestHeader::Decode(
+ std::map<std::string, std::string>& extFields) {
std::unique_ptr<ResetOffsetRequestHeader> header(new ResetOffsetRequestHeader());
header->topic = extFields.at("topic");
header->group = extFields.at("group");
@@ -667,7 +675,7 @@ ResetOffsetRequestHeader* ResetOffsetRequestHeader::Decode(std::map<std::string,
header->isForce = UtilAll::stob(extFields.at("isForce"));
LOG_INFO_NEW("topic:{}, group:{}, timestamp:{}, isForce:{}", header->topic, header->group, header->timestamp,
header->isForce);
- return header.release();
+ return header;
}
void ResetOffsetRequestHeader::setTopic(const std::string& tmp) {
@@ -706,7 +714,7 @@ const bool ResetOffsetRequestHeader::getForceFlag() const {
// GetConsumerRunningInfoRequestHeader
//######################################
-GetConsumerRunningInfoRequestHeader* GetConsumerRunningInfoRequestHeader::Decode(
+std::unique_ptr<GetConsumerRunningInfoRequestHeader> GetConsumerRunningInfoRequestHeader::Decode(
std::map<std::string, std::string>& extFields) {
std::unique_ptr<GetConsumerRunningInfoRequestHeader> header(new GetConsumerRunningInfoRequestHeader());
header->consumerGroup = extFields.at("consumerGroup");
@@ -714,7 +722,7 @@ GetConsumerRunningInfoRequestHeader* GetConsumerRunningInfoRequestHeader::Decode
header->jstackEnable = UtilAll::stob(extFields.at("jstackEnable"));
LOG_INFO("consumerGroup:%s, clientId:%s, jstackEnable:%d", header->consumerGroup.c_str(), header->clientId.c_str(),
header->jstackEnable);
- return header.release();
+ return header;
}
void GetConsumerRunningInfoRequestHeader::Encode(Json::Value& extFields) {
@@ -758,11 +766,11 @@ void GetConsumerRunningInfoRequestHeader::setJstackEnable(const bool& jstackEnab
// NotifyConsumerIdsChangedRequestHeader
//######################################
-NotifyConsumerIdsChangedRequestHeader* NotifyConsumerIdsChangedRequestHeader::Decode(
+std::unique_ptr<NotifyConsumerIdsChangedRequestHeader> NotifyConsumerIdsChangedRequestHeader::Decode(
std::map<std::string, std::string>& extFields) {
std::unique_ptr<NotifyConsumerIdsChangedRequestHeader> header(new NotifyConsumerIdsChangedRequestHeader());
header->consumerGroup = extFields.at("consumerGroup");
- return header.release();
+ return header;
}
const std::string& NotifyConsumerIdsChangedRequestHeader::getConsumerGroup() const {
diff --git a/src/protocol/header/CommandHeader.h b/src/protocol/header/CommandHeader.h
index 31c10eb..8963510 100644
--- a/src/protocol/header/CommandHeader.h
+++ b/src/protocol/header/CommandHeader.h
@@ -17,6 +17,7 @@
#ifndef ROCKETMQ_PROTOCOL_COMMANDHEADER_H_
#define ROCKETMQ_PROTOCOL_COMMANDHEADER_H_
+#include <memory> // std::unique_ptr
#include <vector> // std::vector
#include "ByteArray.h"
@@ -71,7 +72,7 @@ class CheckTransactionStateRequestHeader : public CommandCustomHeader {
public:
CheckTransactionStateRequestHeader() : tranStateTableOffset(0), commitLogOffset(0) {}
- static CheckTransactionStateRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<CheckTransactionStateRequestHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
std::string toString() const;
@@ -139,7 +140,7 @@ class SendMessageRequestHeader : public CommandCustomHeader {
class SendMessageRequestHeaderV2 : public CommandCustomHeader {
public:
- static SendMessageRequestHeaderV2* createSendMessageRequestHeaderV2(SendMessageRequestHeader* v1);
+ static std::unique_ptr<SendMessageRequestHeaderV2> createSendMessageRequestHeaderV2(SendMessageRequestHeader* v1);
void Encode(Json::Value& outData) override;
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
@@ -167,7 +168,7 @@ class SendMessageResponseHeader : public CommandCustomHeader {
public:
SendMessageResponseHeader() : queueId(0), queueOffset(0) {}
- static SendMessageResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<SendMessageResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -209,7 +210,7 @@ class PullMessageResponseHeader : public CommandCustomHeader {
public:
PullMessageResponseHeader() : suggestWhichBrokerId(0), nextBeginOffset(0), minOffset(0), maxOffset(0) {}
- static PullMessageResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<PullMessageResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -241,7 +242,7 @@ class GetMinOffsetResponseHeader : public CommandCustomHeader {
public:
GetMinOffsetResponseHeader() : offset(0) {}
- static GetMinOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<GetMinOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -264,7 +265,7 @@ class GetMaxOffsetResponseHeader : public CommandCustomHeader {
public:
GetMaxOffsetResponseHeader() : offset(0) {}
- static GetMaxOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<GetMaxOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -288,7 +289,7 @@ class SearchOffsetResponseHeader : public CommandCustomHeader {
public:
SearchOffsetResponseHeader() : offset(0) {}
- static SearchOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<SearchOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -322,7 +323,7 @@ class GetEarliestMsgStoretimeResponseHeader : public CommandCustomHeader {
public:
GetEarliestMsgStoretimeResponseHeader() : timestamp(0) {}
- static GetEarliestMsgStoretimeResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<GetEarliestMsgStoretimeResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -355,7 +356,7 @@ class QueryConsumerOffsetResponseHeader : public CommandCustomHeader {
public:
QueryConsumerOffsetResponseHeader() : offset(0) {}
- static QueryConsumerOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<QueryConsumerOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -395,7 +396,7 @@ class ConsumerSendMsgBackRequestHeader : public CommandCustomHeader {
class GetConsumerListByGroupResponseBody : public CommandCustomHeader {
public:
- static GetConsumerListByGroupResponseBody* Decode(const ByteArray& bodyData);
+ static std::unique_ptr<GetConsumerListByGroupResponseBody> Decode(const ByteArray& bodyData);
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
public:
@@ -406,7 +407,7 @@ class ResetOffsetRequestHeader : public CommandCustomHeader {
public:
ResetOffsetRequestHeader() : timestamp(0), isForce(false) {}
- static ResetOffsetRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<ResetOffsetRequestHeader> Decode(std::map<std::string, std::string>& extFields);
const std::string& getTopic() const;
void setTopic(const std::string& tmp);
@@ -431,7 +432,7 @@ class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader {
public:
GetConsumerRunningInfoRequestHeader() : jstackEnable(false) {}
- static GetConsumerRunningInfoRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<GetConsumerRunningInfoRequestHeader> Decode(std::map<std::string, std::string>& extFields);
void Encode(Json::Value& extFields) override;
void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
@@ -452,7 +453,7 @@ class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader {
class NotifyConsumerIdsChangedRequestHeader : public CommandCustomHeader {
public:
- static NotifyConsumerIdsChangedRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+ static std::unique_ptr<NotifyConsumerIdsChangedRequestHeader> Decode(std::map<std::string, std::string>& extFields);
const std::string& getConsumerGroup() const;
void setConsumerGroup(const std::string& tmp);
diff --git a/src/protocol/header/ReplyMessageRequestHeader.hpp b/src/protocol/header/ReplyMessageRequestHeader.hpp
index 3aaab7b..bfeb35a 100644
--- a/src/protocol/header/ReplyMessageRequestHeader.hpp
+++ b/src/protocol/header/ReplyMessageRequestHeader.hpp
@@ -26,7 +26,7 @@ namespace rocketmq {
class ReplyMessageRequestHeader : public CommandCustomHeader {
public:
- static ReplyMessageRequestHeader* Decode(std::map<std::string, std::string>& extFields) {
+ static std::unique_ptr<ReplyMessageRequestHeader> Decode(std::map<std::string, std::string>& extFields) {
std::unique_ptr<ReplyMessageRequestHeader> header(new ReplyMessageRequestHeader());
header->producer_group_ = extFields.at("producerGroup");
@@ -61,7 +61,7 @@ class ReplyMessageRequestHeader : public CommandCustomHeader {
header->store_host_ = extFields.at("storeHost");
header->store_timestamp_ = std::stoll(extFields.at("storeTimestamp"));
- return header.release();
+ return header;
}
public:
diff --git a/src/transport/RequestProcessor.h b/src/transport/RequestProcessor.h
index 2e62391..15b5861 100644
--- a/src/transport/RequestProcessor.h
+++ b/src/transport/RequestProcessor.h
@@ -26,7 +26,7 @@ class RequestProcessor {
public:
virtual ~RequestProcessor() = default;
- virtual RemotingCommand* processRequest(TcpTransportPtr channel, RemotingCommand* request) = 0;
+ virtual std::unique_ptr<RemotingCommand> processRequest(TcpTransportPtr channel, RemotingCommand* request) = 0;
};
} // namespace rocketmq
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 655a811..c524cb7 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -564,7 +564,7 @@ void TcpRemotingClient::messageReceived(ByteArrayRef msg, TcpTransportPtr channe
void TcpRemotingClient::processMessageReceived(ByteArrayRef msg, TcpTransportPtr channel) {
std::unique_ptr<RemotingCommand> cmd;
try {
- cmd.reset(RemotingCommand::Decode(std::move(msg)));
+ cmd = RemotingCommand::Decode(std::move(msg));
} catch (...) {
LOG_ERROR_NEW("processMessageReceived error");
return;
@@ -635,7 +635,7 @@ void TcpRemotingClient::processRequestCommand(std::unique_ptr<RemotingCommand> r
auto* processor = it->second;
doBeforeRpcHooks(channel->getPeerAddrAndPort(), *requestCommand, false);
- response.reset(processor->processRequest(channel, requestCommand.get()));
+ response = processor->processRequest(channel, requestCommand.get());
doAfterRpcHooks(channel->getPeerAddrAndPort(), *response, response.get(), true);
} catch (std::exception& e) {
LOG_ERROR_NEW("process request exception. {}", e.what());
diff --git a/test/src/protocol/RemotingCommandTest.cpp b/test/src/protocol/RemotingCommandTest.cpp
index 42ee771..c5d0627 100644
--- a/test/src/protocol/RemotingCommandTest.cpp
+++ b/test/src/protocol/RemotingCommandTest.cpp
@@ -156,7 +156,7 @@ TEST(RemotingCommandTest, EncodeAndDecode) {
remotingCommand2.set_body("123123");
package = remotingCommand2.encode();
- decodeRemtingCommand.reset(RemotingCommand::Decode(package, true));
+ decodeRemtingCommand = RemotingCommand::Decode(package, true);
auto* header = decodeRemtingCommand->decodeCommandCustomHeader<GetConsumerRunningInfoRequestHeader>();
EXPECT_EQ(requestHeader->getClientId(), header->getClientId());