You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2021/03/24 09:06:08 UTC

[rocketmq-client-cpp] branch re_dev updated (487dfee -> 44b846e)

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a change to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git.


    from 487dfee  refactor: Logging
     new 3bd226e  fix: leak of InvokeCallback
     new cebb202  fix: leak of AsyncPullCallback
     new 40882dc  feat: support ipv6
     new 44b846e  fix: test

The 4 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 src/ClientRemotingProcessor.cpp                    |   6 +-
 src/MQClientAPIImpl.cpp                            |  32 +--
 src/MQClientAPIImpl.h                              |   8 +-
 src/MQClientConfigImpl.hpp                         |   3 +-
 src/common/SendCallbackWrap.cpp                    |   4 +-
 src/common/UtilAll.cpp                             |  53 +----
 src/common/UtilAll.h                               |   8 -
 src/consumer/DefaultMQPushConsumerImpl.cpp         |  12 +-
 src/message/MessageBatch.cpp                       |   3 +-
 src/message/MessageClientIDSetter.cpp              |  40 ++--
 src/message/MessageClientIDSetter.h                |   2 +-
 src/message/MessageDecoder.cpp                     |  42 ++--
 src/message/MessageDecoder.h                       |   7 +-
 src/message/MessageExtImpl.cpp                     |  26 +--
 src/message/MessageExtImpl.h                       |   4 +-
 src/message/MessageId.h                            |  16 +-
 src/transport/EventLoop.cpp                        |   6 +-
 src/transport/ResponseFuture.cpp                   |  11 +-
 src/transport/ResponseFuture.h                     |  11 +-
 src/transport/SocketUtil.cpp                       | 217 +++++++++++++--------
 src/transport/SocketUtil.h                         |  45 +++--
 src/transport/TcpRemotingClient.cpp                |  36 ++--
 src/transport/TcpRemotingClient.h                  |   6 +-
 test/CMakeLists.txt                                |   7 +-
 test/src/message/MQMessageExtTest.cpp              |  14 +-
 test/src/message/MessageDecoderTest.cpp            |  22 ++-
 test/src/message/MessageIdTest.cpp                 |  13 +-
 test/src/protocol/ConsumerRunningInfoTest.cpp      |   4 +-
 test/src/protocol/ProcessQueueInfoTest.cpp         |   3 +-
 test/src/transport/ClientRemotingProcessorTest.cpp |  11 +-
 test/src/transport/ResponseFutureTest.cpp          |   7 +-
 test/src/transport/SocketUtilTest.cpp              |  10 +-
 32 files changed, 357 insertions(+), 332 deletions(-)

[rocketmq-client-cpp] 02/04: fix: leak of AsyncPullCallback

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit cebb202022a280c7da890f4a15f09559db058b55
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Mar 11 18:29:26 2021 +0800

    fix: leak of AsyncPullCallback
---
 src/consumer/DefaultMQPushConsumerImpl.cpp | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 79bcdf5..11a12a1 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -478,7 +478,9 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
                                           false);                  // class filter
 
   try {
-    auto* callback = new AsyncPullCallback(shared_from_this(), pull_request, subscription_data);
+    std::unique_ptr<AsyncPullCallback> callback(
+        new AsyncPullCallback(shared_from_this(), pull_request, subscription_data));
+
     pull_api_wrapper_->pullKernelImpl(message_queue,                                        // mq
                                       subExpression,                                        // subExpression
                                       subscription_data->expression_type(),                 // expressionType
@@ -490,7 +492,9 @@ void DefaultMQPushConsumerImpl::pullMessage(PullRequestPtr pull_request) {
                                       BROKER_SUSPEND_MAX_TIME_MILLIS,        // brokerSuspendMaxTimeMillis
                                       CONSUMER_TIMEOUT_MILLIS_WHEN_SUSPEND,  // timeoutMillis
                                       CommunicationMode::ASYNC,              // communicationMode
-                                      callback);                             // pullCallback
+                                      callback.get());                       // pullCallback
+
+    (void)callback.release();
   } catch (MQException& e) {
     LOG_ERROR_NEW("pullKernelImpl exception: {}", e.what());
     executePullRequestLater(pull_request, getDefaultMQPushConsumerConfig()->pull_time_delay_millis_when_exception());

[rocketmq-client-cpp] 01/04: fix: leak of InvokeCallback

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 3bd226ee201a8e924e88420ba35de911f530ac60
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Mar 11 01:32:53 2021 +0800

    fix: leak of InvokeCallback
---
 src/MQClientAPIImpl.cpp             | 32 ++++++++++----------------------
 src/MQClientAPIImpl.h               |  8 ++++----
 src/common/SendCallbackWrap.cpp     |  4 +---
 src/transport/ResponseFuture.cpp    | 11 +++++------
 src/transport/ResponseFuture.h      | 11 ++++++++---
 src/transport/TcpRemotingClient.cpp | 36 +++++++++++++++---------------------
 src/transport/TcpRemotingClient.h   |  6 +++---
 7 files changed, 46 insertions(+), 62 deletions(-)

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 91c6dc1..e5372d0 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -159,21 +159,16 @@ void MQClientAPIImpl::sendMessageAsync(const std::string& addr,
                                        int64_t timeoutMillis,
                                        int retryTimesWhenSendFailed,
                                        DefaultMQProducerImplPtr producer) {
-  // delete in future
-  auto* cbw = new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request), sendCallback,
-                                   topicPublishInfo, instance, retryTimesWhenSendFailed, 0, producer);
-
-  try {
-    sendMessageAsyncImpl(cbw, timeoutMillis);
-  } catch (RemotingException& e) {
-    deleteAndZero(cbw);
-    throw;
-  }
+  std::unique_ptr<InvokeCallback> cbw(
+      new SendCallbackWrap(addr, brokerName, msg, std::forward<RemotingCommand>(request), sendCallback,
+                           topicPublishInfo, instance, retryTimesWhenSendFailed, 0, producer));
+  sendMessageAsyncImpl(cbw, timeoutMillis);
 }
 
-void MQClientAPIImpl::sendMessageAsyncImpl(SendCallbackWrap* cbw, int64_t timeoutMillis) {
-  const auto& addr = cbw->getAddr();
-  auto& request = cbw->getRemotingCommand();
+void MQClientAPIImpl::sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis) {
+  auto* scbw = static_cast<SendCallbackWrap*>(cbw.get());
+  const auto& addr = scbw->getAddr();
+  auto& request = scbw->getRemotingCommand();
   remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
 }
 
@@ -261,15 +256,8 @@ void MQClientAPIImpl::pullMessageAsync(const std::string& addr,
                                        RemotingCommand& request,
                                        int timeoutMillis,
                                        PullCallback* pullCallback) {
-  // delete in future
-  auto* cbw = new PullCallbackWrap(pullCallback, this);
-
-  try {
-    remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
-  } catch (RemotingException& e) {
-    deleteAndZero(cbw);
-    throw;
-  }
+  std::unique_ptr<InvokeCallback> cbw(new PullCallbackWrap(pullCallback, this));
+  remoting_client_->invokeAsync(addr, request, cbw, timeoutMillis);
 }
 
 PullResult* MQClientAPIImpl::pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis) {
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 143a6a5..535f33d 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -20,8 +20,8 @@
 #include "CommunicationMode.h"
 #include "DefaultMQProducerImpl.h"
 #include "KVTable.h"
-#include "MQException.h"
 #include "MQClientInstance.h"
+#include "MQException.h"
 #include "MQMessageExt.h"
 #include "PullCallback.h"
 #include "SendCallback.h"
@@ -29,8 +29,8 @@
 #include "TopicConfig.h"
 #include "TopicList.h"
 #include "TopicPublishInfo.hpp"
-#include "protocol/body/TopicRouteData.hpp"
 #include "protocol/body/LockBatchRequestBody.hpp"
+#include "protocol/body/TopicRouteData.hpp"
 #include "protocol/body/UnlockBatchRequestBody.hpp"
 #include "protocol/header/CommandHeader.h"
 #include "protocol/heartbeat/HeartbeatData.hpp"
@@ -40,7 +40,7 @@ namespace rocketmq {
 class TcpRemotingClient;
 class ClientRemotingProcessor;
 class RPCHook;
-class SendCallbackWrap;
+class InvokeCallback;
 
 /**
  * wrap all RPC API
@@ -182,7 +182,7 @@ class MQClientAPIImpl {
                         int retryTimesWhenSendFailed,
                         DefaultMQProducerImplPtr producer);
 
-  void sendMessageAsyncImpl(SendCallbackWrap* cbw, int64_t timeoutMillis);
+  void sendMessageAsyncImpl(std::unique_ptr<InvokeCallback>& cbw, int64_t timeoutMillis);
 
   PullResult* pullMessageSync(const std::string& addr, RemotingCommand& request, int timeoutMillis);
 
diff --git a/src/common/SendCallbackWrap.cpp b/src/common/SendCallbackWrap.cpp
index 92fd6e1..562b30f 100644
--- a/src/common/SendCallbackWrap.cpp
+++ b/src/common/SendCallbackWrap.cpp
@@ -156,9 +156,7 @@ void SendCallbackWrap::onExceptionImpl(ResponseFuture* responseFuture,
       // resend
       addr_ = std::move(addr);
       broker_name_ = std::move(retryBrokerName);
-      instance_->getMQClientAPIImpl()->sendMessageAsyncImpl(this, timeoutMillis);
-
-      responseFuture->releaseInvokeCallback();  // for avoid delete this SendCallbackWrap
+      instance_->getMQClientAPIImpl()->sendMessageAsyncImpl(responseFuture->invoke_callback(), timeoutMillis);
       return;
     } catch (MQException& e1) {
       producer->updateFaultItem(broker_name_, 3000, true);
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
index 9384135..a2887be 100755
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -20,11 +20,14 @@
 
 namespace rocketmq {
 
-ResponseFuture::ResponseFuture(int requestCode, int opaque, int64_t timeoutMillis, InvokeCallback* invokeCallback)
+ResponseFuture::ResponseFuture(int requestCode,
+                               int opaque,
+                               int64_t timeoutMillis,
+                               std::unique_ptr<InvokeCallback> invokeCallback)
     : request_code_(requestCode),
       opaque_(opaque),
       timeout_millis_(timeoutMillis),
-      invoke_callback_(invokeCallback),
+      invoke_callback_(std::move(invokeCallback)),
       begin_timestamp_(UtilAll::currentTimeMillis()),
       send_request_ok_(false),
       response_command_(nullptr),
@@ -47,10 +50,6 @@ bool ResponseFuture::hasInvokeCallback() {
   return invoke_callback_ != nullptr;
 }
 
-InvokeCallback* ResponseFuture::releaseInvokeCallback() {
-  return invoke_callback_.release();
-}
-
 void ResponseFuture::executeInvokeCallback() noexcept {
   if (invoke_callback_ != nullptr) {
     invoke_callback_->operationComplete(this);
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
index 56ffc67..f950743 100755
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -17,9 +17,10 @@
 #ifndef ROCKETMQ_TRANSPORT_RESPONSEFUTURE_H_
 #define ROCKETMQ_TRANSPORT_RESPONSEFUTURE_H_
 
-#include "concurrent/latch.hpp"
+#include <memory>
 #include "InvokeCallback.h"
 #include "RemotingCommand.h"
+#include "concurrent/latch.hpp"
 
 namespace rocketmq {
 
@@ -28,13 +29,15 @@ typedef std::shared_ptr<ResponseFuture> ResponseFuturePtr;
 
 class ResponseFuture {
  public:
-  ResponseFuture(int requestCode, int opaque, int64_t timeoutMillis, InvokeCallback* invokeCallback = nullptr);
+  ResponseFuture(int requestCode,
+                 int opaque,
+                 int64_t timeoutMillis,
+                 std::unique_ptr<InvokeCallback> invokeCallback = nullptr);
   virtual ~ResponseFuture();
 
   void releaseThreadCondition();
 
   bool hasInvokeCallback();
-  InvokeCallback* releaseInvokeCallback();
 
   void executeInvokeCallback() noexcept;
 
@@ -59,6 +62,8 @@ class ResponseFuture {
   inline bool send_request_ok() const { return send_request_ok_; }
   inline void set_send_request_ok(bool sendRequestOK = true) { send_request_ok_ = sendRequestOK; };
 
+  inline std::unique_ptr<InvokeCallback>& invoke_callback() { return invoke_callback_; }
+
  private:
   int request_code_;
   int opaque_;
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index c6c222d..655a811 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -279,7 +279,7 @@ std::unique_ptr<RemotingCommand> TcpRemotingClient::invokeSyncImpl(TcpTransportP
 
 void TcpRemotingClient::invokeAsync(const std::string& addr,
                                     RemotingCommand& request,
-                                    InvokeCallback* invokeCallback,
+                                    std::unique_ptr<InvokeCallback>& invokeCallback,
                                     int64_t timeoutMillis) {
   auto beginStartTime = UtilAll::currentTimeMillis();
   auto channel = GetTransport(addr);
@@ -304,33 +304,27 @@ void TcpRemotingClient::invokeAsync(const std::string& addr,
 void TcpRemotingClient::invokeAsyncImpl(TcpTransportPtr channel,
                                         RemotingCommand& request,
                                         int64_t timeoutMillis,
-                                        InvokeCallback* invokeCallback) {
+                                        std::unique_ptr<InvokeCallback>& invokeCallback) {
   int code = request.code();
   int opaque = request.opaque();
 
   // delete in callback
-  auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis, invokeCallback);
+  auto responseFuture = std::make_shared<ResponseFuture>(code, opaque, timeoutMillis, std::move(invokeCallback));
   putResponseFuture(channel, opaque, responseFuture);
 
-  try {
-    if (SendCommand(channel, request)) {
-      responseFuture->set_send_request_ok(true);
-    } else {
-      // requestFail
-      responseFuture = popResponseFuture(channel, opaque);
-      if (responseFuture != nullptr) {
-        responseFuture->set_send_request_ok(false);
-        if (responseFuture->hasInvokeCallback()) {
-          handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture));
-        }
+  if (SendCommand(channel, request)) {
+    responseFuture->set_send_request_ok(true);
+  } else {
+    // request fail
+    responseFuture = popResponseFuture(channel, opaque);
+    if (responseFuture != nullptr) {
+      responseFuture->set_send_request_ok(false);
+      if (responseFuture->hasInvokeCallback()) {
+        handle_executor_.submit(std::bind(&ResponseFuture::executeInvokeCallback, responseFuture));
       }
-
-      LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
     }
-  } catch (const std::exception& e) {
-    LOG_WARN_NEW("send a request command to channel <{}> Exception.\n{}", channel->getPeerAddrAndPort(), e.what());
-    THROW_MQEXCEPTION(RemotingSendRequestException, "send request to <" + channel->getPeerAddrAndPort() + "> failed",
-                      -1);
+
+    LOG_WARN_NEW("send a request command to channel <{}> failed.", channel->getPeerAddrAndPort());
   }
 }
 
@@ -553,7 +547,7 @@ bool TcpRemotingClient::CloseNameServerTransport(TcpTransportPtr channel) {
   return removeItemFromTable;
 }
 
-bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) {
+bool TcpRemotingClient::SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept {
   auto package = msg.encode();
   return channel->sendMessage(package->array(), package->size());
 }
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 3ffbb66..7d86060 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -52,7 +52,7 @@ class TcpRemotingClient {
 
   void invokeAsync(const std::string& addr,
                    RemotingCommand& request,
-                   InvokeCallback* invokeCallback,
+                   std::unique_ptr<InvokeCallback>& invokeCallback,
                    int64_t timeoutMillis);
 
   void invokeOneway(const std::string& addr, RemotingCommand& request);
@@ -62,7 +62,7 @@ class TcpRemotingClient {
   std::vector<std::string> getNameServerAddressList() const { return namesrv_addr_list_; }
 
  private:
-  static bool SendCommand(TcpTransportPtr channel, RemotingCommand& msg);
+  static bool SendCommand(TcpTransportPtr channel, RemotingCommand& msg) noexcept;
 
   void channelClosed(TcpTransportPtr channel);
 
@@ -88,7 +88,7 @@ class TcpRemotingClient {
   void invokeAsyncImpl(TcpTransportPtr channel,
                        RemotingCommand& request,
                        int64_t timeoutMillis,
-                       InvokeCallback* invokeCallback);
+                       std::unique_ptr<InvokeCallback>& invokeCallback);
   void invokeOnewayImpl(TcpTransportPtr channel, RemotingCommand& request);
 
   // rpc hook

[rocketmq-client-cpp] 03/04: feat: support ipv6

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 40882dc35e8f6bc62edd2fb03864ef031e72c8b3
Author: James Yin <yw...@hotmail.com>
AuthorDate: Thu Mar 11 15:59:04 2021 +0800

    feat: support ipv6
---
 src/ClientRemotingProcessor.cpp            |   6 +-
 src/MQClientConfigImpl.hpp                 |   3 +-
 src/common/UtilAll.cpp                     |  53 +------
 src/common/UtilAll.h                       |   8 --
 src/consumer/DefaultMQPushConsumerImpl.cpp |   4 +-
 src/message/MessageBatch.cpp               |   3 +-
 src/message/MessageClientIDSetter.cpp      |  40 ++++--
 src/message/MessageClientIDSetter.h        |   2 +-
 src/message/MessageDecoder.cpp             |  42 +++---
 src/message/MessageDecoder.h               |   7 +-
 src/message/MessageExtImpl.cpp             |  26 ++--
 src/message/MessageExtImpl.h               |   4 +-
 src/message/MessageId.h                    |  16 +--
 src/transport/EventLoop.cpp                |   6 +-
 src/transport/SocketUtil.cpp               | 217 ++++++++++++++++++-----------
 src/transport/SocketUtil.h                 |  45 ++++--
 16 files changed, 258 insertions(+), 224 deletions(-)

diff --git a/src/ClientRemotingProcessor.cpp b/src/ClientRemotingProcessor.cpp
index cf97306..32366d6 100644
--- a/src/ClientRemotingProcessor.cpp
+++ b/src/ClientRemotingProcessor.cpp
@@ -16,9 +16,9 @@
  */
 #include "ClientRemotingProcessor.h"
 
-#include "MessageDecoder.h"
 #include "MQProtos.h"
 #include "MessageAccessor.hpp"
+#include "MessageDecoder.h"
 #include "MessageSysFlag.h"
 #include "RequestFutureTable.h"
 #include "SocketUtil.h"
@@ -153,11 +153,11 @@ RemotingCommand* ClientRemotingProcessor::receiveReplyMessage(RemotingCommand* r
     msg->set_store_timestamp(requestHeader->store_timestamp());
 
     if (!requestHeader->born_host().empty()) {
-      msg->set_born_host(string2SocketAddress(requestHeader->born_host()));
+      msg->set_born_host(StringToSockaddr(requestHeader->born_host()));
     }
 
     if (!requestHeader->store_host().empty()) {
-      msg->set_store_host(string2SocketAddress(requestHeader->store_host()));
+      msg->set_store_host(StringToSockaddr(requestHeader->store_host()));
     }
 
     auto body = request->body();
diff --git a/src/MQClientConfigImpl.hpp b/src/MQClientConfigImpl.hpp
index 59b4601..b9ad503 100644
--- a/src/MQClientConfigImpl.hpp
+++ b/src/MQClientConfigImpl.hpp
@@ -22,6 +22,7 @@
 
 #include "MQClientConfig.h"
 #include "NamespaceUtil.h"
+#include "SocketUtil.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -45,7 +46,7 @@ class MQClientConfigImpl : virtual public MQClientConfig {
 
   std::string buildMQClientId() const override {
     std::string clientId;
-    clientId.append(UtilAll::getLocalAddress());  // clientIP
+    clientId.append(GetLocalAddress());  // clientIP
     clientId.append("@");
     clientId.append(instance_name_);  // instanceName
     if (!unit_name_.empty()) {
diff --git a/src/common/UtilAll.cpp b/src/common/UtilAll.cpp
index 2cbe00a..731d8aa 100644
--- a/src/common/UtilAll.cpp
+++ b/src/common/UtilAll.cpp
@@ -40,9 +40,6 @@
 
 namespace rocketmq {
 
-std::string UtilAll::sLocalHostName;
-std::string UtilAll::sLocalIpAddress;
-
 bool UtilAll::try_lock_for(std::timed_mutex& mutex, long timeout) {
   auto now = std::chrono::steady_clock::now();
   auto deadline = now + std::chrono::milliseconds(timeout);
@@ -157,7 +154,7 @@ bool UtilAll::isBlank(const std::string& str) {
 }
 
 bool UtilAll::SplitURL(const std::string& serverURL, std::string& addr, short& nPort) {
-  auto pos = serverURL.find(':');
+  auto pos = serverURL.find_last_of(':');
   if (pos == std::string::npos) {
     return false;
   }
@@ -228,54 +225,6 @@ int UtilAll::Split(std::vector<std::string>& ret_, const std::string& strIn, con
   return ret_.size();
 }
 
-std::string UtilAll::getLocalHostName() {
-  if (sLocalHostName.empty()) {
-    char name[1024];
-    if (::gethostname(name, sizeof(name)) != 0) {
-      return null;
-    }
-    sLocalHostName.append(name, strlen(name));
-  }
-  return sLocalHostName;
-}
-
-std::string UtilAll::getLocalAddress() {
-  if (sLocalIpAddress.empty()) {
-    auto hostname = getLocalHostName();
-    if (!hostname.empty()) {
-      try {
-        sLocalIpAddress = socketAddress2String(lookupNameServers(hostname));
-      } catch (std::exception& e) {
-        LOG_WARN(e.what());
-        sLocalIpAddress = "127.0.0.1";
-      }
-    }
-  }
-  return sLocalIpAddress;
-}
-
-uint32_t UtilAll::getIP() {
-  std::string ip = UtilAll::getLocalAddress();
-  if (ip.empty()) {
-    return 0;
-  }
-
-  char* ip_str = new char[ip.length() + 1];
-  std::strncpy(ip_str, ip.c_str(), ip.length());
-  ip_str[ip.length()] = '\0';
-
-  int i = 3;
-  uint32_t nResult = 0;
-  for (char* token = std::strtok(ip_str, "."); token != nullptr && i >= 0; token = std::strtok(nullptr, ".")) {
-    uint32_t n = std::atoi(token);
-    nResult |= n << (8 * i--);
-  }
-
-  delete[] ip_str;
-
-  return nResult;
-}
-
 std::string UtilAll::getHomeDirectory() {
 #ifndef WIN32
   char* home_env = std::getenv("HOME");
diff --git a/src/common/UtilAll.h b/src/common/UtilAll.h
index 8ad49b0..63989f7 100644
--- a/src/common/UtilAll.h
+++ b/src/common/UtilAll.h
@@ -112,10 +112,6 @@ class UtilAll {
   static int Split(std::vector<std::string>& ret_, const std::string& strIn, const char sep);
   static int Split(std::vector<std::string>& ret_, const std::string& strIn, const std::string& sep);
 
-  static std::string getLocalHostName();
-  static std::string getLocalAddress();
-  static uint32_t getIP();
-
   static std::string getHomeDirectory();
   static void createDirectory(std::string const& dir);
   static bool existDirectory(std::string const& dir);
@@ -138,10 +134,6 @@ class UtilAll {
   // Returns true on success.
   // Returns false on failure..
   static bool ReplaceFile(const std::string& from_path, const std::string& to_path);
-
- private:
-  static std::string sLocalHostName;
-  static std::string sLocalIpAddress;
 };
 
 template <typename T>
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 11a12a1..bf00cc2 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -536,8 +536,8 @@ bool DefaultMQPushConsumerImpl::sendMessageBack(MessageExtPtr msg, int delay_lev
   try {
     msg->set_topic(NamespaceUtil::wrapNamespace(client_config_->name_space(), msg->topic()));
 
-    std::string brokerAddr = brokerName.empty() ? socketAddress2String(msg->store_host())
-                                                : client_instance_->findBrokerAddressInPublish(brokerName);
+    std::string brokerAddr =
+        brokerName.empty() ? msg->store_host_string() : client_instance_->findBrokerAddressInPublish(brokerName);
 
     client_instance_->getMQClientAPIImpl()->consumerSendMessageBack(
         brokerAddr, msg, getDefaultMQPushConsumerConfig()->group_name(), delay_level, 5000,
diff --git a/src/message/MessageBatch.cpp b/src/message/MessageBatch.cpp
index 4030fd6..2ba0c18 100644
--- a/src/message/MessageBatch.cpp
+++ b/src/message/MessageBatch.cpp
@@ -16,8 +16,9 @@
  */
 #include "MessageBatch.h"
 
-#include "MessageDecoder.h"
+#include "MQException.h"
 #include "MessageClientIDSetter.h"
+#include "MessageDecoder.h"
 
 namespace rocketmq {
 
diff --git a/src/message/MessageClientIDSetter.cpp b/src/message/MessageClientIDSetter.cpp
index dda45df..5560ba1 100644
--- a/src/message/MessageClientIDSetter.cpp
+++ b/src/message/MessageClientIDSetter.cpp
@@ -25,7 +25,8 @@
 #include <unistd.h>
 #endif
 
-#include "ByteOrder.h"
+#include "ByteBuffer.hpp"
+#include "SocketUtil.h"
 #include "UtilAll.h"
 
 namespace rocketmq {
@@ -33,16 +34,28 @@ namespace rocketmq {
 MessageClientIDSetter::MessageClientIDSetter() {
   std::srand((uint32_t)std::time(NULL));
 
-  uint32_t pid = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(UtilAll::getProcessId()));
-  uint32_t ip = ByteOrderUtil::NorminalBigEndian(UtilAll::getIP());
-  uint32_t random_num = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(std::rand()));
-
-  char bin_buf[10];
-  std::memcpy(bin_buf + 2, &pid, 4);
-  std::memcpy(bin_buf, &ip, 4);
-  std::memcpy(bin_buf + 6, &random_num, 4);
+  std::unique_ptr<ByteBuffer> buffer;
+  sockaddr* addr = GetSelfIP();
+  if (addr != nullptr) {
+    buffer.reset(ByteBuffer::allocate(SockaddrSize(addr) + 2 + 4));
+    if (addr->sa_family == AF_INET) {
+      auto* sin = (struct sockaddr_in*)addr;
+      buffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
+    } else if (addr->sa_family == AF_INET6) {
+      auto* sin6 = (struct sockaddr_in6*)addr;
+      buffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize));
+    } else {
+      (void)buffer.release();
+    }
+  }
+  if (buffer == nullptr) {
+    buffer.reset(ByteBuffer::allocate(4 + 2 + 4));
+    buffer->putInt(UtilAll::currentTimeMillis());
+  }
+  buffer->putShort(UtilAll::getProcessId());
+  buffer->putInt(std::rand());
 
-  fix_string_ = UtilAll::bytes2string(bin_buf, 10);
+  fixed_string_ = UtilAll::bytes2string(buffer->array(), buffer->position());
 
   setStartTime(UtilAll::currentTimeMillis());
 
@@ -93,11 +106,8 @@ std::string MessageClientIDSetter::createUniqueID() {
   uint32_t period = ByteOrderUtil::NorminalBigEndian(static_cast<uint32_t>(current - start_time_));
   uint16_t seqid = ByteOrderUtil::NorminalBigEndian(counter_++);
 
-  char bin_buf[6];
-  std::memcpy(bin_buf, &period, 4);
-  std::memcpy(bin_buf + 4, &seqid, 2);
-
-  return fix_string_ + UtilAll::bytes2string(bin_buf, 6);
+  return fixed_string_ + UtilAll::bytes2string(reinterpret_cast<char*>(&period), sizeof(period)) +
+         UtilAll::bytes2string(reinterpret_cast<char*>(&seqid), sizeof(seqid));
 }
 
 }  // namespace rocketmq
diff --git a/src/message/MessageClientIDSetter.h b/src/message/MessageClientIDSetter.h
index 222115e..e688786 100644
--- a/src/message/MessageClientIDSetter.h
+++ b/src/message/MessageClientIDSetter.h
@@ -68,7 +68,7 @@ class MessageClientIDSetter {
   uint64_t next_start_time_;
   std::atomic<uint16_t> counter_;
 
-  std::string fix_string_;
+  std::string fixed_string_;
 };
 
 }  // namespace rocketmq
diff --git a/src/message/MessageDecoder.cpp b/src/message/MessageDecoder.cpp
index a36d3bf..08df407 100644
--- a/src/message/MessageDecoder.cpp
+++ b/src/message/MessageDecoder.cpp
@@ -20,14 +20,18 @@
 #include <sstream>    // std::stringstream
 
 #ifndef WIN32
-#include <netinet/in.h>  // struct sockaddr, sockaddr_in, sockaddr_in6
+#include <arpa/inet.h>   // htons
+#include <netinet/in.h>  // sockaddr_in, sockaddr_in6
+#else
+#include "Winsock2.h"
 #endif
 
 #include "ByteOrder.h"
 #include "Logging.h"
-#include "MessageExtImpl.h"
 #include "MessageAccessor.hpp"
+#include "MessageExtImpl.h"
 #include "MessageSysFlag.h"
+#include "SocketUtil.h"
 #include "UtilAll.h"
 
 static const char NAME_VALUE_SEPARATOR = 1;
@@ -36,37 +40,35 @@ static const char PROPERTY_SEPARATOR = 2;
 namespace rocketmq {
 
 std::string MessageDecoder::createMessageId(const struct sockaddr* sa, int64_t offset) {
-  int msgIDLength = sa->sa_family == AF_INET ? 16 : 28;
-  std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIDLength));
+  int msgIdLength = IpaddrSize(sa) + /* port field size */ 4 + sizeof(offset);
+  std::unique_ptr<ByteBuffer> byteBuffer(ByteBuffer::allocate(msgIdLength));
   if (sa->sa_family == AF_INET) {
     struct sockaddr_in* sin = (struct sockaddr_in*)sa;
-    byteBuffer->put(ByteArray((char*)&sin->sin_addr, 4));
-    byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin->sin_port));
+    byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin->sin_addr), kIPv4AddrSize));
+    byteBuffer->putInt(ntohs(sin->sin_port));
   } else {
     struct sockaddr_in6* sin6 = (struct sockaddr_in6*)sa;
-    byteBuffer->put(ByteArray((char*)&sin6->sin6_addr, 16));
-    byteBuffer->putInt(ByteOrderUtil::NorminalBigEndian(sin6->sin6_port));
+    byteBuffer->put(ByteArray(reinterpret_cast<char*>(&sin6->sin6_addr), kIPv6AddrSize));
+    byteBuffer->putInt(ntohs(sin6->sin6_port));
   }
   byteBuffer->putLong(offset);
   byteBuffer->flip();
-  return UtilAll::bytes2string(byteBuffer->array(), msgIDLength);
+  return UtilAll::bytes2string(byteBuffer->array(), msgIdLength);
 }
 
 MessageId MessageDecoder::decodeMessageId(const std::string& msgId) {
-  size_t ip_length = msgId.length() == 32 ? 4 * 2 : 16 * 2;
+  size_t ip_length = msgId.length() == 32 ? kIPv4AddrSize * 2 : kIPv6AddrSize * 2;
 
   ByteArray byteArray(ip_length / 2);
   std::string ip = msgId.substr(0, ip_length);
   UtilAll::string2bytes(byteArray.array(), ip);
 
   std::string port = msgId.substr(ip_length, 8);
-  // uint32_t portInt = ByteOrderUtil::NorminalBigEndian<uint32_t>(std::stoul(port, nullptr, 16));
   uint32_t portInt = std::stoul(port, nullptr, 16);
 
-  auto* sin = ipPort2SocketAddress(byteArray, portInt);
+  auto* sin = IPPortToSockaddr(byteArray, portInt);
 
   std::string offset = msgId.substr(ip_length + 8);
-  // uint64_t offsetInt = ByteOrderUtil::NorminalBigEndian<uint64_t>(std::stoull(offset, nullptr, 16));
   uint64_t offsetInt = std::stoull(offset, nullptr, 16);
 
   return MessageId(sin, offsetInt);
@@ -123,22 +125,22 @@ MessageExtPtr MessageDecoder::decode(ByteBuffer& byteBuffer, bool readBody, bool
   msgExt->set_born_timestamp(bornTimeStamp);
 
   // 10 BORNHOST
-  int bornhostIPLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? 4 : 16;
-  ByteArray bornHost(bornhostIPLength);
-  byteBuffer.get(bornHost, 0, bornhostIPLength);
+  int bornHostLength = (sysFlag & MessageSysFlag::BORNHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+  ByteArray bornHost(bornHostLength);
+  byteBuffer.get(bornHost, 0, bornHostLength);
   int32_t bornPort = byteBuffer.getInt();
-  msgExt->set_born_host(ipPort2SocketAddress(bornHost, bornPort));
+  msgExt->set_born_host(IPPortToSockaddr(bornHost, bornPort));
 
   // 11 STORETIMESTAMP
   int64_t storeTimestamp = byteBuffer.getLong();
   msgExt->set_store_timestamp(storeTimestamp);
 
   // 12 STOREHOST
-  int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? 4 : 16;
-  ByteArray storeHost(bornhostIPLength);
+  int storehostIPLength = (sysFlag & MessageSysFlag::STOREHOST_V6_FLAG) == 0 ? kIPv4AddrSize : kIPv6AddrSize;
+  ByteArray storeHost(storehostIPLength);  // sized from the STOREHOST flag; born and store hosts may differ in family
   byteBuffer.get(storeHost, 0, storehostIPLength);
   int32_t storePort = byteBuffer.getInt();
-  msgExt->set_store_host(ipPort2SocketAddress(storeHost, storePort));
+  msgExt->set_store_host(IPPortToSockaddr(storeHost, storePort));
 
   // 13 RECONSUMETIMES
   int32_t reconsumeTimes = byteBuffer.getInt();
diff --git a/src/message/MessageDecoder.h b/src/message/MessageDecoder.h
index ad0b82d..5c45d8e 100644
--- a/src/message/MessageDecoder.h
+++ b/src/message/MessageDecoder.h
@@ -17,8 +17,13 @@
 #ifndef ROCKETMQ_MESSAGE_MESSAGEDECODER_H_
 #define ROCKETMQ_MESSAGE_MESSAGEDECODER_H_
 
+#ifndef WIN32
+#include <sys/socket.h>  // sockaddr
+#else
+#include <Winsock2.h>
+#endif
+
 #include "ByteBuffer.hpp"
-#include "MQException.h"
 #include "MQMessageExt.h"
 #include "MessageId.h"
 
diff --git a/src/message/MessageExtImpl.cpp b/src/message/MessageExtImpl.cpp
index f43ac35..cfd6b99 100644
--- a/src/message/MessageExtImpl.cpp
+++ b/src/message/MessageExtImpl.cpp
@@ -44,20 +44,14 @@ MessageExtImpl::MessageExtImpl(int queueId,
       commit_log_offset_(0),
       sys_flag_(0),
       born_timestamp_(bornTimestamp),
-      born_host_(nullptr),
+      born_host_(SockaddrToStorage(bornHost)),
       store_timestamp_(storeTimestamp),
-      store_host_(nullptr),
+      store_host_(SockaddrToStorage(storeHost)),
       reconsume_times_(3),
       prepared_transaction_offset_(0),
-      msg_id_(msgId) {
-  born_host_ = copySocketAddress(born_host_, bornHost);
-  store_host_ = copySocketAddress(store_host_, storeHost);
-}
+      msg_id_(msgId) {}
 
-MessageExtImpl::~MessageExtImpl() {
-  free(born_host_);
-  free(store_host_);
-}
+MessageExtImpl::~MessageExtImpl() = default;
 
 TopicFilterType MessageExtImpl::parseTopicFilterType(int32_t sysFlag) {
   if ((sysFlag & MessageSysFlag::MULTI_TAGS_FLAG) == MessageSysFlag::MULTI_TAGS_FLAG) {
@@ -123,15 +117,15 @@ void MessageExtImpl::set_born_timestamp(int64_t bornTimestamp) {
 }
 
 const struct sockaddr* MessageExtImpl::born_host() const {
-  return born_host_;
+  return reinterpret_cast<sockaddr*>(born_host_.get());
 }
 
 std::string MessageExtImpl::born_host_string() const {
-  return socketAddress2String(born_host_);
+  return SockaddrToString(born_host());
 }
 
 void MessageExtImpl::set_born_host(const struct sockaddr* bornHost) {
-  born_host_ = copySocketAddress(born_host_, bornHost);
+  born_host_ = SockaddrToStorage(bornHost);
 }
 
 int64_t MessageExtImpl::store_timestamp() const {
@@ -143,15 +137,15 @@ void MessageExtImpl::set_store_timestamp(int64_t storeTimestamp) {
 }
 
 const struct sockaddr* MessageExtImpl::store_host() const {
-  return store_host_;
+  return reinterpret_cast<sockaddr*>(store_host_.get());
 }
 
 std::string MessageExtImpl::store_host_string() const {
-  return socketAddress2String(store_host_);
+  return SockaddrToString(store_host());
 }
 
 void MessageExtImpl::set_store_host(const struct sockaddr* storeHost) {
-  store_host_ = copySocketAddress(store_host_, storeHost);
+  store_host_ = SockaddrToStorage(storeHost);
 }
 
 const std::string& MessageExtImpl::msg_id() const {
diff --git a/src/message/MessageExtImpl.h b/src/message/MessageExtImpl.h
index 5d85cea..fc44471 100644
--- a/src/message/MessageExtImpl.h
+++ b/src/message/MessageExtImpl.h
@@ -94,9 +94,9 @@ class MessageExtImpl : public MessageImpl,        // base
   int64_t commit_log_offset_;
   int32_t sys_flag_;
   int64_t born_timestamp_;
-  struct sockaddr* born_host_;
+  std::unique_ptr<sockaddr_storage> born_host_;
   int64_t store_timestamp_;
-  struct sockaddr* store_host_;
+  std::unique_ptr<sockaddr_storage> store_host_;
   int32_t reconsume_times_;
   int64_t prepared_transaction_offset_;
   std::string msg_id_;
diff --git a/src/message/MessageId.h b/src/message/MessageId.h
index 64e4603..e9a501b 100644
--- a/src/message/MessageId.h
+++ b/src/message/MessageId.h
@@ -27,29 +27,29 @@ namespace rocketmq {
 class MessageId {
  public:
   MessageId() : MessageId(nullptr, 0) {}
-  MessageId(struct sockaddr* address, int64_t offset) : address_(nullptr), offset_(offset) { setAddress(address); }
+  MessageId(const struct sockaddr* address, int64_t offset) : address_(SockaddrToStorage(address)), offset_(offset) {}
 
-  MessageId(const MessageId& other) : MessageId(other.address_, other.offset_) {}
-  MessageId(MessageId&& other) : address_(other.address_), offset_(other.offset_) { other.address_ = nullptr; }
+  MessageId(const MessageId& other) : MessageId(other.getAddress(), other.offset_) {}
+  MessageId(MessageId&& other) : address_(std::move(other.address_)), offset_(other.offset_) {}
 
-  virtual ~MessageId() { std::free(address_); }
+  virtual ~MessageId() = default;
 
   MessageId& operator=(const MessageId& other) {
     if (&other != this) {
-      setAddress(other.address_);
+      setAddress(other.getAddress());
       this->offset_ = other.offset_;
     }
     return *this;
   }
 
-  const struct sockaddr* getAddress() const { return address_; }
-  void setAddress(struct sockaddr* address) { address_ = copySocketAddress(address_, address); }
+  const struct sockaddr* getAddress() const { return reinterpret_cast<sockaddr*>(address_.get()); }
+  void setAddress(const struct sockaddr* address) { address_ = SockaddrToStorage(address); }
 
   int64_t getOffset() const { return offset_; }
   void setOffset(int64_t offset) { offset_ = offset; }
 
  private:
-  struct sockaddr* address_;
+  std::unique_ptr<sockaddr_storage> address_;
   int64_t offset_;
 };
 
diff --git a/src/transport/EventLoop.cpp b/src/transport/EventLoop.cpp
index 85ea113..5dce2d1 100644
--- a/src/transport/EventLoop.cpp
+++ b/src/transport/EventLoop.cpp
@@ -207,9 +207,9 @@ int BufferEvent::connect(const std::string& addr) {
   }
 
   try {
-    auto* sa = string2SocketAddress(addr);  // resolve domain
-    peer_addr_port_ = socketAddress2String(sa);
-    return bufferevent_socket_connect(buffer_event_, sa, sockaddr_size(sa));
+    auto* sa = StringToSockaddr(addr);  // resolve domain
+    peer_addr_port_ = SockaddrToString(sa);
+    return bufferevent_socket_connect(buffer_event_, sa, SockaddrSize(sa));
   } catch (const std::exception& e) {
     LOG_ERROR_NEW("can not connect to {}, {}", addr, e.what());
     return -1;
diff --git a/src/transport/SocketUtil.cpp b/src/transport/SocketUtil.cpp
index 0470088..f207263 100644
--- a/src/transport/SocketUtil.cpp
+++ b/src/transport/SocketUtil.cpp
@@ -16,73 +16,115 @@
  */
 #include "SocketUtil.h"
 
-#include <cstdlib>  // std::realloc
-#include <cstring>  // std::memset, std::memcpy
+#include <cstdlib>  // std::abort
+#include <cstring>  // std::memcpy, std::memset
 
-#include <sstream>
-#include <stdexcept>
-
-#include <event2/event.h>
+#include <iostream>
+#include <stdexcept>  // std::invalid_argument, std::runtime_error
+#include <string>
 
 #ifndef WIN32
-#include <netdb.h>
-#include <unistd.h>
+#include <arpa/inet.h>  // htons
+#include <unistd.h>     // gethostname
+#else
+#include <Winsock2.h>
 #endif
 
-#include "ByteOrder.h"
+#include <event2/event.h>
+
 #include "MQException.h"
-#include "UtilAll.h"
 
 namespace rocketmq {
 
-union sockaddr_union {
-  struct sockaddr_in sin;
-  struct sockaddr_in6 sin6;
-};
+std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src) {
+  if (src == nullptr) {
+    return nullptr;
+  }
+  std::unique_ptr<sockaddr_storage> ss(new sockaddr_storage);
+  std::memcpy(ss.get(), src, SockaddrSize(src));
+  return ss;
+}
 
-thread_local static sockaddr_union sin_buf;
+thread_local sockaddr_storage ss_buffer;
 
-struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port) {
-  if (ip.size() == 4) {
-    struct sockaddr_in* sin = &sin_buf.sin;
+sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port) {  // ip: raw 4- or 16-byte address; port: host byte order
+  sockaddr_storage* ss = &ss_buffer;  // thread-local scratch also written by LookupNameServers; copy the result to keep it
+  if (ip.size() == kIPv4AddrSize) {
+    auto* sin = reinterpret_cast<sockaddr_in*>(ss);
     sin->sin_family = AF_INET;
-    sin->sin_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port);
-    ByteOrderUtil::Read<decltype(sin->sin_addr)>(&sin->sin_addr, ip.array());
-    return (struct sockaddr*)sin;
-  } else if (ip.size() == 16) {
-    struct sockaddr_in6* sin6 = &sin_buf.sin6;
+    sin->sin_port = htons(port);
+    std::memcpy(&sin->sin_addr, ip.array(), kIPv4AddrSize);
+  } else if (ip.size() == kIPv6AddrSize) {
+    auto* sin6 = reinterpret_cast<sockaddr_in6*>(ss);  // cast the storage itself, not the address of the local pointer
     sin6->sin6_family = AF_INET6;
-    sin6->sin6_port = ByteOrderUtil::NorminalBigEndian<uint16_t>(port);
-    ByteOrderUtil::Read<decltype(sin6->sin6_addr)>(&sin6->sin6_addr, ip.array());
-    return (struct sockaddr*)sin6;
+    sin6->sin6_port = htons(port);
+    std::memcpy(&sin6->sin6_addr, ip.array(), kIPv6AddrSize);
+  } else {
+    throw std::invalid_argument("invalid ip size");
   }
-  return nullptr;
+  return reinterpret_cast<sockaddr*>(ss);
 }
 
-struct sockaddr* string2SocketAddress(const std::string& addr) {
+sockaddr* StringToSockaddr(const std::string& addr) {  // accepts "[/]host[:port]" or "[/][ipv6][:port]"; throws on malformed input
+  if (addr.empty()) {
+    throw std::invalid_argument("invalid address");
+  }
+
   std::string::size_type start_pos = addr[0] == '/' ? 1 : 0;
-  auto colon_pos = addr.find_last_of(":");
-  std::string host = addr.substr(start_pos, colon_pos - start_pos);
-  std::string port = addr.substr(colon_pos + 1, addr.length() - colon_pos);
-  auto* sa = lookupNameServers(host);
-  if (sa != nullptr) {
-    if (sa->sa_family == AF_INET) {
-      auto* sin = (struct sockaddr_in*)sa;
-      sin->sin_port = htons((uint16_t)std::stoi(port));
-    } else {
-      auto* sin6 = (struct sockaddr_in6*)sa;
-      sin6->sin6_port = htons((uint16_t)std::stoi(port));
+  auto colon_pos = addr.find_last_of(':');
+  auto bracket_pos = addr.find_last_of(']');
+  if (bracket_pos != std::string::npos) {
+    // bracketed IPv6 literal: the host must start with '['
+    if (addr.at(start_pos) != '[') {
+      throw std::invalid_argument("invalid address");
+    }
+    if (colon_pos == std::string::npos) {
+      throw std::invalid_argument("invalid address");
     }
+    if (colon_pos < bracket_pos) {
+      // last ':' is inside the brackets, so no port was given
+      if (bracket_pos != addr.size() - 1) {
+        throw std::invalid_argument("invalid address");
+      }
+      colon_pos = addr.size();
+    } else if (colon_pos != bracket_pos + 1) {
+      throw std::invalid_argument("invalid address");
+    }
+  } else if (colon_pos == std::string::npos) {
+    // no ':' at all, so no port was given
+    colon_pos = addr.size();
   }
+
+  decltype(bracket_pos) fix_bracket = bracket_pos == std::string::npos ? 0 : 1;
+  std::string host = addr.substr(start_pos + fix_bracket, colon_pos - start_pos - fix_bracket * 2);
+  auto* sa = LookupNameServers(host);
+
+  std::string port = colon_pos >= addr.size() ? "0" : addr.substr(colon_pos + 1);
+  unsigned long n = std::stoul(port);  // keep stoul's full width: narrowing to 32 bits first lets 2^32+k pass as port k
+  if (n > std::numeric_limits<uint16_t>::max()) {
+    throw std::out_of_range("port is too large");
+  }
+  uint16_t port_num = htons(static_cast<uint16_t>(n));
+
+  if (sa->sa_family == AF_INET) {
+    auto* sin = reinterpret_cast<sockaddr_in*>(sa);
+    sin->sin_port = port_num;
+  } else if (sa->sa_family == AF_INET6) {
+    auto* sin6 = reinterpret_cast<sockaddr_in6*>(sa);
+    sin6->sin6_port = port_num;
+  } else {
+    throw std::runtime_error("don't support non-inet address families");
+  }
+
   return sa;
 }
 
 /**
- * converts an address from network format to presentation format (a.b.c.d)
+ * converts an address from network format to presentation format
  */
-std::string socketAddress2String(const struct sockaddr* addr) {
+std::string SockaddrToString(const sockaddr* addr) {
   if (nullptr == addr) {
-    return "127.0.0.1";
+    return std::string();
   }
 
   char buf[128];
@@ -90,38 +132,40 @@ std::string socketAddress2String(const struct sockaddr* addr) {
   uint16_t port = 0;
 
   if (addr->sa_family == AF_INET) {
-    auto* sin = (struct sockaddr_in*)addr;
-    if (nullptr != evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) {
-      address = buf;
+    const auto* sin = reinterpret_cast<const sockaddr_in*>(addr);
+    if (nullptr == evutil_inet_ntop(AF_INET, &sin->sin_addr, buf, sizeof(buf))) {
+      throw std::runtime_error("can not convert AF_INET address to text form");
     }
+    address = buf;
     port = ntohs(sin->sin_port);
   } else if (addr->sa_family == AF_INET6) {
-    auto* sin6 = (struct sockaddr_in6*)addr;
-    if (nullptr != evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf))) {
-      address = buf;
+    const auto* sin6 = reinterpret_cast<const sockaddr_in6*>(addr);
+    if (nullptr == evutil_inet_ntop(AF_INET6, &sin6->sin6_addr, buf, sizeof(buf))) {
+      throw std::runtime_error("can not convert AF_INET6 address to text form");
     }
+    address = buf;
     port = ntohs(sin6->sin6_port);
   } else {
-    throw std::runtime_error("don't support non-inet Address families.");
+    throw std::runtime_error("don't support non-inet address families");
   }
 
-  if (!address.empty() && port != 0) {
-    if (addr->sa_family == AF_INET6) {
-      address = "[" + address + "]";
-    }
-    address += ":" + UtilAll::to_string(port);
+  if (addr->sa_family == AF_INET6) {
+    address = "[" + address + "]";
+  }
+  if (port != 0) {
+    address += ":" + std::to_string(port);
   }
 
   return address;
 }
 
-struct sockaddr* lookupNameServers(const std::string& hostname) {
+sockaddr* LookupNameServers(const std::string& hostname) {
   if (hostname.empty()) {
-    return nullptr;
+    throw std::invalid_argument("invalid hostname");
   }
 
-  struct evutil_addrinfo hints;
-  struct evutil_addrinfo* answer = NULL;
+  evutil_addrinfo hints;
+  evutil_addrinfo* answer = nullptr;
 
   /* Build the hints to tell getaddrinfo how to act. */
   std::memset(&hints, 0, sizeof(hints));
@@ -131,48 +175,63 @@ struct sockaddr* lookupNameServers(const std::string& hostname) {
   hints.ai_flags = EVUTIL_AI_ADDRCONFIG; /* Only return addresses we can use. */
 
   // Look up the hostname.
-  int err = evutil_getaddrinfo(hostname.c_str(), NULL, &hints, &answer);
+  int err = evutil_getaddrinfo(hostname.c_str(), nullptr, &hints, &answer);
   if (err != 0) {
-    std::string info = "Failed to resolve host name(" + hostname + "): " + evutil_gai_strerror(err);
+    std::string info = "Failed to resolve hostname(" + hostname + "): " + evutil_gai_strerror(err);
     THROW_MQEXCEPTION(UnknownHostException, info, -1);
   }
 
-  struct sockaddr* sin = nullptr;
+  sockaddr_storage* ss = &ss_buffer;
 
-  for (struct evutil_addrinfo* ai = answer; ai != NULL; ai = ai->ai_next) {
+  bool hit = false;
+  for (struct evutil_addrinfo* ai = answer; ai != nullptr; ai = ai->ai_next) {
     auto* ai_addr = ai->ai_addr;
     if (ai_addr->sa_family != AF_INET && ai_addr->sa_family != AF_INET6) {
       continue;
     }
-    sin = (struct sockaddr*)&sin_buf;
-    std::memcpy(sin, ai_addr, sockaddr_size(ai_addr));
+    std::memcpy(ss, ai_addr, SockaddrSize(ai_addr));
+    hit = true;
     break;
   }
 
   evutil_freeaddrinfo(answer);
 
-  return sin;
+  if (!hit) {
+    throw std::runtime_error("hostname is non-inet address family");
+  }
+
+  return reinterpret_cast<sockaddr*>(ss);
 }
 
-struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src) {
-  if (src != nullptr) {
-    if (dst == nullptr || dst->sa_family != src->sa_family) {
-      dst = (struct sockaddr*)std::realloc(dst, sizeof(union sockaddr_union));
-    }
-    std::memcpy(dst, src, sockaddr_size(src));
-  } else {
-    free(dst);
-    dst = nullptr;
+sockaddr* GetSelfIP() {
+  try {
+    return LookupNameServers(GetLocalHostname());
+  } catch (const UnknownHostException& e) {
+    return LookupNameServers("localhost");
   }
-  return dst;
 }
 
-uint64_t h2nll(uint64_t v) {
-  return ByteOrderUtil::NorminalBigEndian(v);
+const std::string& GetLocalHostname() {
+  static std::string local_hostname = []() {
+    char name[1024];
+    if (::gethostname(name, sizeof(name)) != 0) {
+      return std::string();
+    }
+    return std::string(name);
+  }();
+  return local_hostname;
 }
 
-uint64_t n2hll(uint64_t v) {
-  return ByteOrderUtil::NorminalBigEndian(v);
+const std::string& GetLocalAddress() {
+  static std::string local_address = []() {
+    try {
+      return SockaddrToString(GetSelfIP());
+    } catch (std::exception& e) {
+      std::cerr << e.what() << std::endl;
+      std::abort();
+    }
+  }();
+  return local_address;
 }
 
 }  // namespace rocketmq
diff --git a/src/transport/SocketUtil.h b/src/transport/SocketUtil.h
index be18a38..6bfe535 100644
--- a/src/transport/SocketUtil.h
+++ b/src/transport/SocketUtil.h
@@ -17,13 +17,14 @@
 #ifndef ROCKETMQ_TRANSPORT_SOCKETUTIL_H_
 #define ROCKETMQ_TRANSPORT_SOCKETUTIL_H_
 
-#include <cstdint>
+#include <cstddef>  // size_t
+#include <cstdint>  // uint16_t
 
 #include <string>
 
 #ifndef WIN32
-#include <arpa/inet.h>
-#include <sys/socket.h>
+#include <netinet/in.h>  // sockaddr_in, AF_INET, sockaddr_in6, AF_INET6
+#include <sys/socket.h>  // sockaddr, sockaddr_storage
 #else
 #include <Winsock2.h>
 #pragma comment(lib, "ws2_32.lib")
@@ -33,21 +34,41 @@
 
 namespace rocketmq {
 
-static inline size_t sockaddr_size(const struct sockaddr* sa) {
-  return sa->sa_family == AF_INET ? sizeof(struct sockaddr_in) : sizeof(struct sockaddr_in6);
+const size_t kIPv4AddrSize = 4;
+const size_t kIPv6AddrSize = 16;
+
+static inline size_t IpaddrSize(const sockaddr* sa) {
+  assert(sa != nullptr);
+  assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+  return sa->sa_family == AF_INET6 ? kIPv6AddrSize : kIPv4AddrSize;
+}
+
+static inline size_t IpaddrSize(const sockaddr_storage* ss) {
+  return IpaddrSize(reinterpret_cast<const sockaddr*>(ss));
+}
+
+static inline size_t SockaddrSize(const sockaddr* sa) {
+  assert(sa != nullptr);
+  assert(sa->sa_family == AF_INET || sa->sa_family == AF_INET6);
+  return sa->sa_family == AF_INET6 ? sizeof(sockaddr_in6) : sizeof(sockaddr_in);
+}
+
+static inline size_t SockaddrSize(const sockaddr_storage* ss) {
+  return SockaddrSize(reinterpret_cast<const sockaddr*>(ss));
 }
 
-struct sockaddr* ipPort2SocketAddress(const ByteArray& ip, uint16_t port);
+std::unique_ptr<sockaddr_storage> SockaddrToStorage(const sockaddr* src);
 
-struct sockaddr* string2SocketAddress(const std::string& addr);
-std::string socketAddress2String(const struct sockaddr* addr);
+sockaddr* IPPortToSockaddr(const ByteArray& ip, uint16_t port);
 
-struct sockaddr* lookupNameServers(const std::string& hostname);
+sockaddr* StringToSockaddr(const std::string& addr);
+std::string SockaddrToString(const sockaddr* addr);
 
-struct sockaddr* copySocketAddress(struct sockaddr* dst, const struct sockaddr* src);
+sockaddr* LookupNameServers(const std::string& hostname);
 
-uint64_t h2nll(uint64_t v);
-uint64_t n2hll(uint64_t v);
+sockaddr* GetSelfIP();
+const std::string& GetLocalHostname();
+const std::string& GetLocalAddress();
 
 }  // namespace rocketmq
 

[rocketmq-client-cpp] 04/04: fix: test

Posted by if...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch re_dev
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git

commit 44b846ed14ab6833c5625449ea54685c6a32acce
Author: James Yin <yw...@hotmail.com>
AuthorDate: Wed Mar 24 16:47:24 2021 +0800

    fix: test
---
 test/CMakeLists.txt                                |  7 +++----
 test/src/message/MQMessageExtTest.cpp              | 14 ++++++--------
 test/src/message/MessageDecoderTest.cpp            | 22 ++++++++++++----------
 test/src/message/MessageIdTest.cpp                 | 13 +++++++------
 test/src/protocol/ConsumerRunningInfoTest.cpp      |  4 ++--
 test/src/protocol/ProcessQueueInfoTest.cpp         |  3 ++-
 test/src/transport/ClientRemotingProcessorTest.cpp | 11 ++++++-----
 test/src/transport/ResponseFutureTest.cpp          |  7 ++++---
 test/src/transport/SocketUtilTest.cpp              | 10 +++++-----
 9 files changed, 47 insertions(+), 44 deletions(-)

diff --git a/test/CMakeLists.txt b/test/CMakeLists.txt
index 41cff82..d6e56eb 100755
--- a/test/CMakeLists.txt
+++ b/test/CMakeLists.txt
@@ -22,9 +22,8 @@ set(EXECUTABLE_OUTPUT_PATH ${PROJECT_SOURCE_DIR}/bin)
 set(CMAKE_PREFIX_PATH "${CMAKE_PREFIX_PATH};${CMAKE_SOURCE_DIR}/bin/lib64/cmake;${CMAKE_SOURCE_DIR}/bin/lib/cmake")
 
 # Find dependencies
-if (CMAKE_VERSION VERSION_LESS "3.0")
-    find_package(GTest 1.10.0 REQUIRED CONFIG)
-else ()
+#find_package(GTest REQUIRED CONFIG)
+if (NOT GTest_FOUND)
     include_directories("${CMAKE_SOURCE_DIR}/bin/include")
 
     if (EXISTS "${CMAKE_SOURCE_DIR}/bin/lib64/libgtest.a")
@@ -55,7 +54,7 @@ function(config_test file)
         endif()
     endif()
 
-    if (CMAKE_VERSION VERSION_LESS "3.0")
+    if (GTest_FOUND)
         if (BUILD_ROCKETMQ_SHARED)
             target_link_libraries(${basename} rocketmq_shared
                 GTest::gtest GTest::gtest_main GTest::gmock GTest::gmock_main)
diff --git a/test/src/message/MQMessageExtTest.cpp b/test/src/message/MQMessageExtTest.cpp
index b889aa8..702233c 100644
--- a/test/src/message/MQMessageExtTest.cpp
+++ b/test/src/message/MQMessageExtTest.cpp
@@ -89,18 +89,19 @@ TEST(MessageExtTest, MessageClientExtImpl) {
   messageClientExt.set_store_timestamp(2222);
   EXPECT_EQ(messageClientExt.store_timestamp(), 2222);
 
-  messageClientExt.set_born_host(rocketmq::string2SocketAddress("127.0.0.1:10091"));
+  messageClientExt.set_born_host(rocketmq::StringToSockaddr("127.0.0.1:10091"));
   EXPECT_EQ(messageClientExt.born_host_string(), "127.0.0.1:10091");
 
-  messageClientExt.set_store_host(rocketmq::string2SocketAddress("127.0.0.2:10092"));
+  messageClientExt.set_store_host(rocketmq::StringToSockaddr("127.0.0.2:10092"));
   EXPECT_EQ(messageClientExt.store_host_string(), "127.0.0.2:10092");
 }
 
 TEST(MessageExtTest, MessageExt) {
-  struct sockaddr* bronHost = rocketmq::copySocketAddress(nullptr, rocketmq::string2SocketAddress("127.0.0.1:10091"));
-  struct sockaddr* storeHost = rocketmq::copySocketAddress(nullptr, rocketmq::string2SocketAddress("127.0.0.2:10092"));
+  auto bronHost = rocketmq::SockaddrToStorage(rocketmq::StringToSockaddr("127.0.0.1:10091"));
+  auto storeHost = rocketmq::SockaddrToStorage(rocketmq::StringToSockaddr("127.0.0.2:10092"));
 
-  MQMessageExt messageExt(2, 1024, bronHost, 2048, storeHost, "msgId");
+  MQMessageExt messageExt(2, 1024, reinterpret_cast<sockaddr*>(bronHost.get()), 2048,
+                          reinterpret_cast<sockaddr*>(storeHost.get()), "msgId");
   EXPECT_EQ(messageExt.queue_offset(), 0);
   EXPECT_EQ(messageExt.commit_log_offset(), 0);
   EXPECT_EQ(messageExt.born_timestamp(), 1024);
@@ -113,9 +114,6 @@ TEST(MessageExtTest, MessageExt) {
   EXPECT_EQ(messageExt.msg_id(), "msgId");
   EXPECT_EQ(messageExt.born_host_string(), "127.0.0.1:10091");
   EXPECT_EQ(messageExt.store_host_string(), "127.0.0.2:10092");
-
-  free(bronHost);
-  free(storeHost);
 }
 
 TEST(MessageExtTest, ParseTopicFilterType) {
diff --git a/test/src/message/MessageDecoderTest.cpp b/test/src/message/MessageDecoderTest.cpp
index d602d80..74e55ce 100644
--- a/test/src/message/MessageDecoderTest.cpp
+++ b/test/src/message/MessageDecoderTest.cpp
@@ -14,16 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-#include <gmock/gmock.h>
-#include <gtest/gtest.h>
+#include "MessageDecoder.h"
 
 #include <string>
 #include <vector>
 
+#include <arpa/inet.h>
+
+#include <gmock/gmock.h>
+#include <gtest/gtest.h>
+
 #include "ByteArray.h"
 #include "ByteBuffer.hpp"
-#include "MessageDecoder.h"
 #include "MQMessage.h"
 #include "MQMessageConst.h"
 #include "MQMessageExt.h"
@@ -39,12 +41,12 @@ using testing::Return;
 
 using rocketmq::ByteArray;
 using rocketmq::ByteBuffer;
-using rocketmq::MessageSysFlag;
 using rocketmq::MessageDecoder;
+using rocketmq::MessageId;
+using rocketmq::MessageSysFlag;
 using rocketmq::MQMessage;
 using rocketmq::MQMessageConst;
 using rocketmq::MQMessageExt;
-using rocketmq::MessageId;
 using rocketmq::RemotingCommand;
 using rocketmq::SendMessageRequestHeader;
 using rocketmq::stoba;
@@ -52,13 +54,13 @@ using rocketmq::UtilAll;
 
 // TODO
 TEST(MessageDecoderTest, MessageId) {
-  std::string strMsgId = MessageDecoder::createMessageId(rocketmq::string2SocketAddress("127.0.0.1:10091"), 1024LL);
+  std::string strMsgId = MessageDecoder::createMessageId(rocketmq::StringToSockaddr("127.0.0.1:10091"), 1024LL);
   EXPECT_EQ(strMsgId, "7F0000010000276B0000000000000400");
 
   MessageId msgId = MessageDecoder::decodeMessageId(strMsgId);
   EXPECT_EQ(msgId.getOffset(), 1024LL);
 
-  std::string strMsgId2 = MessageDecoder::createMessageId(rocketmq::string2SocketAddress("/172.16.2.114:0"), 123456LL);
+  std::string strMsgId2 = MessageDecoder::createMessageId(rocketmq::StringToSockaddr("/172.16.2.114:0"), 123456LL);
   EXPECT_EQ(strMsgId2, "AC10027200000000000000000001E240");
 
   MessageId msgId2 = MessageDecoder::decodeMessageId(strMsgId2);
@@ -107,7 +109,7 @@ TEST(MessageDecoderTest, Decode) {
   // 10 BORNHOST  56=48+4+4
   byteBuffer->putInt(ntohl(inet_addr("127.0.0.1")));
   byteBuffer->putInt(10091);
-  msgExt.set_born_host(rocketmq::string2SocketAddress("127.0.0.1:10091"));
+  msgExt.set_born_host(rocketmq::StringToSockaddr("127.0.0.1:10091"));
 
   // 11 STORETIMESTAMP  64=56+8
   byteBuffer->putLong(4096LL);
@@ -116,7 +118,7 @@ TEST(MessageDecoderTest, Decode) {
   // 12 STOREHOST  72=64+4+4
   byteBuffer->putInt(ntohl(inet_addr("127.0.0.2")));
   byteBuffer->putInt(10092);
-  msgExt.set_store_host(rocketmq::string2SocketAddress("127.0.0.2:10092"));
+  msgExt.set_store_host(rocketmq::StringToSockaddr("127.0.0.2:10092"));
 
   // 13 RECONSUMETIMES 76=72+4
   byteBuffer->putInt(18);
diff --git a/test/src/message/MessageIdTest.cpp b/test/src/message/MessageIdTest.cpp
index aa09006..19efbf4 100644
--- a/test/src/message/MessageIdTest.cpp
+++ b/test/src/message/MessageIdTest.cpp
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "MessageId.h"
+#include "SocketUtil.h"
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "MessageId.h"
-
 using namespace std;
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
@@ -27,12 +28,12 @@ using testing::Return;
 using rocketmq::MessageId;
 
 TEST(MessageIdTest, MessageId) {
-  MessageId msgId(rocketmq::string2SocketAddress("127.0.0.1:10091"), 1024);
-  EXPECT_EQ(rocketmq::socketAddress2String(msgId.getAddress()), "127.0.0.1:10091");
+  MessageId msgId(rocketmq::StringToSockaddr("127.0.0.1:10091"), 1024);
+  EXPECT_EQ(rocketmq::SockaddrToString(msgId.getAddress()), "127.0.0.1:10091");
   EXPECT_EQ(msgId.getOffset(), 1024);
 
-  msgId.setAddress(rocketmq::string2SocketAddress("127.0.0.2:10092"));
-  EXPECT_EQ(rocketmq::socketAddress2String(msgId.getAddress()), "127.0.0.2:10092");
+  msgId.setAddress(rocketmq::StringToSockaddr("127.0.0.2:10092"));
+  EXPECT_EQ(rocketmq::SockaddrToString(msgId.getAddress()), "127.0.0.2:10092");
 
   msgId.setOffset(2048);
   EXPECT_EQ(msgId.getOffset(), 2048);
diff --git a/test/src/protocol/ConsumerRunningInfoTest.cpp b/test/src/protocol/ConsumerRunningInfoTest.cpp
index 7c3dc8f..74c21aa 100644
--- a/test/src/protocol/ConsumerRunningInfoTest.cpp
+++ b/test/src/protocol/ConsumerRunningInfoTest.cpp
@@ -14,6 +14,8 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "protocol/body/ConsumerRunningInfo.h"
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <json/reader.h>
@@ -23,8 +25,6 @@
 #include <map>
 #include <string>
 
-#include "ConsumerRunningInfo.h"
-
 using std::map;
 using std::string;
 
diff --git a/test/src/protocol/ProcessQueueInfoTest.cpp b/test/src/protocol/ProcessQueueInfoTest.cpp
index cfd1a56..dcb544a 100644
--- a/test/src/protocol/ProcessQueueInfoTest.cpp
+++ b/test/src/protocol/ProcessQueueInfoTest.cpp
@@ -14,11 +14,12 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "protocol/body/ProcessQueueInfo.hpp"
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "MQMessageExt.h"
-#include "ProcessQueueInfo.h"
 
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
diff --git a/test/src/transport/ClientRemotingProcessorTest.cpp b/test/src/transport/ClientRemotingProcessorTest.cpp
index 1a840d9..e751a14 100644
--- a/test/src/transport/ClientRemotingProcessorTest.cpp
+++ b/test/src/transport/ClientRemotingProcessorTest.cpp
@@ -14,18 +14,18 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "ClientRemotingProcessor.h"
+
+#include <map>
+#include <memory>
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 #include <json/value.h>
 #include <json/writer.h>
 
-#include <map>
-#include <memory>
-
 #include "ByteArray.h"
 #include "ClientRPCHook.h"
-#include "ClientRemotingProcessor.h"
-#include "ConsumerRunningInfo.h"
 #include "MQClientConfigImpl.hpp"
 #include "MQClientInstance.h"
 #include "MQMessageQueue.h"
@@ -34,6 +34,7 @@
 #include "SessionCredentials.h"
 #include "TcpTransport.h"
 #include "UtilAll.h"
+#include "protocol/body/ConsumerRunningInfo.h"
 #include "protocol/body/ResetOffsetBody.hpp"
 #include "protocol/header/CommandHeader.h"
 
diff --git a/test/src/transport/ResponseFutureTest.cpp b/test/src/transport/ResponseFutureTest.cpp
index 9826c82..15680d9 100644
--- a/test/src/transport/ResponseFutureTest.cpp
+++ b/test/src/transport/ResponseFutureTest.cpp
@@ -14,12 +14,13 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "ResponseFuture.h"
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
 #include "InvokeCallback.h"
 #include "RemotingCommand.h"
-#include "ResponseFuture.h"
 #include "UtilAll.h"
 #include "protocol/RequestCode.h"
 
@@ -48,8 +49,8 @@ TEST(ResponseFutureTest, Init) {
   EXPECT_FALSE(responseFuture.hasInvokeCallback());
 
   // ~ResponseFuture delete callback
-  auto* callback = new MockInvokeCallback();
-  ResponseFuture twoResponseFuture(MQRequestCode::QUERY_BROKER_OFFSET, 4, 1000, callback);
+  ResponseFuture twoResponseFuture(MQRequestCode::QUERY_BROKER_OFFSET, 4, 1000,
+                                   std::unique_ptr<InvokeCallback>(new MockInvokeCallback()));
   EXPECT_TRUE(twoResponseFuture.hasInvokeCallback());
 }
 
diff --git a/test/src/transport/SocketUtilTest.cpp b/test/src/transport/SocketUtilTest.cpp
index e7b8a70..985d826 100644
--- a/test/src/transport/SocketUtilTest.cpp
+++ b/test/src/transport/SocketUtilTest.cpp
@@ -14,11 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
+#include "SocketUtil.h"
+
 #include <gmock/gmock.h>
 #include <gtest/gtest.h>
 
-#include "SocketUtil.h"
-
 using testing::InitGoogleMock;
 using testing::InitGoogleTest;
 using testing::Return;
@@ -29,14 +29,14 @@ using namespace rocketmq;
 
 TEST(SocketUtilTest, Convert) {
   char ip[] = {0x7F, 0x00, 0x00, 0x01};
-  struct sockaddr* sa = ipPort2SocketAddress(ByteArray(ip, sizeof(ip)), 0x276B);
+  struct sockaddr* sa = IPPortToSockaddr(ByteArray(ip, sizeof(ip)), 0x276B);
   struct sockaddr_in* sin = (struct sockaddr_in*)sa;
   EXPECT_EQ(sin->sin_addr.s_addr, 0x0100007F);
   EXPECT_EQ(sin->sin_port, 0x6B27);
 
-  EXPECT_EQ(socketAddress2String(sa), "127.0.0.1:10091");
+  EXPECT_EQ(SockaddrToString(sa), "127.0.0.1:10091");
 
-  sa = string2SocketAddress("127.0.0.1:10091");
+  sa = StringToSockaddr("127.0.0.1:10091");
   sin = (struct sockaddr_in*)sa;
   EXPECT_EQ(sin->sin_addr.s_addr, 0x0100007F);
   EXPECT_EQ(sin->sin_port, 0x6B27);