You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/01/13 16:08:25 UTC

[GitHub] stcai commented on a change in pull request #60: feature: only one event loop for all TcpTransport.

stcai commented on a change in pull request #60: feature: only one event loop for all TcpTransport.
URL: https://github.com/apache/rocketmq-client-cpp/pull/60#discussion_r247352089
 
 

 ##########
 File path: src/transport/TcpRemotingClient.cpp
 ##########
 @@ -1,758 +1,666 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "TcpRemotingClient.h"
-#include <stddef.h>
-#if !defined(WIN32) && !defined(__APPLE__)
-#include <sys/prctl.h>
-#endif
-#include "Logging.h"
-#include "MemoryOutputStream.h"
-#include "TopAddressing.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-//<!************************************************************************
-TcpRemotingClient::TcpRemotingClient(int pullThreadNum,
-                                     uint64_t tcpConnectTimeout,
-                                     uint64_t tcpTransportTryLockTimeout)
-    : m_pullThreadNum(pullThreadNum),
-      m_tcpConnectTimeout(tcpConnectTimeout),
-      m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
-      m_namesrvIndex(0),
-      m_ioServiceWork(m_ioService) {
-#if !defined(WIN32) && !defined(__APPLE__)  
-  string taskName = UtilAll::getProcessName();
-  prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
-#endif
-  for (int i = 0; i != pullThreadNum; ++i) {
-    m_threadpool.create_thread(
-        boost::bind(&boost::asio::io_service::run, &m_ioService));
-  }
-#if !defined(WIN32) && !defined(__APPLE__)
-  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
-#endif
-  LOG_INFO(
-      "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
-      "m_pullThreadNum:%d",
-      m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
-  m_async_service_thread.reset(new boost::thread(
-      boost::bind(&TcpRemotingClient::boost_asio_work, this)));
-}
-
-void TcpRemotingClient::boost_asio_work() {
-  LOG_INFO("TcpRemotingClient::boost asio async service runing");
-  boost::asio::io_service::work work(m_async_ioService);  // avoid async io
-                                                          // service stops after
-                                                          // first timer timeout
-                                                          // callback
-  m_async_ioService.run();
-}
-
-TcpRemotingClient::~TcpRemotingClient() {
-  m_tcpTable.clear();
-  m_futureTable.clear();
-  m_asyncFutureTable.clear();
-  m_namesrvAddrList.clear();
-  removeAllTimerCallback();
-}
-
-void TcpRemotingClient::stopAllTcpTransportThread() {
-  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
-  m_async_ioService.stop();
-  m_async_service_thread->interrupt();
-  m_async_service_thread->join();
-  removeAllTimerCallback();
-
-  {
-    TcpMap::iterator it = m_tcpTable.begin();
-    for (; it != m_tcpTable.end(); ++it) {
-      it->second->disconnect(it->first);
-    }
-    m_tcpTable.clear();
-  }
-
-  m_ioService.stop();
-  m_threadpool.join_all();
-
-  {
-    boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-    for (ResMap::iterator it = m_futureTable.begin(); it != m_futureTable.end();
-         ++it) {
-      if (it->second) it->second->releaseThreadCondition();
-    }
-  }
-  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
-}
-
-void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
-  if (!addrs.empty()) {
-    boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
-                                                boost::try_to_lock);
-    if (!lock.owns_lock()) {
-      if (!lock.timed_lock(boost::get_system_time() +
-                           boost::posix_time::seconds(10))) {
-        LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
-        return;
-      }
-    }
-    // clear first;
-    m_namesrvAddrList.clear();
-
-    vector<string> out;
-    UtilAll::Split(out, addrs, ";");
-    for (size_t i = 0; i < out.size(); i++) {
-      string addr = out[i];
-      UtilAll::Trim(addr);
-
-      string hostName;
-      short portNumber;
-      if (UtilAll::SplitURL(addr, hostName, portNumber)) {
-        LOG_INFO("update Namesrv:%s", addr.c_str());
-        m_namesrvAddrList.push_back(addr);
-      }
-    }
-    out.clear();
-  }
-}
-
-bool TcpRemotingClient::invokeHeartBeat(const string& addr,
-                                        RemotingCommand& request) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    int code = request.getCode();
-    int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, 3000, false, NULL));
-    addResponseFuture(opaque, responseFuture);
-    // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
-    // timeoutms:%d", addr.c_str(), code, opaque, 3000);
-
-    if (SendCommand(pTcp, request)) {
-      responseFuture->setSendRequestOK(true);
-      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
-      if (pRsp == NULL) {
-        LOG_ERROR(
-            "wait response timeout of heartbeat, so closeTransport of addr:%s",
-            addr.c_str());
-        CloseTransport(addr, pTcp);
-        return false;
-      } else if (pRsp->getCode() == SUCCESS_VALUE) {
-        return true;
-      } else {
-        LOG_WARN("get error response:%d of heartbeat to addr:%s",
-                 pRsp->getCode(), addr.c_str());
-        return false;
-      }
-    } else {
-      CloseTransport(addr, pTcp);
-    }
-  }
-  return false;
-}
-
-RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
-                                               RemotingCommand& request,
-                                               int timeoutMillis /* = 3000 */) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    int code = request.getCode();
-    int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
-    addResponseFuture(opaque, responseFuture);
-
-    if (SendCommand(pTcp, request)) {
-      // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
-      // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
-      responseFuture->setSendRequestOK(true);
-      RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
-      if (pRsp == NULL) {
-        if (code != GET_CONSUMER_LIST_BY_GROUP) {
-          LOG_WARN(
-              "wait response timeout or get NULL response of code:%d, so "
-              "closeTransport of addr:%s",
-              code, addr.c_str());
-          CloseTransport(addr, pTcp);
-        }
-        // avoid responseFuture leak;
-        findAndDeleteResponseFuture(opaque);
-        return NULL;
-      } else {
-        return pRsp;
-      }
-    } else {
-      // avoid responseFuture leak;
-      findAndDeleteResponseFuture(opaque);
-      CloseTransport(addr, pTcp);
-    }
-  }
-  return NULL;
-}
-
-bool TcpRemotingClient::invokeAsync(const string& addr,
-                                    RemotingCommand& request,
-                                    AsyncCallbackWrap* cbw,
-                                    int64 timeoutMilliseconds,
-                                    int maxRetrySendTimes,
-                                    int retrySendTimes) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    //<!not delete, for callback to delete;
-    int code = request.getCode();
-    int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
-	responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
-	responseFuture->setRetrySendTimes(retrySendTimes);
-	responseFuture->setBrokerAddr(addr);
-	responseFuture->setRequestCommand(request);	
-    addAsyncResponseFuture(opaque, responseFuture);
-    if (cbw) {
-      boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
-          m_async_ioService,
-          boost::posix_time::milliseconds(timeoutMilliseconds));
-      addTimerCallback(t, opaque);
-      boost::system::error_code e;
-      t->async_wait(
-          boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout,
-                      this, e, opaque));
-    }
-
-    if (SendCommand(pTcp, request))  // Even if send failed, asyncTimerThread
-                                     // will trigger next pull request or report
-                                     // send msg failed
-    {
-      LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d",
-                addr.c_str(), code, opaque);
-      responseFuture->setSendRequestOK(true);
-    }
-    return true;
-  }
-  LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
-  return false;
-}
-
-void TcpRemotingClient::invokeOneway(const string& addr,
-                                     RemotingCommand& request) {
-  //<!not need callback;
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    request.markOnewayRPC();
-    LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(),
-              request.getCode());
-    SendCommand(pTcp, request);
-  }
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(
-    const string& addr, bool needRespons) {
-  if (addr.empty()) return CreateNameserverTransport(needRespons);
-
-  return CreateTransport(addr, needRespons);
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(
-    const string& addr, bool needRespons) {
-  boost::shared_ptr<TcpTransport> tts;
-  {
-    // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
-    // long
-    // time, if could not get m_tcpLock, return NULL
-    bool bGetMutex = false;
-    boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
-    if (!lock.owns_lock()) {
-      if (!lock.timed_lock(
-              boost::get_system_time() +
-              boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-        LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
-        boost::shared_ptr<TcpTransport> pTcp;
-        return pTcp;
-      } else {
-        bGetMutex = true;
-      }
-    } else {
-      bGetMutex = true;
-    }
-    if (bGetMutex) {
-      if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-        boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
-        boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
-        if (tcp) {
-          tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
-          if (connectStatus == e_connectWaitResponse) {
-            boost::shared_ptr<TcpTransport> pTcp;
-            return pTcp;
-          } else if (connectStatus == e_connectFail) {
-            LOG_ERROR("tcpTransport with server disconnected, erase server:%s",
-                      addr.c_str());
-            tcp->disconnect(
-                addr);  // avoid coredump when connection with broker was broken
-            m_tcpTable.erase(addr);
-          } else if (connectStatus == e_connectSuccess) {
-            return tcp;
-          } else {
-            LOG_ERROR(
-                "go to fault state, erase:%s from tcpMap, and reconnect "
-                "it",
-                addr.c_str());
-            m_tcpTable.erase(addr);
-          }
-        }
-      }
-
-      //<!callback;
-      READ_CALLBACK callback =
-          needRespons ? &TcpRemotingClient::static_messageReceived : NULL;
-
-      tts.reset(new TcpTransport(this, callback));
-      tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
-      if (connectStatus != e_connectWaitResponse) {
-        LOG_WARN("can not connect to :%s", addr.c_str());
-        tts->disconnect(addr);
-        boost::shared_ptr<TcpTransport> pTcp;
-        return pTcp;
-      } else {
-        m_tcpTable[addr] = tts;  // even if connecting failed finally, this
-                                 // server transport will be erased by next
-                                 // CreateTransport
-      }
-    } else {
-      LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
-      boost::shared_ptr<TcpTransport> pTcp;
-      return pTcp;
-    }
-  }
-
-  tcpConnectStatus connectStatus =
-      tts->waitTcpConnectEvent(m_tcpConnectTimeout);
-  if (connectStatus != e_connectSuccess) {
-    LOG_WARN("can not connect to server:%s", addr.c_str());
-    tts->disconnect(addr);
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  } else {
-    LOG_INFO("connect server with addr:%s success", addr.c_str());
-    return tts;
-  }
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport(
-    bool needRespons) {
-  // m_namesrvLock was added to avoid operation of nameServer was blocked by
-  // m_tcpLock, it was used by single Thread mostly, so no performance impact
-  // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long
-  // time, if could not get m_namesrvlock, return NULL
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
-                                              boost::try_to_lock);
-  if (!lock.owns_lock()) {
-    if (!lock.timed_lock(
-            boost::get_system_time() +
-            boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
-      boost::shared_ptr<TcpTransport> pTcp;
-      return pTcp;
-    } else {
-      bGetMutex = true;
-    }
-  } else {
-    bGetMutex = true;
-  }
-
-  if (bGetMutex) {
-    if (!m_namesrvAddrChoosed.empty()) {
-      boost::shared_ptr<TcpTransport> pTcp =
-          GetTransport(m_namesrvAddrChoosed, true);
-      if (pTcp)
-        return pTcp;
-      else
-        m_namesrvAddrChoosed.clear();
-    }
-
-    vector<string>::iterator itp = m_namesrvAddrList.begin();
-    for (; itp != m_namesrvAddrList.end(); ++itp) {
-      unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
-      if (m_namesrvIndex == numeric_limits<unsigned int>::max())
-        m_namesrvIndex = 0;
-      m_namesrvIndex++;
-      LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT
-               "",
-               m_namesrvIndex, index, m_namesrvAddrList.size());
-      boost::shared_ptr<TcpTransport> pTcp =
-          GetTransport(m_namesrvAddrList[index], true);
-      if (pTcp) {
-        m_namesrvAddrChoosed = m_namesrvAddrList[index];
-        return pTcp;
-      }
-    }
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  } else {
-    LOG_WARN("get nameServer tcpTransport mutex failed");
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  }
-}
-
-void TcpRemotingClient::CloseTransport(const string& addr,
-                                       boost::shared_ptr<TcpTransport> pTcp) {
-  if (addr.empty()) {
-    return CloseNameServerTransport(pTcp);
-  }
-
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
-  if (!lock.owns_lock()) {
-    if (!lock.timed_lock(
-            boost::get_system_time() +
-            boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
-      return;
-    } else {
-      bGetMutex = true;
-    }
-  } else {
-    bGetMutex = true;
-  }
-  LOG_ERROR("CloseTransport of:%s", addr.c_str());
-  if (bGetMutex) {
-    bool removeItemFromTable = true;
-    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
-        LOG_INFO(
-            "tcpTransport with addr:%s has been closed before, and has been "
-            "created again, nothing to do",
-            addr.c_str());
-        removeItemFromTable = false;
-      }
-    } else {
-      LOG_INFO(
-          "tcpTransport with addr:%s had been removed from tcpTable before",
-          addr.c_str());
-      removeItemFromTable = false;
-    }
-
-    if (removeItemFromTable == true) {
-      LOG_WARN("closeTransport: disconnect broker:%s with state:%d",
-               addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus());
-      if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
-        m_tcpTable[addr]->disconnect(
-            addr);  // avoid coredump when connection with server was broken
-      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
-      m_tcpTable.erase(addr);
-    }
-  } else {
-    LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
-    return;
-  }
-  LOG_ERROR("CloseTransport of:%s end", addr.c_str());
-}
-
-void TcpRemotingClient::CloseNameServerTransport(
-    boost::shared_ptr<TcpTransport> pTcp) {
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
-                                              boost::try_to_lock);
-  if (!lock.owns_lock()) {
-    if (!lock.timed_lock(
-            boost::get_system_time() +
-            boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
-      return;
-    } else {
-      bGetMutex = true;
-    }
-  } else {
-    bGetMutex = true;
-  }
-  if (bGetMutex) {
-    string addr = m_namesrvAddrChoosed;
-    bool removeItemFromTable = true;
-    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
-        LOG_INFO(
-            "tcpTransport with addr:%s has been closed before, and has been "
-            "created again, nothing to do",
-            addr.c_str());
-        removeItemFromTable = false;
-      }
-    } else {
-      LOG_INFO(
-          "tcpTransport with addr:%s had been removed from tcpTable before",
-          addr.c_str());
-      removeItemFromTable = false;
-    }
-
-    if (removeItemFromTable == true) {
-      m_tcpTable[addr]->disconnect(
-          addr);  // avoid coredump when connection with server was broken
-      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
-      m_tcpTable.erase(addr);
-      m_namesrvAddrChoosed.clear();
-    }
-  } else {
-    LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s",
-             m_namesrvAddrChoosed.c_str());
-    return;
-  }
-}
-
-bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts,
-                                    RemotingCommand& msg) {
-  const MemoryBlock* phead = msg.GetHead();
-  const MemoryBlock* pbody = msg.GetBody();
-
-  unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
-  if (phead->getData()) {
-    result->write(phead->getData(), phead->getSize());
-  }
-  if (pbody->getData()) {
-    result->write(pbody->getData(), pbody->getSize());
-  }
-  const char* pData = static_cast<const char*>(result->getData());
-  int len = result->getDataSize();
-  return pTts->sendMessage(pData, len);
-}
-
-void TcpRemotingClient::static_messageReceived(void* context,
-                                               const MemoryBlock& mem,
-                                               const string& addr) {
-  TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
-  if (pTcpRemotingClient) pTcpRemotingClient->messageReceived(mem, addr);
-}
-
-void TcpRemotingClient::messageReceived(const MemoryBlock& mem,
-                                        const string& addr) {
-  m_ioService.post(
-      boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
-}
-
-void TcpRemotingClient::ProcessData(const MemoryBlock& mem,
-                                    const string& addr) {
-  RemotingCommand* pRespondCmd = NULL;
-  try {
-    pRespondCmd = RemotingCommand::Decode(mem);
-  } catch (...) {
-    LOG_ERROR("processData_error");
-    return;
-  }
-
-  int opaque = pRespondCmd->getOpaque();
-
-  //<!process self;
-  if (pRespondCmd->isResponseType()) {
-    boost::shared_ptr<ResponseFuture> pFuture(
-        findAndDeleteAsyncResponseFuture(opaque));
-    if (!pFuture) {
-      pFuture = findAndDeleteResponseFuture(opaque);
-      if (pFuture) {
-        if (pFuture->getSyncResponseFlag()) {
-          LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
-          deleteAndZero(pRespondCmd);
-          return;
-        }
-        LOG_DEBUG("find_response opaque:%d", opaque);
-      } else {
-        LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque);
-        deleteAndZero(pRespondCmd);
-        return;
-      }
-    }
-    processResponseCommand(pRespondCmd, pFuture);
-  } else {
-    processRequestCommand(pRespondCmd, addr);
-  }
-}
-
-void TcpRemotingClient::processResponseCommand(
-    RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
-  int code = pfuture->getRequestCode();
-  int opaque = pCmd->getOpaque();
-  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque, pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
-  pCmd->SetExtHeader(code);  // set head , for response use
-
-  pfuture->setResponse(pCmd);
-
-  if (pfuture->getASyncFlag()) {
-    if (!pfuture->getAsyncResponseFlag()) {
-      pfuture->setAsyncResponseFlag();
-      pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
-      cancelTimerCallback(opaque);
-      pfuture->executeInvokeCallback();	  
-    }
-  }
-}
-
-void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd,
-                                              const string& addr) {
-  unique_ptr<RemotingCommand> pRequestCommand(pCmd);
-  int requestCode = pRequestCommand->getCode();
-  if (m_requestTable.find(requestCode) == m_requestTable.end()) {
-    LOG_ERROR("can_not_find request:%d processor", requestCode);
-  } else {
-    unique_ptr<RemotingCommand> pResponse(
-        m_requestTable[requestCode]->processRequest(addr,
-                                                    pRequestCommand.get()));
-    if (!pRequestCommand->isOnewayRPC()) {
-      if (pResponse) {
-        pResponse->setOpaque(pRequestCommand->getOpaque());
-        pResponse->markResponseType();
-        pResponse->Encode();
-
-        invokeOneway(addr, *pResponse);
-      }
-    }
-  }
-}
-
-void TcpRemotingClient::addResponseFuture(
-    int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
-  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-  m_futureTable[opaque] = pfuture;
-}
-
-// Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will
-// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture>
-TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-  boost::shared_ptr<ResponseFuture> pResponseFuture;
-  if (m_futureTable.find(opaque) != m_futureTable.end()) {
-    pResponseFuture = m_futureTable[opaque];
-    m_futureTable.erase(opaque);
-  }
-  return pResponseFuture;
-}
-
-void TcpRemotingClient::handleAsyncPullForResponseTimeout(
-    const boost::system::error_code& e, int opaque) {
-  if (e == boost::asio::error::operation_aborted) {
-    LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
-    return;
-  }
-
-  LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
-  boost::shared_ptr<ResponseFuture> pFuture(
-      findAndDeleteAsyncResponseFuture(opaque));
-  if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) {
-    if ((pFuture->getAsyncResponseFlag() !=
-         true))  // if no response received, then check timeout or not
-    {
-      LOG_ERROR("no response got for opaque:%d", opaque);
-      pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
-      pFuture->executeInvokeCallbackException();
-    }
-  }
-
-  eraseTimerCallback(opaque);
-}
-
-void TcpRemotingClient::addAsyncResponseFuture(
-    int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
-  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
-  m_asyncFutureTable[opaque] = pfuture;
-}
-
-// Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] will
-// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture>
-TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
-  boost::shared_ptr<ResponseFuture> pResponseFuture;
-  if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
-    pResponseFuture = m_asyncFutureTable[opaque];
-    m_asyncFutureTable.erase(opaque);
-  }
-
-  return pResponseFuture;
-}
-
-void TcpRemotingClient::registerProcessor(
-    MQRequestCode requestCode,
-    ClientRemotingProcessor* clientRemotingProcessor) {
-  if (m_requestTable.find(requestCode) != m_requestTable.end())
-    m_requestTable.erase(requestCode);
-  m_requestTable[requestCode] = clientRemotingProcessor;
-}
-
-void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
-                                         int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
-    boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
-    old_t->cancel();
-    delete old_t;
-    old_t = NULL;
-    m_async_timer_map.erase(opaque);
-  }
-  m_async_timer_map[opaque] = t;
-}
-
-void TcpRemotingClient::eraseTimerCallback(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
-    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
-    delete t;
-    t = NULL;
-    m_async_timer_map.erase(opaque);
-  }
-}
-
-void TcpRemotingClient::cancelTimerCallback(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);    
-    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
-    t->cancel();
-    delete t;
-    t = NULL;
-    m_async_timer_map.erase(opaque);
-  }
-}
-
-void TcpRemotingClient::removeAllTimerCallback() {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  for (asyncTimerMap::iterator it = m_async_timer_map.begin();
-       it != m_async_timer_map.end(); ++it) {
-    boost::asio::deadline_timer* t = it->second;
-    t->cancel();
-    delete t;
-    t = NULL;
-  }
-  m_async_timer_map.clear();
-}
-
-void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
-  //delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it later
-  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
-  if (!pFuture) {
-    pFuture = findAndDeleteResponseFuture(opaque);
-    if (pFuture) {
-      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
-    }
-  } else {
-    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); 
-  }
-  //delete the timeout timer for opaque for pullrequest
-  cancelTimerCallback(opaque);
-}
-
-//<!************************************************************************
-}  //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TcpRemotingClient.h"
+#include <stddef.h>
+#if !defined(WIN32) && !defined(__APPLE__)
+#include <sys/prctl.h>
+#endif
+
+#include "Logging.h"
+#include "MemoryOutputStream.h"
+#include "TopAddressing.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+//<!************************************************************************
+TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
+    : m_pullThreadNum(pullThreadNum),
+      m_tcpConnectTimeout(tcpConnectTimeout),
+      m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
+      m_namesrvIndex(0),
+      m_ioServiceWork(m_ioService) {
+
+#if !defined(WIN32) && !defined(__APPLE__)
+  string taskName = UtilAll::getProcessName();
+  prctl(PR_SET_NAME, "NetworkTP", 0, 0, 0);
+#endif
+  for (int i = 0; i != pullThreadNum; ++i) {
+    m_threadpool.create_thread(
+        boost::bind(&boost::asio::io_service::run, &m_ioService));
+  }
+#if !defined(WIN32) && !defined(__APPLE__)
+  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+#endif
+
+  LOG_INFO("m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, m_pullThreadNum:%d",
+           m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
+
+  m_async_service_thread.reset(new boost::thread(
+      boost::bind(&TcpRemotingClient::boost_asio_work, this)));
+}
+
+void TcpRemotingClient::boost_asio_work() {
+  LOG_INFO("TcpRemotingClient::boost asio async service running");
+
+#if !defined(WIN32) && !defined(__APPLE__)
+  prctl(PR_SET_NAME, "RemotingAsioT", 0, 0, 0);
+#endif
+
+  // avoid async io service stops after first timer timeout callback
+  boost::asio::io_service::work work(m_async_ioService);
+
+  m_async_ioService.run();
+}
+
+TcpRemotingClient::~TcpRemotingClient() {
+  m_tcpTable.clear();
+  m_futureTable.clear();
+  m_asyncFutureTable.clear();
+  m_namesrvAddrList.clear();
+  removeAllTimerCallback();
+}
+
+void TcpRemotingClient::stopAllTcpTransportThread() {
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
+
+  m_async_ioService.stop();
+  m_async_service_thread->interrupt();
+  m_async_service_thread->join();
+  removeAllTimerCallback();
+
+  {
+    for (const auto &trans : m_tcpTable) {
+      trans.second->disconnect(trans.first);
+    }
+    m_tcpTable.clear();
+  }
+
+  m_ioService.stop();
+  m_threadpool.join_all();
+
+  {
+    boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
+    for (const auto &future : m_futureTable) {
+      if (future.second)
+        future.second->releaseThreadCondition();
+    }
+  }
+
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
+}
+
+void TcpRemotingClient::updateNameServerAddressList(const string &addrs) {
+  if (addrs.empty()) return;
+
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) {
+      LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
+      return;
+    }
+  }
+
+  // clear first;
+  m_namesrvAddrList.clear();
+
+  vector<string> out;
+  UtilAll::Split(out, addrs, ";");
+  for (auto addr : out) {
+    UtilAll::Trim(addr);
+
+    string hostName;
+    short portNumber;
+    if (UtilAll::SplitURL(addr, hostName, portNumber)) {
+      LOG_INFO("update Namesrv:%s", addr.c_str());
+      m_namesrvAddrList.push_back(addr);
+    }
+  }
+  out.clear();
+}
+
+bool TcpRemotingClient::invokeHeartBeat(const string &addr, RemotingCommand &request) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, opaque, this, 3000, false, nullptr));
+    addResponseFuture(opaque, responseFuture);
+    // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
+    // timeoutms:%d", addr.c_str(), code, opaque, 3000);
+
+    if (SendCommand(pTcp, request)) {
+      responseFuture->setSendRequestOK(true);
+      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
+      if (pRsp == nullptr) {
+        LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str());
+        CloseTransport(addr, pTcp);
+        return false;
+      } else if (pRsp->getCode() == SUCCESS_VALUE) {
+        return true;
+      } else {
+        LOG_WARN("get error response:%d of heartbeat to addr:%s", pRsp->getCode(), addr.c_str());
+        return false;
+      }
+    } else {
+      CloseTransport(addr, pTcp);
+    }
+  }
+  return false;
+}
+
+RemotingCommand *TcpRemotingClient::invokeSync(const string &addr, RemotingCommand &request, int timeoutMillis) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMillis, false, nullptr));
+    addResponseFuture(opaque, responseFuture);
+
+    if (SendCommand(pTcp, request)) {
+      // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
+      // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
+      responseFuture->setSendRequestOK(true);
+      RemotingCommand *pRsp = responseFuture->waitResponse(timeoutMillis);
+      if (pRsp == nullptr) {
+        if (code != GET_CONSUMER_LIST_BY_GROUP) {
+          LOG_WARN("wait response timeout or get NULL response of code:%d, so closeTransport of addr:%s",
+                   code, addr.c_str());
+          CloseTransport(addr, pTcp);
+        }
+        // avoid responseFuture leak;
+        findAndDeleteResponseFuture(opaque);
+        return nullptr;
+      } else {
+        return pRsp;
+      }
+    } else {
+      // avoid responseFuture leak;
+      findAndDeleteResponseFuture(opaque);
+      CloseTransport(addr, pTcp);
+    }
+  }
+  return nullptr;
+}
+
+bool TcpRemotingClient::invokeAsync(const string &addr,
+                                    RemotingCommand &request,
+                                    AsyncCallbackWrap *cbw,
+                                    int64 timeoutMilliseconds,
+                                    int maxRetrySendTimes,
+                                    int retrySendTimes) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
+    //<!not delete, for callback to delete;
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
+    responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
+    responseFuture->setRetrySendTimes(retrySendTimes);
+    responseFuture->setBrokerAddr(addr);
+    responseFuture->setRequestCommand(request);
+    addAsyncResponseFuture(opaque, responseFuture);
+    if (cbw) {
+      boost::asio::deadline_timer *t = new boost::asio::deadline_timer(
+          m_async_ioService, boost::posix_time::milliseconds(timeoutMilliseconds));
+      addTimerCallback(t, opaque);
+      boost::system::error_code e;
+      t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, this, e, opaque));
+    }
+
+    if (SendCommand(pTcp, request)) { // Even if send failed, asyncTimerThread
+                                      // will trigger next pull request or report
+                                      // send msg failed
+      LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque);
+      responseFuture->setSendRequestOK(true);
+    }
+    return true;
+  }
+  LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
+  return false;
+}
+
+void TcpRemotingClient::invokeOneway(const string &addr, RemotingCommand &request) {
+  //<!not need callback;
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != nullptr) {
+    request.markOnewayRPC();
+    LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), request.getCode());
+    SendCommand(pTcp, request);
+  }
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string &addr, bool needResponse) {
+  if (addr.empty())
+    return CreateNameServerTransport(needResponse);
+
+  return CreateTransport(addr, needResponse);
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string &addr, bool needResponse) {
+  boost::shared_ptr<TcpTransport> tts;
+
+  {
+    // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
+    // long time, if could not get m_tcpLock, return NULL
+    boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+    if (!lock.owns_lock()) {
+      if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+        LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
+        boost::shared_ptr<TcpTransport> pTcp;
+        return pTcp;
+      }
+    }
+
+    // check for reuse
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
+      boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
+
+      if (tcp) {
+        tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
+        if (connectStatus == e_connectSuccess) {
+          return tcp;
+        } else if (connectStatus == e_connectWaitResponse) {
+          boost::shared_ptr<TcpTransport> pTcp;
+          return pTcp;
+        } else if (connectStatus == e_connectFail) {
+          LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
+          tcp->disconnect(addr);  // avoid coredump when connection with broker was broken
+          m_tcpTable.erase(addr);
+        } else {
+          LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
+          m_tcpTable.erase(addr);
+        }
+      }
+    }
+
+    //<!callback;
+    READ_CALLBACK callback = needResponse ? &TcpRemotingClient::static_messageReceived : nullptr;
+
+    tts = TcpTransport::CreateTransport(this, callback);
+    tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
+    if (connectStatus != e_connectWaitResponse) {
+      LOG_WARN("can not connect to :%s", addr.c_str());
+      tts->disconnect(addr);
+      boost::shared_ptr<TcpTransport> pTcp;
+      return pTcp;
+    } else {
+      // even if connecting failed finally, this server transport will be erased by next CreateTransport
+      m_tcpTable[addr] = tts;
+    }
+  }
+
+  tcpConnectStatus connectStatus = tts->waitTcpConnectEvent(m_tcpConnectTimeout);
+  if (connectStatus != e_connectSuccess) {
+    LOG_WARN("can not connect to server:%s", addr.c_str());
+    tts->disconnect(addr);
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  } else {
+    LOG_INFO("connect server with addr:%s success", addr.c_str());
+    return tts;
+  }
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameServerTransport(bool needResponse) {
+  // m_namesrvLock was added to avoid operation of nameServer was blocked by
+  // m_tcpLock, it was used by single Thread mostly, so no performance impact
+  // try get m_tcpLock until m_tcpTransportTryLockTimeout to avoid blocking long
+  // time, if could not get m_namesrvlock, return NULL
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
+      boost::shared_ptr<TcpTransport> pTcp;
+      return pTcp;
+    }
+  }
+
+  if (!m_namesrvAddrChoosed.empty()) {
+    boost::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrChoosed, true);
+    if (pTcp)
+      return pTcp;
+    else
+      m_namesrvAddrChoosed.clear();
+  }
+
+  for (int i = 0; i < m_namesrvAddrList.size(); i++) {
+    unsigned int index = m_namesrvIndex++ % m_namesrvAddrList.size();
+    LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "",
+             m_namesrvIndex, index, m_namesrvAddrList.size());
+    boost::shared_ptr<TcpTransport> pTcp = CreateTransport(m_namesrvAddrList[index], true);
+    if (pTcp) {
+      m_namesrvAddrChoosed = m_namesrvAddrList[index];
+      return pTcp;
+    }
+  }
+
+  boost::shared_ptr<TcpTransport> pTcp;
+  return pTcp;
+}
+
+void TcpRemotingClient::CloseTransport(const string &addr, boost::shared_ptr<TcpTransport> pTcp) {
+  if (addr.empty()) {
+    return CloseNameServerTransport(pTcp);
+  }
+
+  boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
+      return;
+    }
+  }
+
+  LOG_ERROR("CloseTransport of:%s", addr.c_str());
+
+  bool removeItemFromTable = true;
+  if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+    if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+      LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do",
+               addr.c_str());
+      removeItemFromTable = false;
+    }
+  } else {
+    LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
+    removeItemFromTable = false;
+  }
+
+  if (removeItemFromTable) {
+    LOG_WARN("closeTransport: disconnect broker:%s with state:%d",
+             addr.c_str(), m_tcpTable[addr]->getTcpConnectStatus());
+    if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
+      m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection with server was broken
+    LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+    m_tcpTable.erase(addr);
+  }
+
+  LOG_ERROR("CloseTransport of:%s end", addr.c_str());
+}
+
+void TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp) {
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameServerTransport get timed_mutex timeout");
+      return;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+
+  if (bGetMutex) {
+    string addr = m_namesrvAddrChoosed;
+    bool removeItemFromTable = true;
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+        LOG_INFO("tcpTransport with addr:%s has been closed before, and has been created again, nothing to do",
+                 addr.c_str());
+        removeItemFromTable = false;
+      }
+    } else {
+      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
+      removeItemFromTable = false;
+    }
+
+    if (removeItemFromTable) {
+      m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection with server was broken
+      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+      m_tcpTable.erase(addr);
+      m_namesrvAddrChoosed.clear();
+    }
+  } else {
+    LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s",
+             m_namesrvAddrChoosed.c_str());
+    return;
+  }
+}
+
+bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand &msg) {
+  const MemoryBlock *phead = msg.GetHead();
+  const MemoryBlock *pbody = msg.GetBody();
+
+  unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
+  if (phead->getData()) {
+    result->write(phead->getData(), (size_t) phead->getSize());
 
 Review comment:
   It's better to use 'static_cast'.
   We would better to use C++ style type casting instead of C style, isn't it?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services