You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/04/06 13:52:59 UTC

[rocketmq-client-cpp] branch re_dev updated: refactor: return std::unique_ptr not pointer

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/re_dev by this push:
     new 1a57cbd  refactor: return std::unique_ptr not pointer
1a57cbd is described below

commit 1a57cbde0f3ef6b6111d76d5287786577e09a4d4
Author: James Yin <yw...@hotmail.com>
AuthorDate: Tue Apr 6 21:17:42 2021 +0800

    refactor: return std::unique_ptr not pointer
---
 include/RemotingCommand.h                         |  8 +--
 src/ClientRemotingProcessor.cpp                   | 19 ++---
 src/ClientRemotingProcessor.h                     | 12 ++--
 src/MQClientAPIImpl.cpp                           | 84 ++++++++++++-----------
 src/MQClientAPIImpl.h                             | 70 ++++++++++---------
 src/MQClientInstance.cpp                          | 36 +++++-----
 src/MQClientInstance.h                            | 16 +++--
 src/common/FilterAPI.hpp                          |  4 +-
 src/consumer/DefaultLitePullConsumerImpl.cpp      | 46 ++++++-------
 src/consumer/DefaultLitePullConsumerImpl.h        | 37 +++++-----
 src/consumer/DefaultMQPushConsumerImpl.cpp        | 14 ++--
 src/consumer/DefaultMQPushConsumerImpl.h          |  2 +-
 src/consumer/MQConsumerInner.h                    |  3 +-
 src/consumer/PullAPIWrapper.cpp                   | 45 ++++++------
 src/consumer/PullAPIWrapper.h                     | 30 ++++----
 src/consumer/RebalanceImpl.cpp                    | 19 ++---
 src/consumer/RebalanceImpl.h                      |  4 +-
 src/consumer/RebalanceLitePullImpl.h              |  3 -
 src/consumer/RemoteBrokerOffsetStore.cpp          |  4 +-
 src/io/ByteBuffer.cpp                             |  8 +--
 src/io/ByteBuffer.hpp                             | 14 ++--
 src/io/DefaultByteBuffer.hpp                      | 15 ++--
 src/message/MessageClientIDSetter.cpp             |  4 +-
 src/producer/DefaultMQProducerImpl.cpp            | 49 ++++++-------
 src/producer/DefaultMQProducerImpl.h              | 36 +++++-----
 src/protocol/RemotingCommand.cpp                  |  8 +--
 src/protocol/TopicList.h                          |  2 +-
 src/protocol/body/LockBatchResponseBody.hpp       |  4 +-
 src/protocol/body/ResetOffsetBody.hpp             |  4 +-
 src/protocol/body/TopicRouteData.hpp              |  4 +-
 src/protocol/header/CommandHeader.cpp             | 60 +++++++++-------
 src/protocol/header/CommandHeader.h               | 27 ++++----
 src/protocol/header/ReplyMessageRequestHeader.hpp |  4 +-
 src/transport/RequestProcessor.h                  |  2 +-
 src/transport/TcpRemotingClient.cpp               |  4 +-
 test/src/protocol/RemotingCommandTest.cpp         |  2 +-
 36 files changed, 361 insertions(+), 342 deletions(-)

diff --git a/include/RemotingCommand.h b/include/RemotingCommand.h
index 68847b3..af67c87 100644
--- a/include/RemotingCommand.h
+++ b/include/RemotingCommand.h
@@ -67,7 +67,7 @@ class ROCKETMQCLIENT_API RemotingCommand {
   template <class H>
   H* decodeCommandCustomHeader(bool useCache = true);
 
-  static RemotingCommand* Decode(ByteArrayRef array, bool hasPackageLength = false);
+  static std::unique_ptr<RemotingCommand> Decode(ByteArrayRef array, bool hasPackageLength = false);
 
   std::string toString() const;
 
@@ -116,9 +116,9 @@ H* RemotingCommand::decodeCommandCustomHeader(bool useCache) {
   }
 
   try {
-    H* header = H::Decode(ext_fields_);
-    custom_header_.reset(header);
-    return header;
+    std::unique_ptr<H> header = H::Decode(ext_fields_);
+    custom_header_ = std::move(header);
+    return static_cast<H*>(custom_header_.get());
   } catch (std::exception& e) {
     THROW_MQEXCEPTION(RemotingCommandException, e.what(), -1);
   }
diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index d4fc34c..329d0d4 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -35,7 +35,8 @@ ClientRemotingProcessor::ClientRemotingProcessor(MQClientInstance* clientInstanc
 
 ClientRemotingProcessor::~ClientRemotingProcessor() = default;
 
-RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransportPtr channel, RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::processRequest(TcpTransportPtr channel,
+                                                                         RemotingCommand* request) {
   const auto& addr = channel->getPeerAddrAndPort();
   LOG_DEBUG_NEW("processRequest, code:{}, addr:{}", request->code(), addr);
   switch (request->code()) {
@@ -61,7 +62,8 @@ RemotingCommand* ClientRemotingProcessor::processRequest(TcpTransportPtr channel
   return nullptr;
 }
 
-RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::string& addr, RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::checkTransactionState(const std::string& addr,
+                                                                                RemotingCommand* request) {
   auto* requestHeader = request->decodeCommandCustomHeader<CheckTransactionStateRequestHeader>();
   assert(requestHeader != nullptr);
 
@@ -95,14 +97,14 @@ RemotingCommand* ClientRemotingProcessor::checkTransactionState(const std::strin
   return nullptr;
 }
 
-RemotingCommand* ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::notifyConsumerIdsChanged(RemotingCommand* request) {
   auto* requestHeader = request->decodeCommandCustomHeader<NotifyConsumerIdsChangedRequestHeader>();
   LOG_INFO_NEW("notifyConsumerIdsChanged, group:{}", requestHeader->getConsumerGroup());
   client_instance_->rebalanceImmediately();
   return nullptr;
 }
 
-RemotingCommand* ClientRemotingProcessor::resetOffset(RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::resetOffset(RemotingCommand* request) {
   auto* responseHeader = request->decodeCommandCustomHeader<ResetOffsetRequestHeader>();
   auto requestBody = request->body();
   if (requestBody != nullptr && requestBody->size() > 0) {
@@ -116,7 +118,8 @@ RemotingCommand* ClientRemotingProcessor::resetOffset(RemotingCommand* request)
   return nullptr;  // as resetOffset is oneWayRPC, do not need return any response
 }
 
-RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(const std::string& addr, RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::getConsumerRunningInfo(const std::string& addr,
+                                                                                 RemotingCommand* request) {
   auto* requestHeader = request->decodeCommandCustomHeader<GetConsumerRunningInfoRequestHeader>();
   LOG_INFO_NEW("getConsumerRunningInfo, group:{}", requestHeader->getConsumerGroup());
 
@@ -137,10 +140,10 @@ RemotingCommand* ClientRemotingProcessor::getConsumerRunningInfo(const std::stri
     response->set_remark("The Consumer Group not exist in this consumer");
   }
 
-  return response.release();
+  return response;
 }
 
-RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* request) {
+std::unique_ptr<RemotingCommand> ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* request) {
   std::unique_ptr<RemotingCommand> response(
       new RemotingCommand(MQResponseCode::SYSTEM_ERROR, "not set any response code"));
 
@@ -192,7 +195,7 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
     response->set_remark("process reply message fail");
   }
 
-  return response.release();
+  return response;
 }
 
 void ClientRemotingProcessor::processReplyMessage(std::unique_ptr<MQMessageExt> replyMsg) {
diff --git a/src/ClientRemotingProcessor.h b/src/ClientRemotingProcessor.h
index ca28297..7812d88 100644
--- a/src/ClientRemotingProcessor.h
+++ b/src/ClientRemotingProcessor.h
@@ -28,13 +28,13 @@ class ClientRemotingProcessor : public RequestProcessor {
   ClientRemotingProcessor(MQClientInstance* clientInstance);
   virtual ~ClientRemotingProcessor();
 
-  RemotingCommand* processRequest(TcpTransportPtr channel, RemotingCommand* request) override;
+  std::unique_ptr<RemotingCommand> processRequest(TcpTransportPtr channel, RemotingCommand* request) override;
 
-  RemotingCommand* checkTransactionState(const std::string& addr, RemotingCommand* request);
-  RemotingCommand* notifyConsumerIdsChanged(RemotingCommand* request);
-  RemotingCommand* resetOffset(RemotingCommand* request);
-  RemotingCommand* getConsumerRunningInfo(const std::string& addr, RemotingCommand* request);
-  RemotingCommand* receiveReplyMessage(RemotingCommand* request);
+  std::unique_ptr<RemotingCommand> checkTransactionState(const std::string& addr, RemotingCommand* request);
+  std::unique_ptr<RemotingCommand> notifyConsumerIdsChanged(RemotingCommand* request);
+  std::unique_ptr<RemotingCommand> resetOffset(RemotingCommand* request);
+  std::unique_ptr<RemotingCommand> getConsumerRunningInfo(const std::string& addr, RemotingCommand* request);
+  std::unique_ptr<RemotingCommand> receiveReplyMessage(RemotingCommand* request);
 
  private:
   void processReplyMessage(std::unique_ptr<MQMessageExt> replyMsg);
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index fe76f13..176da3b 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -88,28 +88,28 @@ void MQClientAPIImpl::createTopic(const std::string& addr, const std::string& de
   THROW_MQEXCEPTION(MQBrokerException, response->remark(), response->code());
 }
 
-SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
-                                         const std::string& brokerName,
-                                         const MessagePtr msg,
-                                         std::unique_ptr<SendMessageRequestHeader> requestHeader,
-                                         int timeoutMillis,
-                                         CommunicationMode communicationMode,
-                                         DefaultMQProducerImplPtr producer) {
+std::unique_ptr<SendResult> MQClientAPIImpl::sendMessage(const std::string& addr,
+                                                         const std::string& brokerName,
+                                                         const MessagePtr msg,
+                                                         std::unique_ptr<SendMessageRequestHeader> requestHeader,
+                                                         int timeoutMillis,
+                                                         CommunicationMode communicationMode,
+                                                         DefaultMQProducerImplPtr producer) {
   return sendMessage(addr, brokerName, msg, std::move(requestHeader), timeoutMillis, communicationMode, nullptr,
                      nullptr, nullptr, 0, producer);
 }
 
-SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
-                                         const std::string& brokerName,
-                                         const MessagePtr msg,
-                                         std::unique_ptr<SendMessageRequestHeader> requestHeader,
-                                         int timeoutMillis,
-                                         CommunicationMode communicationMode,
-                                         SendCallback* sendCallback,
-                                         TopicPublishInfoPtr topicPublishInfo,
-                                         MQClientInstancePtr instance,
-                                         int retryTimesWhenSendFailed,
-                                         DefaultMQProducerImplPtr producer) {
+std::unique_ptr<SendResult> MQClientAPIImpl::sendMessage(const std::string& addr,
+                                                         const std::string& brokerName,
+                                                         const MessagePtr msg,
+                                                         std::unique_ptr<SendMessageRequestHeader> requestHeader,
+                                                         int timeoutMillis,
+                                                         CommunicationMode communicationMode,
+                                                         SendCallback* sendCallback,
+                                                         TopicPublishInfoPtr topicPublishInfo,
+                                                         MQClientInstancePtr instance,
+                                                         int retryTimesWhenSendFailed,
+                                                         DefaultMQProducerImplPtr producer) {
   int code = SEND_MESSAGE;
   std::unique_ptr<CommandCustomHeader> header;
 
@@ -124,7 +124,7 @@ SendResult* MQClientAPIImpl::sendMessage(const std::string& addr,
   }
 
   if (code != SEND_MESSAGE && code != SEND_REPLY_MESSAGE) {
-    header.reset(SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(requestHeader.get()));
+    header = SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(requestHeader.get());
   } else {
     header = std::move(requestHeader);
   }
@@ -173,20 +173,20 @@ void MQClientAPIImpl::sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw,
   remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
 }
 
-SendResult* MQClientAPIImpl::sendMessageSync(const std::string& addr,
-                                             const std::string& brokerName,
-                                             const MessagePtr msg,
-                                             RemotingCommand& request,
-                                             int timeoutMillis) {
+std::unique_ptr<SendResult> MQClientAPIImpl::sendMessageSync(const std::string& addr,
+                                                             const std::string& brokerName,
+                                                             const MessagePtr msg,
+                                                             RemotingCommand& request,
+                                                             int timeoutMillis) {
   // block until response
   std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
   assert(response != nullptr);
   return processSendResponse(brokerName, msg, response.get());
 }
 
-SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName,
-                                                 const MessagePtr msg,
-                                                 RemotingCommand* response) {
+std::unique_ptr<SendResult> MQClientAPIImpl::processSendResponse(const std::string& brokerName,
+                                                                 const MessagePtr msg,
+                                                                 RemotingCommand* response) {
   SendStatus sendStatus = SEND_OK;
   switch (response->code()) {
     case FLUSH_DISK_TIMEOUT:
@@ -227,18 +227,18 @@ SendResult* MQClientAPIImpl::processSendResponse(const std::string& brokerName,
     }
   }
 
-  SendResult* sendResult =
-      new SendResult(sendStatus, uniqMsgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset);
+  std::unique_ptr<SendResult> sendResult(
+      new SendResult(sendStatus, uniqMsgId, responseHeader->msgId, messageQueue, responseHeader->queueOffset));
   sendResult->set_transaction_id(responseHeader->transactionId);
 
   return sendResult;
 }
 
-PullResult* MQClientAPIImpl::pullMessage(const std::string& addr,
-                                         PullMessageRequestHeader* requestHeader,
-                                         int timeoutMillis,
-                                         CommunicationMode communicationMode,
-                                         PullCallback* pullCallback) {
+std::unique_ptr<PullResult> MQClientAPIImpl::pullMessage(const std::string& addr,
+                                                         PullMessageRequestHeader* requestHeader,
+                                                         int timeoutMillis,
+                                                         CommunicationMode communicationMode,
+                                                         PullCallback* pullCallback) {
   RemotingCommand request(PULL_MESSAGE, requestHeader);
 
   switch (communicationMode) {
@@ -261,13 +261,15 @@ void MQClientAPIImpl::pullMessageAsync(const std::string& addr,
   remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
 }
 
-PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis) {
+std::unique_ptr<PullResult> MQClientAPIImpl::pullMessageSync(const std::string& addr,
+                                                             RemotingCommand& request,
+                                                             int timeoutMillis) {
   std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(addr, request, timeoutMillis));
   assert(response != nullptr);
   return processPullResponse(response.get());
 }
 
-PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
+std::unique_ptr<PullResult> MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
   PullStatus pullStatus = NO_NEW_MSG;
   switch (response->code()) {
     case SUCCESS:
@@ -294,8 +296,9 @@ PullResult* MQClientAPIImpl::processPullResponse(RemotingCommand* response) {
   auto* responseHeader = response->decodeCommandCustomHeader<PullMessageResponseHeader>();
   assert(responseHeader != nullptr);
 
-  return new PullResultExt(pullStatus, responseHeader->nextBeginOffset, responseHeader->minOffset,
-                           responseHeader->maxOffset, (int)responseHeader->suggestWhichBrokerId, response->body());
+  return std::unique_ptr<PullResult>(new PullResultExt(pullStatus, responseHeader->nextBeginOffset,
+                                                       responseHeader->minOffset, responseHeader->maxOffset,
+                                                       (int)responseHeader->suggestWhichBrokerId, response->body()));
 }
 
 MQMessageExt MQClientAPIImpl::viewMessage(const std::string& addr, int64_t phyoffset, int timeoutMillis) {
@@ -622,7 +625,8 @@ void MQClientAPIImpl::unlockBatchMQ(const std::string& addr,
   }
 }
 
-TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis) {
+std::unique_ptr<TopicRouteData> MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::string& topic,
+                                                                                 int timeoutMillis) {
   RemotingCommand request(GET_ROUTEINFO_BY_TOPIC, new GetRouteInfoRequestHeader(topic));
 
   std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(null, request, timeoutMillis));
@@ -642,7 +646,7 @@ TopicRouteData* MQClientAPIImpl::getTopicRouteInfoFromNameServer(const std::stri
   THROW_MQEXCEPTION(MQClientException, response->remark(), response->code());
 }
 
-TopicList* MQClientAPIImpl::getTopicListFromNameServer() {
+std::unique_ptr<TopicList> MQClientAPIImpl::getTopicListFromNameServer() {
   RemotingCommand request(GET_ALL_TOPIC_LIST_FROM_NAMESERVER, nullptr);
 
   std::unique_ptr<RemotingCommand> response(remoting_client_->invokeSync(null, request));
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 535f33d..5bebeff 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -59,32 +59,34 @@ class MQClientAPIImpl {
 
   void createTopic(const std::string& addr, const std::string& defaultTopic, TopicConfig topicConfig);
 
-  SendResult* sendMessage(const std::string& addr,
-                          const std::string& brokerName,
-                          const MessagePtr msg,
-                          std::unique_ptr<SendMessageRequestHeader> requestHeader,
-                          int timeoutMillis,
-                          CommunicationMode communicationMode,
-                          DefaultMQProducerImplPtr producer);
-  SendResult* sendMessage(const std::string& addr,
-                          const std::string& brokerName,
-                          const MessagePtr msg,
-                          std::unique_ptr<SendMessageRequestHeader> requestHeader,
-                          int timeoutMillis,
-                          CommunicationMode communicationMode,
-                          SendCallback* sendCallback,
-                          TopicPublishInfoPtr topicPublishInfo,
-                          MQClientInstancePtr instance,
-                          int retryTimesWhenSendFailed,
-                          DefaultMQProducerImplPtr producer);
-  SendResult* processSendResponse(const std::string& brokerName, const MessagePtr msg, RemotingCommand* pResponse);
-
-  PullResult* pullMessage(const std::string& addr,
-                          PullMessageRequestHeader* requestHeader,
-                          int timeoutMillis,
-                          CommunicationMode communicationMode,
-                          PullCallback* pullCallback);
-  PullResult* processPullResponse(RemotingCommand* pResponse);
+  std::unique_ptr<SendResult> sendMessage(const std::string& addr,
+                                          const std::string& brokerName,
+                                          const MessagePtr msg,
+                                          std::unique_ptr<SendMessageRequestHeader> requestHeader,
+                                          int timeoutMillis,
+                                          CommunicationMode communicationMode,
+                                          DefaultMQProducerImplPtr producer);
+  std::unique_ptr<SendResult> sendMessage(const std::string& addr,
+                                          const std::string& brokerName,
+                                          const MessagePtr msg,
+                                          std::unique_ptr<SendMessageRequestHeader> requestHeader,
+                                          int timeoutMillis,
+                                          CommunicationMode communicationMode,
+                                          SendCallback* sendCallback,
+                                          TopicPublishInfoPtr topicPublishInfo,
+                                          MQClientInstancePtr instance,
+                                          int retryTimesWhenSendFailed,
+                                          DefaultMQProducerImplPtr producer);
+  std::unique_ptr<SendResult> processSendResponse(const std::string& brokerName,
+                                                  const MessagePtr msg,
+                                                  RemotingCommand* pResponse);
+
+  std::unique_ptr<PullResult> pullMessage(const std::string& addr,
+                                          PullMessageRequestHeader* requestHeader,
+                                          int timeoutMillis,
+                                          CommunicationMode communicationMode,
+                                          PullCallback* pullCallback);
+  std::unique_ptr<PullResult> processPullResponse(RemotingCommand* pResponse);
 
   MQMessageExt viewMessage(const std::string& addr, int64_t phyoffset, int timeoutMillis);
 
@@ -141,9 +143,9 @@ class MQClientAPIImpl {
                      int timeoutMillis,
                      bool oneway = false);
 
-  TopicRouteData* getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
+  std::unique_ptr<TopicRouteData> getTopicRouteInfoFromNameServer(const std::string& topic, int timeoutMillis);
 
-  TopicList* getTopicListFromNameServer();
+  std::unique_ptr<TopicList> getTopicListFromNameServer();
 
   int wipeWritePermOfBroker(const std::string& namesrvAddr, const std::string& brokerName, int timeoutMillis);
 
@@ -165,11 +167,11 @@ class MQClientAPIImpl {
  private:
   friend class SendCallbackWrap;
 
-  SendResult* sendMessageSync(const std::string& addr,
-                              const std::string& brokerName,
-                              const MessagePtr msg,
-                              RemotingCommand& request,
-                              int timeoutMillis);
+  std::unique_ptr<SendResult> sendMessageSync(const std::string& addr,
+                                              const std::string& brokerName,
+                                              const MessagePtr msg,
+                                              RemotingCommand& request,
+                                              int timeoutMillis);
 
   void sendMessageAsync(const std::string& addr,
                         const std::string& brokerName,
@@ -184,7 +186,7 @@ class MQClientAPIImpl {
 
   void sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis);
 
-  PullResult* pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis);
+  std::unique_ptr<PullResult> pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis);
 
   void pullMessageAsync(const std::string& addr,
                         RemotingCommand& request,
diff --git a/src/MQClientInstance.cpp b/src/MQClientInstance.cpp
index 443b7c4..a07a92e 100644
--- a/src/MQClientInstance.cpp
+++ b/src/MQClientInstance.cpp
@@ -19,7 +19,6 @@
 #include <typeindex>
 
 #include "ClientRemotingProcessor.h"
-#include "protocol/body/ConsumerRunningInfo.h"
 #include "Logging.h"
 #include "MQAdminImpl.h"
 #include "MQClientAPIImpl.h"
@@ -33,6 +32,7 @@
 #include "TcpRemotingClient.h"
 #include "TopicPublishInfo.hpp"
 #include "UtilAll.h"
+#include "protocol/body/ConsumerRunningInfo.h"
 
 namespace rocketmq {
 
@@ -415,8 +415,8 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
     try {
       TopicRouteDataPtr topicRouteData;
       if (isDefault) {
-        topicRouteData.reset(
-            mq_client_api_impl_->getTopicRouteInfoFromNameServer(AUTO_CREATE_TOPIC_KEY_TOPIC, 1000 * 3));
+        topicRouteData =
+            mq_client_api_impl_->getTopicRouteInfoFromNameServer(AUTO_CREATE_TOPIC_KEY_TOPIC, 1000 * 3);
         if (topicRouteData != nullptr) {
           auto& queueDatas = topicRouteData->queue_datas();
           for (auto& qd : queueDatas) {
@@ -427,7 +427,7 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
         }
         LOG_DEBUG_NEW("getTopicRouteInfoFromNameServer is null for topic: {}", topic);
       } else {
-        topicRouteData.reset(mq_client_api_impl_->getTopicRouteInfoFromNameServer(topic, 1000 * 3));
+        topicRouteData = mq_client_api_impl_->getTopicRouteInfoFromNameServer(topic, 1000 * 3);
       }
       if (topicRouteData != nullptr) {
         LOG_INFO_NEW("updateTopicRouteInfoFromNameServer has data");
@@ -477,19 +477,19 @@ bool MQClientInstance::updateTopicRouteInfoFromNameServer(const std::string& top
   return false;
 }
 
-HeartbeatData* MQClientInstance::prepareHeartbeatData() {
-  HeartbeatData* pHeartbeatData = new HeartbeatData();
+std::unique_ptr<HeartbeatData> MQClientInstance::prepareHeartbeatData() {
+  std::unique_ptr<HeartbeatData> heartbeat_data( new HeartbeatData());
 
   // clientID
-  pHeartbeatData->set_client_id(client_id_);
+  heartbeat_data->set_client_id(client_id_);
 
   // Consumer
-  insertConsumerInfoToHeartBeatData(pHeartbeatData);
+  insertConsumerInfoToHeartBeatData(heartbeat_data.get());
 
   // Producer
-  insertProducerInfoToHeartBeatData(pHeartbeatData);
+  insertProducerInfoToHeartBeatData(heartbeat_data.get());
 
-  return pHeartbeatData;
+  return heartbeat_data;
 }
 
 void MQClientInstance::insertConsumerInfoToHeartBeatData(HeartbeatData* heartbeatData) {
@@ -769,7 +769,7 @@ TopicPublishInfoPtr MQClientInstance::tryToFindTopicPublishInfo(const std::strin
   }
 }
 
-FindBrokerResult* MQClientInstance::findBrokerAddressInAdmin(const std::string& brokerName) {
+std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInAdmin(const std::string& brokerName) {
   BrokerAddrMAP brokerTable(getBrokerAddrTable());
   bool found = false;
   bool slave = false;
@@ -788,7 +788,7 @@ FindBrokerResult* MQClientInstance::findBrokerAddressInAdmin(const std::string&
 
   brokerTable.clear();
   if (found) {
-    return new FindBrokerResult(brokerAddr, slave);
+    return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave));
   }
 
   return nullptr;
@@ -817,9 +817,9 @@ std::string MQClientInstance::findBrokerAddressInPublish(const std::string& brok
   return null;
 }
 
-FindBrokerResult* MQClientInstance::findBrokerAddressInSubscribe(const std::string& brokerName,
-                                                                 int brokerId,
-                                                                 bool onlyThisBroker) {
+std::unique_ptr<FindBrokerResult> MQClientInstance::findBrokerAddressInSubscribe(const std::string& brokerName,
+                                                                                 int brokerId,
+                                                                                 bool onlyThisBroker) {
   std::string brokerAddr;
   bool slave = false;
   bool found = false;
@@ -846,7 +846,7 @@ FindBrokerResult* MQClientInstance::findBrokerAddressInSubscribe(const std::stri
   brokerTable.clear();
 
   if (found) {
-    return new FindBrokerResult(brokerAddr, slave);
+    return std::unique_ptr<FindBrokerResult>(new FindBrokerResult(brokerAddr, slave));
   }
 
   return nullptr;
@@ -950,7 +950,7 @@ void MQClientInstance::resetOffset(const std::string& group,
   }
 }
 
-ConsumerRunningInfo* MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) {
+std::unique_ptr<ConsumerRunningInfo> MQClientInstance::consumerRunningInfo(const std::string& consumerGroup) {
   auto* consumer = selectConsumer(consumerGroup);
   if (consumer != nullptr) {
     std::unique_ptr<ConsumerRunningInfo> runningInfo(consumer->consumerRunningInfo());
@@ -967,7 +967,7 @@ ConsumerRunningInfo* MQClientInstance::consumerRunningInfo(const std::string& co
       runningInfo->setProperty(ConsumerRunningInfo::PROP_CLIENT_VERSION,
                                MQVersion::GetVersionDesc(MQVersion::CURRENT_VERSION));
 
-      return runningInfo.release();
+      return runningInfo;
     }
   }
 
diff --git a/src/MQClientInstance.h b/src/MQClientInstance.h
index 5a589aa..bf606b1 100644
--- a/src/MQClientInstance.h
+++ b/src/MQClientInstance.h
@@ -22,18 +22,18 @@
 #include <set>
 #include <utility>
 
-#include "protocol/body/ConsumerRunningInfo.h"
 #include "FindBrokerResult.hpp"
 #include "MQClientConfig.h"
-#include "MQException.h"
 #include "MQConsumerInner.h"
+#include "MQException.h"
 #include "MQMessageQueue.h"
 #include "MQProducerInner.h"
 #include "ServiceState.h"
 #include "TopicPublishInfo.hpp"
+#include "concurrent/executor.hpp"
+#include "protocol/body/ConsumerRunningInfo.h"
 #include "protocol/body/TopicRouteData.hpp"
 #include "protocol/heartbeat/HeartbeatData.hpp"
-#include "concurrent/executor.hpp"
 
 namespace rocketmq {
 
@@ -83,9 +83,11 @@ class MQClientInstance {
   MQProducerInner* selectProducer(const std::string& group);
   MQConsumerInner* selectConsumer(const std::string& group);
 
-  FindBrokerResult* findBrokerAddressInAdmin(const std::string& brokerName);
+  std::unique_ptr<FindBrokerResult> findBrokerAddressInAdmin(const std::string& brokerName);
   std::string findBrokerAddressInPublish(const std::string& brokerName);
-  FindBrokerResult* findBrokerAddressInSubscribe(const std::string& brokerName, int brokerId, bool onlyThisBroker);
+  std::unique_ptr<FindBrokerResult> findBrokerAddressInSubscribe(const std::string& brokerName,
+                                                                 int brokerId,
+                                                                 bool onlyThisBroker);
 
   void findConsumerIds(const std::string& topic, const std::string& group, std::vector<std::string>& cids);
 
@@ -95,7 +97,7 @@ class MQClientInstance {
                    const std::string& topic,
                    const std::map<MQMessageQueue, int64_t>& offsetTable);
 
-  ConsumerRunningInfo* consumerRunningInfo(const std::string& consumerGroup);
+  std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo(const std::string& consumerGroup);
 
  public:
   TopicPublishInfoPtr tryToFindTopicPublishInfo(const std::string& topic);
@@ -133,7 +135,7 @@ class MQClientInstance {
 
   // heartbeat
   void sendHeartbeatToAllBroker();
-  HeartbeatData* prepareHeartbeatData();
+  std::unique_ptr<HeartbeatData> prepareHeartbeatData();
   void insertConsumerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
   void insertProducerInfoToHeartBeatData(HeartbeatData* pHeartbeatData);
 
diff --git a/src/common/FilterAPI.hpp b/src/common/FilterAPI.hpp
index 9bb18bb..7ff944e 100644
--- a/src/common/FilterAPI.hpp
+++ b/src/common/FilterAPI.hpp
@@ -27,7 +27,7 @@ namespace rocketmq {
 
 class FilterAPI {
  public:
-  static SubscriptionData* buildSubscriptionData(const std::string& topic, const std::string& sub_string) {
+  static std::unique_ptr<SubscriptionData> buildSubscriptionData(const std::string& topic, const std::string& sub_string) {
     // delete in Rebalance
     std::unique_ptr<SubscriptionData> subscription_data(new SubscriptionData(topic, sub_string));
 
@@ -52,7 +52,7 @@ class FilterAPI {
       }
     }
 
-    return subscription_data.release();
+    return subscription_data;
   }
 };
 
diff --git a/src/consumer/DefaultLitePullConsumerImpl.cpp b/src/consumer/DefaultLitePullConsumerImpl.cpp
index 3a5fc81..eb59ebc 100644
--- a/src/consumer/DefaultLitePullConsumerImpl.cpp
+++ b/src/consumer/DefaultLitePullConsumerImpl.cpp
@@ -22,11 +22,11 @@
 
 #include "AssignedMessageQueue.hpp"
 #include "FilterAPI.hpp"
+#include "LocalFileOffsetStore.h"
 #include "MQAdminImpl.h"
 #include "MQClientAPIImpl.h"
 #include "MQClientInstance.h"
 #include "NamespaceUtil.h"
-#include "LocalFileOffsetStore.h"
 #include "PullAPIWrapper.h"
 #include "PullSysFlag.h"
 #include "RebalanceLitePullImpl.h"
@@ -186,7 +186,7 @@ class DefaultLitePullConsumerImpl::PullTaskImpl : public std::enable_shared_from
       if (consumer->subscription_type_ == SubscriptionType::SUBSCRIBE) {
         subscription_data = consumer->rebalance_impl_->getSubscriptionData(message_queue_.topic());
       } else {
-        subscription_data = FilterAPI::buildSubscriptionData(message_queue_.topic(), SUB_ALL);
+        subscription_data = FilterAPI::buildSubscriptionData(message_queue_.topic(), SUB_ALL).release();
       }
 
       std::unique_ptr<PullResult> pull_result(
@@ -327,8 +327,9 @@ void DefaultLitePullConsumerImpl::start() {
       bool registerOK = client_instance_->registerConsumer(client_config_->group_name(), this);
       if (!registerOK) {
         service_state_ = CREATE_JUST;
-        THROW_MQEXCEPTION(MQClientException, "The cousumer group[" + client_config_->group_name() +
-                                                 "] has been created before, specify another name please.",
+        THROW_MQEXCEPTION(MQClientException,
+                          "The cousumer group[" + client_config_->group_name() +
+                              "] has been created before, specify another name please.",
                           -1);
       }
 
@@ -502,8 +503,7 @@ void DefaultLitePullConsumerImpl::subscribe(const std::string& topic, const std:
       THROW_MQEXCEPTION(MQClientException, "Topic can not be null or empty.", -1);
     }
     set_subscription_type(SubscriptionType::SUBSCRIBE);
-    auto* subscription_data = FilterAPI::buildSubscriptionData(topic, subExpression);
-    rebalance_impl_->setSubscriptionData(topic, subscription_data);
+    rebalance_impl_->setSubscriptionData(topic, FilterAPI::buildSubscriptionData(topic, subExpression));
 
     message_queue_listener_.reset(new MessageQueueListenerImpl(shared_from_this()));
     assigned_message_queue_->set_rebalance_impl(rebalance_impl_.get());
@@ -600,29 +600,29 @@ int64_t DefaultLitePullConsumerImpl::fetchConsumeOffset(const MQMessageQueue& me
   return rebalance_impl_->computePullFromWhere(messageQueue);
 }
 
-PullResult* DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
-                                              SubscriptionData* subscription_data,
-                                              int64_t offset,
-                                              int max_nums) {
+std::unique_ptr<PullResult> DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
+                                                              SubscriptionData* subscription_data,
+                                                              int64_t offset,
+                                                              int max_nums) {
   return pull(mq, subscription_data, offset, max_nums,
               getDefaultLitePullConsumerConfig()->consumer_pull_timeout_millis());
 }
 
-PullResult* DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
-                                              SubscriptionData* subscription_data,
-                                              int64_t offset,
-                                              int max_nums,
-                                              long timeout) {
+std::unique_ptr<PullResult> DefaultLitePullConsumerImpl::pull(const MQMessageQueue& mq,
+                                                              SubscriptionData* subscription_data,
+                                                              int64_t offset,
+                                                              int max_nums,
+                                                              long timeout) {
   return pullSyncImpl(mq, subscription_data, offset, max_nums,
                       getDefaultLitePullConsumerConfig()->long_polling_enable(), timeout);
 }
 
-PullResult* DefaultLitePullConsumerImpl::pullSyncImpl(const MQMessageQueue& mq,
-                                                      SubscriptionData* subscription_data,
-                                                      int64_t offset,
-                                                      int max_nums,
-                                                      bool block,
-                                                      long timeout) {
+std::unique_ptr<PullResult> DefaultLitePullConsumerImpl::pullSyncImpl(const MQMessageQueue& mq,
+                                                                      SubscriptionData* subscription_data,
+                                                                      int64_t offset,
+                                                                      int max_nums,
+                                                                      bool block,
+                                                                      long timeout) {
   if (offset < 0) {
     THROW_MQEXCEPTION(MQClientException, "offset < 0", -1);
   }
@@ -838,8 +838,8 @@ void DefaultLitePullConsumerImpl::registerTopicMessageQueueChangeListener(
   }
 }
 
-ConsumerRunningInfo* DefaultLitePullConsumerImpl::consumerRunningInfo() {
-  auto* info = new ConsumerRunningInfo();
+std::unique_ptr<ConsumerRunningInfo> DefaultLitePullConsumerImpl::consumerRunningInfo() {
+  std::unique_ptr<ConsumerRunningInfo> info(new ConsumerRunningInfo());
 
   info->setProperty(ConsumerRunningInfo::PROP_CONSUMER_START_TIMESTAMP, UtilAll::to_string(start_time_));
 
diff --git a/src/consumer/DefaultLitePullConsumerImpl.h b/src/consumer/DefaultLitePullConsumerImpl.h
index 6923799..1f727d9 100755
--- a/src/consumer/DefaultLitePullConsumerImpl.h
+++ b/src/consumer/DefaultLitePullConsumerImpl.h
@@ -21,14 +21,14 @@
 #include <mutex>   // std::mutex
 #include <string>  // std::string
 
-#include "concurrent/blocking_queue.hpp"
-#include "concurrent/executor.hpp"
 #include "DefaultLitePullConsumer.h"
-#include "MessageQueueListener.h"
-#include "MessageQueueLock.hpp"
 #include "MQClientImpl.h"
 #include "MQConsumerInner.h"
+#include "MessageQueueListener.h"
+#include "MessageQueueLock.hpp"
 #include "TopicMessageQueueChangeListener.h"
+#include "concurrent/blocking_queue.hpp"
+#include "concurrent/executor.hpp"
 
 namespace rocketmq {
 
@@ -125,7 +125,7 @@ class DefaultLitePullConsumerImpl : public std::enable_shared_from_this<DefaultL
   // offset persistence
   void persistConsumerOffset() override;
 
-  ConsumerRunningInfo* consumerRunningInfo() override;
+  std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo() override;
 
  private:
   void checkConfig();
@@ -149,18 +149,21 @@ class DefaultLitePullConsumerImpl : public std::enable_shared_from_this<DefaultL
   int64_t nextPullOffset(const MQMessageQueue& messageQueue);
   int64_t fetchConsumeOffset(const MQMessageQueue& messageQueue);
 
-  PullResult* pull(const MQMessageQueue& mq, SubscriptionData* subscription_data, int64_t offset, int max_nums);
-  PullResult* pull(const MQMessageQueue& mq,
-                   SubscriptionData* subscription_data,
-                   int64_t offset,
-                   int max_nums,
-                   long timeout);
-  PullResult* pullSyncImpl(const MQMessageQueue& mq,
-                           SubscriptionData* subscription_data,
-                           int64_t offset,
-                           int max_nums,
-                           bool block,
-                           long timeout);
+  std::unique_ptr<PullResult> pull(const MQMessageQueue& mq,
+                                   SubscriptionData* subscription_data,
+                                   int64_t offset,
+                                   int max_nums);
+  std::unique_ptr<PullResult> pull(const MQMessageQueue& mq,
+                                   SubscriptionData* subscription_data,
+                                   int64_t offset,
+                                   int max_nums,
+                                   long timeout);
+  std::unique_ptr<PullResult> pullSyncImpl(const MQMessageQueue& mq,
+                                           SubscriptionData* subscription_data,
+                                           int64_t offset,
+                                           int max_nums,
+                                           bool block,
+                                           long timeout);
 
   void submitConsumeRequest(ConsumeRequest* consume_request);
 
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 77ba017..1c8a763 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -62,8 +62,8 @@ class DefaultMQPushConsumerImpl::AsyncPullCallback : public AutoDeletePullCallba
       return;
     }
 
-    pull_result.reset(consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(),
-                                                                     std::move(pull_result), subscription_data_));
+    pull_result = consumer->pull_api_wrapper_->processPullResult(pull_request_->message_queue(), std::move(pull_result),
+                                                                 subscription_data_);
     switch (pull_result->pull_status()) {
       case FOUND: {
         int64_t prev_request_offset = pull_request_->next_offset();
@@ -298,8 +298,7 @@ void DefaultMQPushConsumerImpl::checkConfig() {
 void DefaultMQPushConsumerImpl::copySubscription() {
   for (const auto& it : subscription_) {
     LOG_INFO_NEW("buildSubscriptionData: {}, {}", it.first, it.second);
-    SubscriptionData* subscriptionData = FilterAPI::buildSubscriptionData(it.first, it.second);
-    rebalance_impl_->setSubscriptionData(it.first, subscriptionData);
+    rebalance_impl_->setSubscriptionData(it.first, FilterAPI::buildSubscriptionData(it.first, it.second));
   }
 
   switch (getDefaultMQPushConsumerConfig()->message_model()) {
@@ -308,8 +307,7 @@ void DefaultMQPushConsumerImpl::copySubscription() {
     case CLUSTERING: {
       // auto subscript retry topic
       std::string retryTopic = UtilAll::getRetryTopic(client_config_->group_name());
-      SubscriptionData* subscriptionData = FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL);
-      rebalance_impl_->setSubscriptionData(retryTopic, subscriptionData);
+      rebalance_impl_->setSubscriptionData(retryTopic, FilterAPI::buildSubscriptionData(retryTopic, SUB_ALL));
       break;
     }
     default:
@@ -572,8 +570,8 @@ void DefaultMQPushConsumerImpl::updateConsumeOffset(const MQMessageQueue& mq, in
   }
 }
 
-ConsumerRunningInfo* DefaultMQPushConsumerImpl::consumerRunningInfo() {
-  auto* info = new ConsumerRunningInfo();
+std::unique_ptr<ConsumerRunningInfo> DefaultMQPushConsumerImpl::consumerRunningInfo() {
+  std::unique_ptr<ConsumerRunningInfo> info(new ConsumerRunningInfo());
 
   info->setProperty(ConsumerRunningInfo::PROP_CONSUME_ORDERLY, UtilAll::to_string(consume_orderly_));
   info->setProperty(ConsumerRunningInfo::PROP_THREADPOOL_CORE_SIZE,
diff --git a/src/consumer/DefaultMQPushConsumerImpl.h b/src/consumer/DefaultMQPushConsumerImpl.h
index ecac586..28fe385 100755
--- a/src/consumer/DefaultMQPushConsumerImpl.h
+++ b/src/consumer/DefaultMQPushConsumerImpl.h
@@ -100,7 +100,7 @@ class DefaultMQPushConsumerImpl : public std::enable_shared_from_this<DefaultMQP
   // offset persistence
   void persistConsumerOffset() override;
 
-  ConsumerRunningInfo* consumerRunningInfo() override;
+  std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo() override;
 
  public:
   void executePullRequestLater(PullRequestPtr pullRequest, long timeDelay);
diff --git a/src/consumer/MQConsumerInner.h b/src/consumer/MQConsumerInner.h
index d38e839..3daa1b8 100644
--- a/src/consumer/MQConsumerInner.h
+++ b/src/consumer/MQConsumerInner.h
@@ -21,6 +21,7 @@
 #include <vector>
 
 #include "ConsumeType.h"
+#include "MQMessageQueue.h"
 #include "protocol/heartbeat/SubscriptionData.hpp"
 
 namespace rocketmq {
@@ -48,7 +49,7 @@ class MQConsumerInner {
   // offset persistence
   virtual void persistConsumerOffset() = 0;
 
-  virtual ConsumerRunningInfo* consumerRunningInfo() = 0;
+  virtual std::unique_ptr<ConsumerRunningInfo> consumerRunningInfo() = 0;
 };
 
 }  // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index e25f423..16049f2 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -16,11 +16,13 @@
  */
 #include "PullAPIWrapper.h"
 
+#include <memory>
+
 #include "ByteBuffer.hpp"
 #include "MQClientAPIImpl.h"
 #include "MQClientInstance.h"
-#include "MessageDecoder.h"
 #include "MessageAccessor.hpp"
+#include "MessageDecoder.h"
 #include "PullResultExt.hpp"
 #include "PullSysFlag.h"
 
@@ -50,12 +52,12 @@ int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
   return MASTER_ID;
 }
 
-PullResult* PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
-                                              std::unique_ptr<PullResult> pull_result,
-                                              SubscriptionData* subscription_data) {
+std::unique_ptr<PullResult> PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
+                                                              std::unique_ptr<PullResult> pull_result,
+                                                              SubscriptionData* subscription_data) {
   auto* pull_result_ext = dynamic_cast<PullResultExt*>(pull_result.get());
   if (pull_result_ext == nullptr) {
-    return pull_result.release();
+    return pull_result;
   }
 
   // update node
@@ -94,28 +96,29 @@ PullResult* PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
     }
   }
 
-  return new PullResult(pull_result_ext->pull_status(), pull_result_ext->next_begin_offset(),
-                        pull_result_ext->min_offset(), pull_result_ext->max_offset(), std::move(msg_list_filter_again));
+  return std::unique_ptr<PullResult>(new PullResult(pull_result_ext->pull_status(),
+                                                    pull_result_ext->next_begin_offset(), pull_result_ext->min_offset(),
+                                                    pull_result_ext->max_offset(), std::move(msg_list_filter_again)));
 }
 
-PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
-                                           const std::string& subExpression,
-                                           const std::string& expressionType,
-                                           int64_t subVersion,
-                                           int64_t offset,
-                                           int maxNums,
-                                           int sysFlag,
-                                           int64_t commitOffset,
-                                           int brokerSuspendMaxTimeMillis,
-                                           int timeoutMillis,
-                                           CommunicationMode communicationMode,
-                                           PullCallback* pullCallback) {
+std::unique_ptr<PullResult> PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq,
+                                                           const std::string& subExpression,
+                                                           const std::string& expressionType,
+                                                           int64_t subVersion,
+                                                           int64_t offset,
+                                                           int maxNums,
+                                                           int sysFlag,
+                                                           int64_t commitOffset,
+                                                           int brokerSuspendMaxTimeMillis,
+                                                           int timeoutMillis,
+                                                           CommunicationMode communicationMode,
+                                                           PullCallback* pullCallback) {
   std::unique_ptr<FindBrokerResult> findBrokerResult(
       client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false));
   if (findBrokerResult == nullptr) {
     client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
-    findBrokerResult.reset(
-        client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false));
+    findBrokerResult =
+        client_instance_->findBrokerAddressInSubscribe(mq.broker_name(), recalculatePullFromWhichNode(mq), false);
   }
 
   if (findBrokerResult != nullptr) {
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 7bdbf5b..01727d3 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -32,22 +32,22 @@ class PullAPIWrapper {
   PullAPIWrapper(MQClientInstance* instance, const std::string& consumerGroup);
   ~PullAPIWrapper();
 
-  PullResult* processPullResult(const MQMessageQueue& mq,
-                                std::unique_ptr<PullResult> pull_result,
-                                SubscriptionData* subscriptionData);
+  std::unique_ptr<PullResult> processPullResult(const MQMessageQueue& mq,
+                                                std::unique_ptr<PullResult> pull_result,
+                                                SubscriptionData* subscriptionData);
 
-  PullResult* pullKernelImpl(const MQMessageQueue& mq,
-                             const std::string& subExpression,
-                             const std::string& expressionType,
-                             int64_t subVersion,
-                             int64_t offset,
-                             int maxNums,
-                             int sysFlag,
-                             int64_t commitOffset,
-                             int brokerSuspendMaxTimeMillis,
-                             int timeoutMillis,
-                             CommunicationMode communicationMode,
-                             PullCallback* pullCallback);
+  std::unique_ptr<PullResult> pullKernelImpl(const MQMessageQueue& mq,
+                                             const std::string& subExpression,
+                                             const std::string& expressionType,
+                                             int64_t subVersion,
+                                             int64_t offset,
+                                             int maxNums,
+                                             int sysFlag,
+                                             int64_t commitOffset,
+                                             int brokerSuspendMaxTimeMillis,
+                                             int timeoutMillis,
+                                             CommunicationMode communicationMode,
+                                             PullCallback* pullCallback);
 
  private:
   void updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId);
diff --git a/src/consumer/RebalanceImpl.cpp b/src/consumer/RebalanceImpl.cpp
index f890ea9..f97bc16 100644
--- a/src/consumer/RebalanceImpl.cpp
+++ b/src/consumer/RebalanceImpl.cpp
@@ -30,11 +30,7 @@ RebalanceImpl::RebalanceImpl(const std::string& consumerGroup,
       allocate_mq_strategy_(allocateMqStrategy),
       client_instance_(instance) {}
 
-RebalanceImpl::~RebalanceImpl() {
-  for (auto& it : subscription_inner_) {
-    deleteAndZero(it.second);
-  }
-}
+RebalanceImpl::~RebalanceImpl() = default;
 
 void RebalanceImpl::unlock(const MQMessageQueue& mq, const bool oneway) {
   std::unique_ptr<FindBrokerResult> findBrokerResult(
@@ -474,18 +470,15 @@ TOPIC2SD& RebalanceImpl::getSubscriptionInner() {
 SubscriptionData* RebalanceImpl::getSubscriptionData(const std::string& topic) {
   const auto& it = subscription_inner_.find(topic);
   if (it != subscription_inner_.end()) {
-    return it->second;
+    return it->second.get();
   }
   return nullptr;
 }
 
-void RebalanceImpl::setSubscriptionData(const std::string& topic, SubscriptionData* subscriptionData) noexcept {
-  if (subscriptionData != nullptr) {
-    const auto& it = subscription_inner_.find(topic);
-    if (it != subscription_inner_.end()) {
-      deleteAndZero(it->second);
-    }
-    subscription_inner_[topic] = subscriptionData;
+void RebalanceImpl::setSubscriptionData(const std::string& topic,
+                                        std::unique_ptr<SubscriptionData> subscription_data) noexcept {
+  if (subscription_data != nullptr) {
+    subscription_inner_[topic] = std::move(subscription_data);
   }
 }
 
diff --git a/src/consumer/RebalanceImpl.h b/src/consumer/RebalanceImpl.h
index a44784e..55b33fd 100755
--- a/src/consumer/RebalanceImpl.h
+++ b/src/consumer/RebalanceImpl.h
@@ -32,7 +32,7 @@ namespace rocketmq {
 
 typedef std::map<MQMessageQueue, ProcessQueuePtr> MQ2PQ;
 typedef std::map<std::string, std::vector<MQMessageQueue>> TOPIC2MQS;
-typedef std::map<std::string, SubscriptionData*> TOPIC2SD;
+typedef std::map<std::string, std::unique_ptr<SubscriptionData>> TOPIC2SD;
 typedef std::map<std::string, std::vector<MQMessageQueue>> BROKER2MQS;
 
 class RebalanceImpl {
@@ -75,7 +75,7 @@ class RebalanceImpl {
  public:
   TOPIC2SD& getSubscriptionInner();
   SubscriptionData* getSubscriptionData(const std::string& topic);
-  void setSubscriptionData(const std::string& topic, SubscriptionData* sd) noexcept;
+  void setSubscriptionData(const std::string& topic, std::unique_ptr<SubscriptionData> sd) noexcept;
 
   bool getTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   void setTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& mqs);
diff --git a/src/consumer/RebalanceLitePullImpl.h b/src/consumer/RebalanceLitePullImpl.h
index 56b6189..90bcf8e 100755
--- a/src/consumer/RebalanceLitePullImpl.h
+++ b/src/consumer/RebalanceLitePullImpl.h
@@ -23,9 +23,6 @@
 namespace rocketmq {
 
 typedef std::map<MQMessageQueue, ProcessQueuePtr> MQ2PQ;
-typedef std::map<std::string, std::vector<MQMessageQueue>> TOPIC2MQS;
-typedef std::map<std::string, SubscriptionData*> TOPIC2SD;
-typedef std::map<std::string, std::vector<MQMessageQueue>> BROKER2MQS;
 
 class RebalanceLitePullImpl : public RebalanceImpl {
  public:
diff --git a/src/consumer/RemoteBrokerOffsetStore.cpp b/src/consumer/RemoteBrokerOffsetStore.cpp
index ad2e210..713bf17 100644
--- a/src/consumer/RemoteBrokerOffsetStore.cpp
+++ b/src/consumer/RemoteBrokerOffsetStore.cpp
@@ -151,7 +151,7 @@ void RemoteBrokerOffsetStore::updateConsumeOffsetToBroker(const MQMessageQueue&
 
   if (findBrokerResult == nullptr) {
     client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
-    findBrokerResult.reset(client_instance_->findBrokerAddressInAdmin(mq.broker_name()));
+    findBrokerResult = client_instance_->findBrokerAddressInAdmin(mq.broker_name());
   }
 
   if (findBrokerResult != nullptr) {
@@ -177,7 +177,7 @@ int64_t RemoteBrokerOffsetStore::fetchConsumeOffsetFromBroker(const MQMessageQue
 
   if (findBrokerResult == nullptr) {
     client_instance_->updateTopicRouteInfoFromNameServer(mq.topic());
-    findBrokerResult.reset(client_instance_->findBrokerAddressInAdmin(mq.broker_name()));
+    findBrokerResult = client_instance_->findBrokerAddressInAdmin(mq.broker_name());
   }
 
   if (findBrokerResult != nullptr) {
diff --git a/src/io/ByteBuffer.cpp b/src/io/ByteBuffer.cpp
index f12bd72..0121656 100644
--- a/src/io/ByteBuffer.cpp
+++ b/src/io/ByteBuffer.cpp
@@ -22,16 +22,16 @@
 
 namespace rocketmq {
 
-ByteBuffer* ByteBuffer::allocate(int32_t capacity) {
+std::unique_ptr<ByteBuffer> ByteBuffer::allocate(int32_t capacity) {
   if (capacity < 0) {
     throw std::invalid_argument("");
   }
-  return new DefaultByteBuffer(capacity, capacity);
+  return std::unique_ptr<ByteBuffer>(new DefaultByteBuffer(capacity, capacity));
 }
 
-ByteBuffer* ByteBuffer::wrap(ByteArrayRef array, int32_t offset, int32_t length) {
+std::unique_ptr<ByteBuffer> ByteBuffer::wrap(ByteArrayRef array, int32_t offset, int32_t length) {
   try {
-    return new DefaultByteBuffer(std::move(array), offset, length);
+    return std::unique_ptr<ByteBuffer>(new DefaultByteBuffer(std::move(array), offset, length));
   } catch (const std::exception& x) {
     throw std::runtime_error("IndexOutOfBoundsException");
   }
diff --git a/src/io/ByteBuffer.hpp b/src/io/ByteBuffer.hpp
index 5a30250..4e3ea04 100644
--- a/src/io/ByteBuffer.hpp
+++ b/src/io/ByteBuffer.hpp
@@ -19,18 +19,18 @@
 
 #include <sstream>  // std::stringstream
 
-#include "ByteArray.h"
 #include "Buffer.hpp"
+#include "ByteArray.h"
 #include "ByteOrder.h"
 
 namespace rocketmq {
 
 class ByteBuffer : public Buffer<char> {
  public:
-  static ByteBuffer* allocate(int32_t capacity);
+  static std::unique_ptr<ByteBuffer> allocate(int32_t capacity);
 
-  static ByteBuffer* wrap(ByteArrayRef array, int32_t offset, int32_t length);
-  static ByteBuffer* wrap(ByteArrayRef array) { return wrap(array, 0, array->size()); }
+  static std::unique_ptr<ByteBuffer> wrap(ByteArrayRef array, int32_t offset, int32_t length);
+  static std::unique_ptr<ByteBuffer> wrap(ByteArrayRef array) { return wrap(array, 0, array->size()); }
 
  protected:
   ByteBuffer(int32_t mark, int32_t pos, int32_t lim, int32_t cap, ByteArrayRef byte_array, int32_t offset)
@@ -46,9 +46,9 @@ class ByteBuffer : public Buffer<char> {
   virtual ~ByteBuffer() = default;
 
  public:
-  virtual ByteBuffer* slice() = 0;
-  virtual ByteBuffer* duplicate() = 0;
-  virtual ByteBuffer* asReadOnlyBuffer() = 0;
+  virtual std::unique_ptr<ByteBuffer> slice() = 0;
+  virtual std::unique_ptr<ByteBuffer> duplicate() = 0;
+  virtual std::unique_ptr<ByteBuffer> asReadOnlyBuffer() = 0;
 
   // get/put routine
   // ======================
diff --git a/src/io/DefaultByteBuffer.hpp b/src/io/DefaultByteBuffer.hpp
index ca740ba..55b225d 100644
--- a/src/io/DefaultByteBuffer.hpp
+++ b/src/io/DefaultByteBuffer.hpp
@@ -20,6 +20,7 @@
 #include <cstdlib>  // std::memcpy
 
 #include <algorithm>  // std::move
+#include <memory>     // std::unique_ptr
 #include <typeindex>  // std::type_index
 
 #include "ByteBuffer.hpp"
@@ -38,17 +39,19 @@ class DefaultByteBuffer : public ByteBuffer {
       : ByteBuffer(mark, pos, lim, cap, std::move(buf), off) {}
 
  public:
-  ByteBuffer* slice() override {
-    return new DefaultByteBuffer(byte_array_, -1, 0, remaining(), remaining(), position() + offset_);
+  std::unique_ptr<ByteBuffer> slice() override {
+    return std::unique_ptr<ByteBuffer>(
+        new DefaultByteBuffer(byte_array_, -1, 0, remaining(), remaining(), position() + offset_));
   }
 
-  ByteBuffer* duplicate() override {
-    return new DefaultByteBuffer(byte_array_, mark_value(), position(), limit(), capacity(), offset_);
+  std::unique_ptr<ByteBuffer> duplicate() override {
+    return std::unique_ptr<ByteBuffer>(
+        new DefaultByteBuffer(byte_array_, mark_value(), position(), limit(), capacity(), offset_));
   }
 
-  ByteBuffer* asReadOnlyBuffer() override {
+  std::unique_ptr<ByteBuffer> asReadOnlyBuffer() override {
     // return new HeapByteBufferR(byte_array_, mark_value(), position(), limit(), capacity(), offset_);
-    return nullptr;
+    return std::unique_ptr<ByteBuffer>();
   }
 
   char get() override { return (*byte_array_)[ix(nextGetIndex())]; }
diff --git a/src/message/MessageClientIDSetter.cpp b/src/message/MessageClientIDSetter.cpp
index 5560ba1..5dfba1c 100644
--- a/src/message/MessageClientIDSetter.cpp
+++ b/src/message/MessageClientIDSetter.cpp
@@ -37,7 +37,7 @@ MessageClientIDSetter::MessageClientIDSetter() {
   std::unique_ptr<ByteBuffer> buffer;
   sockaddr* addr = GetSelfIP();
   if (addr != nullptr) {
-    buffer.reset(ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4));
+    buffer = ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4);
     if (addr->sa_family == AF_INET) {
       auto* sin = (struct sockaddr_in*)addr;
       buffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
@@ -49,7 +49,7 @@ MessageClientIDSetter::MessageClientIDSetter() {
     }
   }
   if (buffer == nullptr) {
-    buffer.reset(ByteBuffer::allocate(4 + 2 + 4));
+    buffer = ByteBuffer::allocate(4 + 2 + 4);
     buffer->putInt(UtilAll::currentTimeMillis());
   }
   buffer->putShort(UtilAll::getProcessId());
diff --git a/src/producer/DefaultMQProducerImpl.cpp b/src/producer/DefaultMQProducerImpl.cpp
index 0105bd6..9e9ecc2 100644
--- a/src/producer/DefaultMQProducerImpl.cpp
+++ b/src/producer/DefaultMQProducerImpl.cpp
@@ -612,10 +612,10 @@ void DefaultMQProducerImpl::prepareSendRequest(Message& msg, long timeout) {
   }
 }
 
-SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
-                                                   CommunicationMode communicationMode,
-                                                   SendCallback* sendCallback,
-                                                   long timeout) {
+std::unique_ptr<SendResult> DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
+                                                                   CommunicationMode communicationMode,
+                                                                   SendCallback* sendCallback,
+                                                                   long timeout) {
   Validators::checkMessage(*msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
 
   uint64_t beginTimestampFirst = UtilAll::currentTimeMillis();
@@ -647,8 +647,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
           break;
         }
 
-        sendResult.reset(
-            sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime));
+        sendResult = sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
         endTimestamp = UtilAll::currentTimeMillis();
         updateFaultItem(mq.broker_name(), endTimestamp - beginTimestampPrev, false);
         switch (communicationMode) {
@@ -664,7 +663,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
               }
             }
 
-            return sendResult.release();
+            return sendResult;
           default:
             break;
         }
@@ -679,7 +678,7 @@ SendResult* DefaultMQProducerImpl::sendDefaultImpl(MessagePtr msg,
     }  // end of for
 
     if (sendResult != nullptr) {
-      return sendResult.release();
+      return sendResult;
     }
 
     std::string info = "Send [" + UtilAll::to_string(times) + "] times, still failed, cost [" +
@@ -700,12 +699,12 @@ void DefaultMQProducerImpl::updateFaultItem(const std::string& brokerName, const
   mq_fault_strategy_->updateFaultItem(brokerName, currentLatency, isolation);
 }
 
-SendResult* DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
-                                                  const MQMessageQueue& mq,
-                                                  CommunicationMode communicationMode,
-                                                  SendCallback* sendCallback,
-                                                  TopicPublishInfoPtr topicPublishInfo,
-                                                  long timeout) {
+std::unique_ptr<SendResult> DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
+                                                                  const MQMessageQueue& mq,
+                                                                  CommunicationMode communicationMode,
+                                                                  SendCallback* sendCallback,
+                                                                  TopicPublishInfoPtr topicPublishInfo,
+                                                                  long timeout) {
   uint64_t beginStartTime = UtilAll::currentTimeMillis();
   std::string brokerAddr = client_instance_->findBrokerAddressInPublish(mq.broker_name());
   if (brokerAddr.empty()) {
@@ -763,7 +762,7 @@ SendResult* DefaultMQProducerImpl::sendKernelImpl(MessagePtr msg,
         }
       }
 
-      SendResult* sendResult = nullptr;
+      std::unique_ptr<SendResult> sendResult;
       switch (communicationMode) {
         case ASYNC: {
           long costTimeAsync = UtilAll::currentTimeMillis() - beginStartTime;
@@ -825,12 +824,12 @@ bool DefaultMQProducerImpl::tryToCompressMessage(Message& msg) {
   return false;
 }
 
-SendResult* DefaultMQProducerImpl::sendSelectImpl(MessagePtr msg,
-                                                  MessageQueueSelector* selector,
-                                                  void* arg,
-                                                  CommunicationMode communicationMode,
-                                                  SendCallback* sendCallback,
-                                                  long timeout) {
+std::unique_ptr<SendResult> DefaultMQProducerImpl::sendSelectImpl(MessagePtr msg,
+                                                                  MessageQueueSelector* selector,
+                                                                  void* arg,
+                                                                  CommunicationMode communicationMode,
+                                                                  SendCallback* sendCallback,
+                                                                  long timeout) {
   auto beginStartTime = UtilAll::currentTimeMillis();
   Validators::checkMessage(*msg, dynamic_cast<DefaultMQProducerConfig*>(client_config_.get())->max_message_size());
 
@@ -869,7 +868,9 @@ TransactionListener* DefaultMQProducerImpl::getCheckListener() {
   return nullptr;
 };
 
-TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(MessagePtr msg, void* arg, long timeout) {
+std::unique_ptr<TransactionSendResult> DefaultMQProducerImpl::sendMessageInTransactionImpl(MessagePtr msg,
+                                                                                           void* arg,
+                                                                                           long timeout) {
   auto* transactionListener = getCheckListener();
   if (nullptr == transactionListener) {
     THROW_MQEXCEPTION(MQClientException, "transactionListener is null", -1);
@@ -879,7 +880,7 @@ TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(Messa
   MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_TRANSACTION_PREPARED, "true");
   MessageAccessor::putProperty(*msg, MQMessageConst::PROPERTY_PRODUCER_GROUP, client_config_->group_name());
   try {
-    sendResult.reset(sendDefaultImpl(msg, SYNC, nullptr, timeout));
+    sendResult = sendDefaultImpl(msg, SYNC, nullptr, timeout);
   } catch (MQException& e) {
     THROW_MQEXCEPTION(MQClientException, "send message Exception", -1);
   }
@@ -923,7 +924,7 @@ TransactionSendResult* DefaultMQProducerImpl::sendMessageInTransactionImpl(Messa
   }
 
   // FIXME: setTransactionId will cause OOM?
-  TransactionSendResult* transactionSendResult = new TransactionSendResult(*sendResult.get());
+  std::unique_ptr<TransactionSendResult> transactionSendResult(new TransactionSendResult(*sendResult));
   transactionSendResult->set_transaction_id(msg->transaction_id());
   transactionSendResult->set_local_transaction_state(localTransactionState);
   return transactionSendResult;
diff --git a/src/producer/DefaultMQProducerImpl.h b/src/producer/DefaultMQProducerImpl.h
index 57eef51..3a3073c 100644
--- a/src/producer/DefaultMQProducerImpl.h
+++ b/src/producer/DefaultMQProducerImpl.h
@@ -127,28 +127,28 @@ class DefaultMQProducerImpl : public std::enable_shared_from_this<DefaultMQProdu
                              CheckTransactionStateRequestHeader* checkRequestHeader) override;
 
  private:
-  SendResult* sendDefaultImpl(MessagePtr msg,
-                              CommunicationMode communicationMode,
-                              SendCallback* sendCallback,
-                              long timeout);
-
-  SendResult* sendKernelImpl(MessagePtr msg,
-                             const MQMessageQueue& mq,
-                             CommunicationMode communicationMode,
-                             SendCallback* sendCallback,
-                             std::shared_ptr<const TopicPublishInfo> topicPublishInfo,
-                             long timeout);
+  std::unique_ptr<SendResult> sendDefaultImpl(MessagePtr msg,
+                                              CommunicationMode communicationMode,
+                                              SendCallback* sendCallback,
+                                              long timeout);
+
+  std::unique_ptr<SendResult> sendKernelImpl(MessagePtr msg,
+                                             const MQMessageQueue& mq,
+                                             CommunicationMode communicationMode,
+                                             SendCallback* sendCallback,
+                                             std::shared_ptr<const TopicPublishInfo> topicPublishInfo,
+                                             long timeout);
 
   bool tryToCompressMessage(Message& msg);
 
-  SendResult* sendSelectImpl(MessagePtr msg,
-                             MessageQueueSelector* selector,
-                             void* arg,
-                             CommunicationMode communicationMode,
-                             SendCallback* sendCallback,
-                             long timeout);
+  std::unique_ptr<SendResult> sendSelectImpl(MessagePtr msg,
+                                             MessageQueueSelector* selector,
+                                             void* arg,
+                                             CommunicationMode communicationMode,
+                                             SendCallback* sendCallback,
+                                             long timeout);
 
-  TransactionSendResult* sendMessageInTransactionImpl(MessagePtr msg, void* arg, long timeout);
+  std::unique_ptr<TransactionSendResult> sendMessageInTransactionImpl(MessagePtr msg, void* arg, long timeout);
 
   void endTransaction(SendResult& sendResult,
                       LocalTransactionState localTransactionState,
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index 4e6284f..02038fe 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -22,8 +22,8 @@
 #include <atomic>     // std::atomic
 #include <limits>     // std::numeric_limits
 
-#include "ByteOrder.h"
 #include "ByteBuffer.hpp"
+#include "ByteOrder.h"
 #include "Logging.h"
 #include "MQVersion.h"
 #include "RemotingSerializable.h"
@@ -133,7 +133,7 @@ static inline int32_t getHeaderLength(int32_t length) {
   return length & 0x00FFFFFF;
 }
 
-static RemotingCommand* Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
+static std::unique_ptr<RemotingCommand> Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
   // decode package: [4 bytes(packageLength) +] 4 bytes(headerLength) + header + body
 
   int32_t length = byteBuffer.limit();
@@ -196,10 +196,10 @@ static RemotingCommand* Decode(ByteBuffer& byteBuffer, bool hasPackageLength) {
   LOG_DEBUG_NEW("code:{}, language:{}, version:{}, opaque:{}, flag:{}, remark:{}, headLen:{}, bodyLen:{}", code,
                 language, version, opaque, flag, remark, headerLength, bodyLength);
 
-  return cmd.release();
+  return cmd;
 }
 
-RemotingCommand* RemotingCommand::Decode(ByteArrayRef array, bool hasPackageLength) {
+std::unique_ptr<RemotingCommand> RemotingCommand::Decode(ByteArrayRef array, bool hasPackageLength) {
   std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::wrap(std::move(array)));
   return rocketmq::Decode(*byteBuffer, hasPackageLength);
 }
diff --git a/src/protocol/TopicList.h b/src/protocol/TopicList.h
index bbcb7cc..af004c0 100644
--- a/src/protocol/TopicList.h
+++ b/src/protocol/TopicList.h
@@ -26,7 +26,7 @@ namespace rocketmq {
 
 class TopicList {
  public:
-  static TopicList* Decode(const ByteArray& mem) { return new TopicList(); }
+  static std::unique_ptr<TopicList> Decode(const ByteArray& mem) { return std::unique_ptr<TopicList>(new TopicList()); }
 
  private:
   std::vector<std::string> topic_list_;
diff --git a/src/protocol/body/LockBatchResponseBody.hpp b/src/protocol/body/LockBatchResponseBody.hpp
index af16bf2..f443f88 100644
--- a/src/protocol/body/LockBatchResponseBody.hpp
+++ b/src/protocol/body/LockBatchResponseBody.hpp
@@ -28,7 +28,7 @@ namespace rocketmq {
 
 class LockBatchResponseBody {
  public:
-  static LockBatchResponseBody* Decode(const ByteArray& bodyData) {
+  static std::unique_ptr<LockBatchResponseBody> Decode(const ByteArray& bodyData) {
     Json::Value root = RemotingSerializable::fromJson(bodyData);
     auto& mqs = root["lockOKMQSet"];
     std::unique_ptr<LockBatchResponseBody> body(new LockBatchResponseBody());
@@ -37,7 +37,7 @@ class LockBatchResponseBody {
       LOG_INFO_NEW("LockBatchResponseBody MQ:{}", mq.toString());
       body->lock_ok_mq_set().push_back(std::move(mq));
     }
-    return body.release();
+    return body;
   }
 
  public:
diff --git a/src/protocol/body/ResetOffsetBody.hpp b/src/protocol/body/ResetOffsetBody.hpp
index 157e567..f5a5bfc 100644
--- a/src/protocol/body/ResetOffsetBody.hpp
+++ b/src/protocol/body/ResetOffsetBody.hpp
@@ -26,7 +26,7 @@ namespace rocketmq {
 
 class ResetOffsetBody {
  public:
-  static ResetOffsetBody* Decode(const ByteArray& bodyData) {
+  static std::unique_ptr<ResetOffsetBody> Decode(const ByteArray& bodyData) {
     // FIXME: object as key
     Json::Value root = RemotingSerializable::fromJson(bodyData);
     auto& qds = root["offsetTable"];
@@ -38,7 +38,7 @@ class ResetOffsetBody {
       int64_t offset = qds[member].asInt64();
       body->offset_table_.emplace(std::move(mq), offset);
     }
-    return body.release();
+    return body;
   }
 
  public:
diff --git a/src/protocol/body/TopicRouteData.hpp b/src/protocol/body/TopicRouteData.hpp
index 38e1889..887bcc3 100644
--- a/src/protocol/body/TopicRouteData.hpp
+++ b/src/protocol/body/TopicRouteData.hpp
@@ -106,7 +106,7 @@ typedef std::shared_ptr<TopicRouteData> TopicRouteDataPtr;
 
 class TopicRouteData {
  public:
-  static TopicRouteData* Decode(const ByteArray& bodyData) {
+  static std::unique_ptr<TopicRouteData> Decode(const ByteArray& bodyData) {
     Json::Value root = RemotingSerializable::fromJson(bodyData);
 
     std::unique_ptr<TopicRouteData> trd(new TopicRouteData());
@@ -136,7 +136,7 @@ class TopicRouteData {
     }
     sort(trd->broker_datas().begin(), trd->broker_datas().end());
 
-    return trd.release();
+    return trd;
   }
 
   /**
diff --git a/src/protocol/header/CommandHeader.cpp b/src/protocol/header/CommandHeader.cpp
index 4a7e7ca..b75757c 100644
--- a/src/protocol/header/CommandHeader.cpp
+++ b/src/protocol/header/CommandHeader.cpp
@@ -95,7 +95,7 @@ void CreateTopicRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::str
 // CheckTransactionStateRequestHeader
 //######################################
 
-CheckTransactionStateRequestHeader* CheckTransactionStateRequestHeader::Decode(
+std::unique_ptr<CheckTransactionStateRequestHeader> CheckTransactionStateRequestHeader::Decode(
     std::map<std::string, std::string>& extFields) {
   std::unique_ptr<CheckTransactionStateRequestHeader> header(new CheckTransactionStateRequestHeader());
   header->tranStateTableOffset = std::stoll(extFields.at("tranStateTableOffset"));
@@ -116,7 +116,7 @@ CheckTransactionStateRequestHeader* CheckTransactionStateRequestHeader::Decode(
     header->offsetMsgId = it->second;
   }
 
-  return header.release();
+  return header;
 }
 
 void CheckTransactionStateRequestHeader::SetDeclaredFieldOfCommandHeader(
@@ -246,8 +246,9 @@ void SendMessageRequestHeader::setReconsumeTimes(int _reconsumeTimes) {
 // SendMessageRequestHeaderV2
 //######################################
 
-SendMessageRequestHeaderV2* SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(SendMessageRequestHeader* v1) {
-  SendMessageRequestHeaderV2* v2 = new SendMessageRequestHeaderV2();
+std::unique_ptr<SendMessageRequestHeaderV2> SendMessageRequestHeaderV2::createSendMessageRequestHeaderV2(
+    SendMessageRequestHeader* v1) {
+  std::unique_ptr<SendMessageRequestHeaderV2> v2(new SendMessageRequestHeaderV2());
   v2->a = v1->producerGroup;
   v2->b = v1->topic;
   v2->c = v1->defaultTopic;
@@ -312,7 +313,8 @@ void SendMessageRequestHeaderV2::SetDeclaredFieldOfCommandHeader(std::map<std::s
 // SendMessageResponseHeader
 //######################################
 
-SendMessageResponseHeader* SendMessageResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<SendMessageResponseHeader> SendMessageResponseHeader::Decode(
+    std::map<std::string, std::string>& extFields) {
   std::unique_ptr<SendMessageResponseHeader> header(new SendMessageResponseHeader());
   header->msgId = extFields.at("msgId");
   header->queueId = std::stoi(extFields.at("queueId"));
@@ -323,7 +325,7 @@ SendMessageResponseHeader* SendMessageResponseHeader::Decode(std::map<std::strin
     header->transactionId = it->second;
   }
 
-  return header.release();
+  return header;
 }
 
 void SendMessageResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -378,13 +380,14 @@ void PullMessageRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::str
 // PullMessageResponseHeader
 //######################################
 
-PullMessageResponseHeader* PullMessageResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<PullMessageResponseHeader> PullMessageResponseHeader::Decode(
+    std::map<std::string, std::string>& extFields) {
   std::unique_ptr<PullMessageResponseHeader> header(new PullMessageResponseHeader());
   header->suggestWhichBrokerId = std::stoll(extFields.at("suggestWhichBrokerId"));
   header->nextBeginOffset = std::stoll(extFields.at("nextBeginOffset"));
   header->minOffset = std::stoll(extFields.at("minOffset"));
   header->maxOffset = std::stoll(extFields.at("maxOffset"));
-  return header.release();
+  return header;
 }
 
 void PullMessageResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -422,10 +425,11 @@ void GetMinOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::st
 // GetMinOffsetResponseHeader
 //######################################
 
-GetMinOffsetResponseHeader* GetMinOffsetResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<GetMinOffsetResponseHeader> GetMinOffsetResponseHeader::Decode(
+    std::map<std::string, std::string>& extFields) {
   std::unique_ptr<GetMinOffsetResponseHeader> header(new GetMinOffsetResponseHeader());
   header->offset = std::stoll(extFields.at("offset"));
-  return header.release();
+  return header;
 }
 
 void GetMinOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -451,10 +455,11 @@ void GetMaxOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::st
 // GetMaxOffsetResponseHeader
 //######################################
 
-GetMaxOffsetResponseHeader* GetMaxOffsetResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<GetMaxOffsetResponseHeader> GetMaxOffsetResponseHeader::Decode(
+    std::map<std::string, std::string>& extFields) {
   std::unique_ptr<GetMaxOffsetResponseHeader> header(new GetMaxOffsetResponseHeader());
   header->offset = std::stoll(extFields.at("offset"));
-  return header.release();
+  return header;
 }
 
 void GetMaxOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -482,10 +487,11 @@ void SearchOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<std::st
 // SearchOffsetResponseHeader
 //######################################
 
-SearchOffsetResponseHeader* SearchOffsetResponseHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<SearchOffsetResponseHeader> SearchOffsetResponseHeader::Decode(
+    std::map<std::string, std::string>& extFields) {
   std::unique_ptr<SearchOffsetResponseHeader> header(new SearchOffsetResponseHeader());
   header->offset = std::stoll(extFields.at("offset"));
-  return header.release();
+  return header;
 }
 
 void SearchOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) {
@@ -524,11 +530,11 @@ void GetEarliestMsgStoretimeRequestHeader::SetDeclaredFieldOfCommandHeader(
 // GetEarliestMsgStoretimeResponseHeader
 //######################################
 
-GetEarliestMsgStoretimeResponseHeader* GetEarliestMsgStoretimeResponseHeader::Decode(
+std::unique_ptr<GetEarliestMsgStoretimeResponseHeader> GetEarliestMsgStoretimeResponseHeader::Decode(
     std::map<std::string, std::string>& extFields) {
   std::unique_ptr<GetEarliestMsgStoretimeResponseHeader> header(new GetEarliestMsgStoretimeResponseHeader());
   header->timestamp = std::stoll(extFields.at("timestamp"));
-  return header.release();
+  return header;
 }
 
 void GetEarliestMsgStoretimeResponseHeader::SetDeclaredFieldOfCommandHeader(
@@ -570,11 +576,11 @@ void QueryConsumerOffsetRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<
 // QueryConsumerOffsetResponseHeader
 //######################################
 
-QueryConsumerOffsetResponseHeader* QueryConsumerOffsetResponseHeader::Decode(
+std::unique_ptr<QueryConsumerOffsetResponseHeader> QueryConsumerOffsetResponseHeader::Decode(
     std::map<std::string, std::string>& extFields) {
   std::unique_ptr<QueryConsumerOffsetResponseHeader> header(new QueryConsumerOffsetResponseHeader());
   header->offset = std::stoll(extFields.at("offset"));
-  return header.release();
+  return header;
 }
 
 void QueryConsumerOffsetResponseHeader::SetDeclaredFieldOfCommandHeader(
@@ -642,14 +648,15 @@ void ConsumerSendMsgBackRequestHeader::SetDeclaredFieldOfCommandHeader(std::map<
 // GetConsumerListByGroupResponseBody
 //######################################
 
-GetConsumerListByGroupResponseBody* GetConsumerListByGroupResponseBody::Decode(const ByteArray& bodyData) {
+std::unique_ptr<GetConsumerListByGroupResponseBody> GetConsumerListByGroupResponseBody::Decode(
+    const ByteArray& bodyData) {
   Json::Value root = RemotingSerializable::fromJson(bodyData);
   auto& ids = root["consumerIdList"];
   std::unique_ptr<GetConsumerListByGroupResponseBody> body(new GetConsumerListByGroupResponseBody());
   for (unsigned int i = 0; i < ids.size(); i++) {
     body->consumerIdList.push_back(ids[i].asString());
   }
-  return body.release();
+  return body;
 }
 
 void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader(
@@ -659,7 +666,8 @@ void GetConsumerListByGroupResponseBody::SetDeclaredFieldOfCommandHeader(
 // ResetOffsetRequestHeader
 //######################################
 
-ResetOffsetRequestHeader* ResetOffsetRequestHeader::Decode(std::map<std::string, std::string>& extFields) {
+std::unique_ptr<ResetOffsetRequestHeader> ResetOffsetRequestHeader::Decode(
+    std::map<std::string, std::string>& extFields) {
   std::unique_ptr<ResetOffsetRequestHeader> header(new ResetOffsetRequestHeader());
   header->topic = extFields.at("topic");
   header->group = extFields.at("group");
@@ -667,7 +675,7 @@ ResetOffsetRequestHeader* ResetOffsetRequestHeader::Decode(std::map<std::string,
   header->isForce = UtilAll::stob(extFields.at("isForce"));
   LOG_INFO_NEW("topic:{}, group:{}, timestamp:{}, isForce:{}", header->topic, header->group, header->timestamp,
                header->isForce);
-  return header.release();
+  return header;
 }
 
 void ResetOffsetRequestHeader::setTopic(const std::string& tmp) {
@@ -706,7 +714,7 @@ const bool ResetOffsetRequestHeader::getForceFlag() const {
 // GetConsumerRunningInfoRequestHeader
 //######################################
 
-GetConsumerRunningInfoRequestHeader* GetConsumerRunningInfoRequestHeader::Decode(
+std::unique_ptr<GetConsumerRunningInfoRequestHeader> GetConsumerRunningInfoRequestHeader::Decode(
     std::map<std::string, std::string>& extFields) {
   std::unique_ptr<GetConsumerRunningInfoRequestHeader> header(new GetConsumerRunningInfoRequestHeader());
   header->consumerGroup = extFields.at("consumerGroup");
@@ -714,7 +722,7 @@ GetConsumerRunningInfoRequestHeader* GetConsumerRunningInfoRequestHeader::Decode
   header->jstackEnable = UtilAll::stob(extFields.at("jstackEnable"));
   LOG_INFO("consumerGroup:%s, clientId:%s,  jstackEnable:%d", header->consumerGroup.c_str(), header->clientId.c_str(),
            header->jstackEnable);
-  return header.release();
+  return header;
 }
 
 void GetConsumerRunningInfoRequestHeader::Encode(Json::Value& extFields) {
@@ -758,11 +766,11 @@ void GetConsumerRunningInfoRequestHeader::setJstackEnable(const bool& jstackEnab
 // NotifyConsumerIdsChangedRequestHeader
 //######################################
 
-NotifyConsumerIdsChangedRequestHeader* NotifyConsumerIdsChangedRequestHeader::Decode(
+std::unique_ptr<NotifyConsumerIdsChangedRequestHeader> NotifyConsumerIdsChangedRequestHeader::Decode(
     std::map<std::string, std::string>& extFields) {
   std::unique_ptr<NotifyConsumerIdsChangedRequestHeader> header(new NotifyConsumerIdsChangedRequestHeader());
   header->consumerGroup = extFields.at("consumerGroup");
-  return header.release();
+  return header;
 }
 
 const std::string& NotifyConsumerIdsChangedRequestHeader::getConsumerGroup() const {
diff --git a/src/protocol/header/CommandHeader.h b/src/protocol/header/CommandHeader.h
index 31c10eb..8963510 100644
--- a/src/protocol/header/CommandHeader.h
+++ b/src/protocol/header/CommandHeader.h
@@ -17,6 +17,7 @@
 #ifndef ROCKETMQ_PROTOCOL_COMMANDHEADER_H_
 #define ROCKETMQ_PROTOCOL_COMMANDHEADER_H_
 
+#include <memory>  // std::unique_ptr
 #include <vector>  // std::vector
 
 #include "ByteArray.h"
@@ -71,7 +72,7 @@ class CheckTransactionStateRequestHeader : public CommandCustomHeader {
  public:
   CheckTransactionStateRequestHeader() : tranStateTableOffset(0), commitLogOffset(0) {}
 
-  static CheckTransactionStateRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<CheckTransactionStateRequestHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
   std::string toString() const;
 
@@ -139,7 +140,7 @@ class SendMessageRequestHeader : public CommandCustomHeader {
 
 class SendMessageRequestHeaderV2 : public CommandCustomHeader {
  public:
-  static SendMessageRequestHeaderV2* createSendMessageRequestHeaderV2(SendMessageRequestHeader* v1);
+  static std::unique_ptr<SendMessageRequestHeaderV2> createSendMessageRequestHeaderV2(SendMessageRequestHeader* v1);
 
   void Encode(Json::Value& outData) override;
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
@@ -167,7 +168,7 @@ class SendMessageResponseHeader : public CommandCustomHeader {
  public:
   SendMessageResponseHeader() : queueId(0), queueOffset(0) {}
 
-  static SendMessageResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<SendMessageResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -209,7 +210,7 @@ class PullMessageResponseHeader : public CommandCustomHeader {
  public:
   PullMessageResponseHeader() : suggestWhichBrokerId(0), nextBeginOffset(0), minOffset(0), maxOffset(0) {}
 
-  static PullMessageResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<PullMessageResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -241,7 +242,7 @@ class GetMinOffsetResponseHeader : public CommandCustomHeader {
  public:
   GetMinOffsetResponseHeader() : offset(0) {}
 
-  static GetMinOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<GetMinOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -264,7 +265,7 @@ class GetMaxOffsetResponseHeader : public CommandCustomHeader {
  public:
   GetMaxOffsetResponseHeader() : offset(0) {}
 
-  static GetMaxOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<GetMaxOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -288,7 +289,7 @@ class SearchOffsetResponseHeader : public CommandCustomHeader {
  public:
   SearchOffsetResponseHeader() : offset(0) {}
 
-  static SearchOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<SearchOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -322,7 +323,7 @@ class GetEarliestMsgStoretimeResponseHeader : public CommandCustomHeader {
  public:
   GetEarliestMsgStoretimeResponseHeader() : timestamp(0) {}
 
-  static GetEarliestMsgStoretimeResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<GetEarliestMsgStoretimeResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -355,7 +356,7 @@ class QueryConsumerOffsetResponseHeader : public CommandCustomHeader {
  public:
   QueryConsumerOffsetResponseHeader() : offset(0) {}
 
-  static QueryConsumerOffsetResponseHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<QueryConsumerOffsetResponseHeader> Decode(std::map<std::string, std::string>& extFields);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -395,7 +396,7 @@ class ConsumerSendMsgBackRequestHeader : public CommandCustomHeader {
 
 class GetConsumerListByGroupResponseBody : public CommandCustomHeader {
  public:
-  static GetConsumerListByGroupResponseBody* Decode(const ByteArray& bodyData);
+  static std::unique_ptr<GetConsumerListByGroupResponseBody> Decode(const ByteArray& bodyData);
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
  public:
@@ -406,7 +407,7 @@ class ResetOffsetRequestHeader : public CommandCustomHeader {
  public:
   ResetOffsetRequestHeader() : timestamp(0), isForce(false) {}
 
-  static ResetOffsetRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<ResetOffsetRequestHeader> Decode(std::map<std::string, std::string>& extFields);
 
   const std::string& getTopic() const;
   void setTopic(const std::string& tmp);
@@ -431,7 +432,7 @@ class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader {
  public:
   GetConsumerRunningInfoRequestHeader() : jstackEnable(false) {}
 
-  static GetConsumerRunningInfoRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<GetConsumerRunningInfoRequestHeader> Decode(std::map<std::string, std::string>& extFields);
   void Encode(Json::Value& extFields) override;
   void SetDeclaredFieldOfCommandHeader(std::map<std::string, std::string>& requestMap) override;
 
@@ -452,7 +453,7 @@ class GetConsumerRunningInfoRequestHeader : public CommandCustomHeader {
 
 class NotifyConsumerIdsChangedRequestHeader : public CommandCustomHeader {
  public:
-  static NotifyConsumerIdsChangedRequestHeader* Decode(std::map<std::string, std::string>& extFields);
+  static std::unique_ptr<NotifyConsumerIdsChangedRequestHeader> Decode(std::map<std::string, std::string>& extFields);
 
   const std::string& getConsumerGroup() const;
   void setConsumerGroup(const std::string& tmp);
diff --git a/src/protocol/header/ReplyMessageRequestHeader.hpp b/src/protocol/header/ReplyMessageRequestHeader.hpp
index 3aaab7b..bfeb35a 100644
--- a/src/protocol/header/ReplyMessageRequestHeader.hpp
+++ b/src/protocol/header/ReplyMessageRequestHeader.hpp
@@ -26,7 +26,7 @@ namespace rocketmq {
 
 class ReplyMessageRequestHeader : public CommandCustomHeader {
  public:
-  static ReplyMessageRequestHeader* Decode(std::map<std::string, std::string>& extFields) {
+  static std::unique_ptr<ReplyMessageRequestHeader> Decode(std::map<std::string, std::string>& extFields) {
     std::unique_ptr<ReplyMessageRequestHeader> header(new ReplyMessageRequestHeader());
 
     header->producer_group_ = extFields.at("producerGroup");
@@ -61,7 +61,7 @@ class ReplyMessageRequestHeader : public CommandCustomHeader {
     header->store_host_ = extFields.at("storeHost");
     header->store_timestamp_ = std::stoll(extFields.at("storeTimestamp"));
 
-    return header.release();
+    return header;
   }
 
  public:
diff --git a/src/transport/RequestProcessor.h b/src/transport/RequestProcessor.h
index 2e62391..15b5861 100644
--- a/src/transport/RequestProcessor.h
+++ b/src/transport/RequestProcessor.h
@@ -26,7 +26,7 @@ class RequestProcessor {
  public:
   virtual ~RequestProcessor() = default;
 
-  virtual RemotingCommand* processRequest(TcpTransportPtr channel, RemotingCommand* request) = 0;
+  virtual std::unique_ptr<RemotingCommand> processRequest(TcpTransportPtr channel, RemotingCommand* request) = 0;
 };
 
 }  // namespace rocketmq
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 655a811..c524cb7 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -564,7 +564,7 @@ void TcpRemotingClient::messageReceived(ByteArrayRef msg, TcpTransportPtr channe
 void TcpRemotingClient::processMessageReceived(ByteArrayRef msg, TcpTransportPtr channel) {
   std::unique_ptr<RemotingCommand> cmd;
   try {
-    cmd.reset(RemotingCommand::Decode(std::move(msg)));
+    cmd = RemotingCommand::Decode(std::move(msg));
   } catch (...) {
     LOG_ERROR_NEW("processMessageReceived error");
     return;
@@ -635,7 +635,7 @@ void TcpRemotingClient::processRequestCommand(std::unique_ptr<RemotingCommand> r
       auto* processor = it->second;
 
       doBeforeRpcHooks(channel->getPeerAddrAndPort(), *requestCommand, false);
-      response.reset(processor->processRequest(channel, requestCommand.get()));
+      response = processor->processRequest(channel, requestCommand.get());
       doAfterRpcHooks(channel->getPeerAddrAndPort(), *response, response.get(), true);
     } catch (std::exception& e) {
       LOG_ERROR_NEW("process request exception. {}", e.what());
diff --git a/test/src/protocol/RemotingCommandTest.cpp b/test/src/protocol/RemotingCommandTest.cpp
index 42ee771..c5d0627 100644
--- a/test/src/protocol/RemotingCommandTest.cpp
+++ b/test/src/protocol/RemotingCommandTest.cpp
@@ -156,7 +156,7 @@ TEST(RemotingCommandTest, EncodeAndDecode) {
   remotingCommand2.set_body("123123");
   package = remotingCommand2.encode();
 
-  decodeRemtingCommand.reset(RemotingCommand::Decode(package, true));
+  decodeRemtingCommand = RemotingCommand::Decode(package, true);
 
   auto* header = decodeRemtingCommand->decodeCommandCustomHeader<GetConsumerRunningInfoRequestHeader>();
   EXPECT_EQ(requestHeader->getClientId(), header->getClientId());