You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/01/07 02:44:14 UTC

[GitHub] ShannonDing closed pull request #48: [Issue#44] Resolve RetrySendTimes doesn't Work for Async Send

ShannonDing closed pull request #48: [Issue#44] Resolve RetrySendTimes doesn't Work for Async Send
URL: https://github.com/apache/rocketmq-client-cpp/pull/48
 
 
   

This pull request was merged from a forked repository (a foreign pull
request). Because GitHub does not show the original diff of such a pull
request once it has been merged, the diff is reproduced below for the
sake of provenance:

diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index 9a8f41f3..25799b6a 100755
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -76,6 +76,9 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
   int getRetryTimes() const;
   void setRetryTimes(int times);
 
+  int getRetryTimes4Async() const;
+  void setRetryTimes4Async(int times);
+
  protected:
   SendResult sendAutoRetrySelectImpl(MQMessage& msg,
                                      MessageQueueSelector* pSelector,
@@ -100,6 +103,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public MQProducer {
   bool m_retryAnotherBrokerWhenNotStoreOK;
   int m_compressLevel;
   int m_retryTimes;
+  int m_retryTimes4Async;  
 };
 //<!***************************************************************************
 }  //<!end namespace;
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 244d674d..110601dd 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -217,7 +217,7 @@ void MQClientAPIImpl::createTopic(
 
 SendResult MQClientAPIImpl::sendMessage(
     const string& addr, const string& brokerName, const MQMessage& msg,
-    SendMessageRequestHeader* pRequestHeader, int timeoutMillis,
+    SendMessageRequestHeader* pRequestHeader, int timeoutMillis, int maxRetrySendTimes,
     int communicationMode, SendCallback* pSendCallback,
     const SessionCredentials& sessionCredentials) {
   RemotingCommand request(SEND_MESSAGE, pRequestHeader);
@@ -232,8 +232,7 @@ SendResult MQClientAPIImpl::sendMessage(
       m_pRemotingClient->invokeOneway(addr, request);
       break;
     case ComMode_ASYNC:
-      sendMessageAsync(addr, brokerName, msg, request, pSendCallback,
-                       timeoutMillis);
+      sendMessageAsync(addr, brokerName, msg, request, pSendCallback, timeoutMillis, maxRetrySendTimes, 1);
       break;
     case ComMode_SYNC:
       return sendMessageSync(addr, brokerName, msg, request, timeoutMillis);
@@ -411,13 +410,38 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
                                        const MQMessage& msg,
                                        RemotingCommand& request,
                                        SendCallback* pSendCallback,
-                                       int64 timeoutMilliseconds) {
+                                       int64 timeoutMilliseconds,
+                                       int maxRetryTimes,
+                                       int retrySendTimes) {
+  int64 begin_time = UtilAll::currentTimeMillis();
   //<!delete in future;
-  AsyncCallbackWrap* cbw =
-      new SendCallbackWrap(brokerName, msg, pSendCallback, this);
-  if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds) ==
-      false) {
-    LOG_ERROR("sendMessageAsync failed to addr:%s", addr.c_str());
+  AsyncCallbackWrap* cbw = new SendCallbackWrap(brokerName, msg, pSendCallback, this);
+
+  LOG_DEBUG("sendMessageAsync request:%s, timeout:%lld, maxRetryTimes:%d retrySendTimes:%d", request.ToString().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
+  
+  if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMilliseconds, maxRetryTimes, retrySendTimes) ==
+    false) {
+    LOG_WARN("invokeAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
+	  addr.c_str(), msg.getTopic().data(), timeoutMilliseconds, maxRetryTimes, retrySendTimes);
+	  //when getTcp return false, need consider retrySendTimes
+	  int retry_time = retrySendTimes + 1;
+	  int64 time_out = timeoutMilliseconds - (UtilAll::currentTimeMillis() - begin_time);
+	  while (retry_time < maxRetryTimes && time_out > 0) {
+		  begin_time = UtilAll::currentTimeMillis();
+		  if (m_pRemotingClient->invokeAsync(addr, request, cbw, time_out, maxRetryTimes, retry_time) == false) {
+		    retry_time += 1;
+		    time_out = time_out - (UtilAll::currentTimeMillis() - begin_time);
+			  LOG_WARN("invokeAsync retry failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
+				addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retry_time);
+			  continue;
+		  } else {
+			  return; //invokeAsync success
+		  }
+	  }
+
+    LOG_ERROR("sendMessageAsync failed to addr:%s,topic:%s, timeout:%lld, maxRetryTimes:%d, retrySendTimes:%d", 
+	  addr.c_str(), msg.getTopic().data(), time_out, maxRetryTimes, retrySendTimes);
+
     if (cbw) {
       cbw->onException();
       deleteAndZero(cbw);
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 31e61a0c..9f3ebb46 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -60,7 +60,8 @@ class MQClientAPIImpl {
   SendResult sendMessage(const string& addr, const string& brokerName,
                          const MQMessage& msg,
                          SendMessageRequestHeader* pRequestHeader,
-                         int timeoutMillis, int communicationMode,
+                         int timeoutMillis, int maxRetrySendTimes,
+                         int communicationMode,
                          SendCallback* pSendCallback,
                          const SessionCredentials& sessionCredentials);
 
@@ -161,15 +162,20 @@ class MQClientAPIImpl {
                      int timeoutMillis,
                      const SessionCredentials& sessionCredentials);
 
+  void sendMessageAsync(const string& addr, const string& brokerName,
+                        const MQMessage& msg, RemotingCommand& request,
+                        SendCallback* pSendCallback, int64 timeoutMilliseconds,
+                        int maxRetryTimes=1,
+                        int retrySendTimes=1);
  private:
   SendResult sendMessageSync(const string& addr, const string& brokerName,
                              const MQMessage& msg, RemotingCommand& request,
                              int timeoutMillis);
-
+  /*
   void sendMessageAsync(const string& addr, const string& brokerName,
                         const MQMessage& msg, RemotingCommand& request,
                         SendCallback* pSendCallback, int64 timeoutMilliseconds);
-
+  */
   PullResult* pullMessageSync(const string& addr, RemotingCommand& request,
                               int timeoutMillis);
 
diff --git a/src/common/AsyncCallbackWrap.cpp b/src/common/AsyncCallbackWrap.cpp
index a61e0ee4..42ce04e2 100755
--- a/src/common/AsyncCallbackWrap.cpp
+++ b/src/common/AsyncCallbackWrap.cpp
@@ -67,6 +67,7 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
   if (m_pAsyncCallBack == NULL) {
     return;
   }
+  int opaque = pResponseFuture->getOpaque();
   SendCallback* pCallback = static_cast<SendCallback*>(m_pAsyncCallBack);
 
   if (!pResponse) {
@@ -85,16 +86,44 @@ void SendCallbackWrap::operationComplete(ResponseFuture* pResponseFuture,
     LOG_ERROR("send failed of:%d", pResponseFuture->getOpaque());
   } else {
     try {
-      SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg,
-                                                         pResponse.get());
-      if (pCallback) pCallback->onSuccess(ret);
+        SendResult ret = m_pClientAPI->processSendResponse(m_brokerName, m_msg, pResponse.get());
+        if (pCallback) { 
+			LOG_DEBUG("operationComplete: processSendResponse success, opaque:%d, maxRetryTime:%d, retrySendTimes:%d", opaque, pResponseFuture->getMaxRetrySendTimes(), pResponseFuture->getRetrySendTimes());
+			pCallback->onSuccess(ret); 
+		}
     } catch (MQException& e) {
-      LOG_ERROR(e.what());
-      if (pCallback) {
-        MQException exception("process send response error", -1, __FILE__,
-                              __LINE__);
-        pCallback->onException(exception);
-      }
+        LOG_ERROR("operationComplete: processSendResponse exception: %s", e.what());
+
+		//broker may return exception, need consider retry send
+		int maxRetryTimes = pResponseFuture->getMaxRetrySendTimes();
+		int retryTimes = pResponseFuture->getRetrySendTimes();
+		if (pResponseFuture->getASyncFlag() && retryTimes < maxRetryTimes && maxRetryTimes > 1) {
+
+			int64 left_timeout_ms = pResponseFuture->leftTime(); 
+			string brokerAddr = pResponseFuture->getBrokerAddr();
+			const RemotingCommand& requestCommand = pResponseFuture->getRequestCommand();
+			retryTimes += 1;
+			LOG_WARN("retry send, opaque:%d, sendTimes:%d, maxRetryTimes:%d, left_timeout:%lld, brokerAddr:%s, msg:%s", 
+					opaque, retryTimes, maxRetryTimes, left_timeout_ms, brokerAddr.data(), m_msg.toString().data());
+
+			bool exception_flag = false;
+			try {
+				m_pClientAPI->sendMessageAsync(pResponseFuture->getBrokerAddr(), m_brokerName, m_msg, (RemotingCommand&)requestCommand, pCallback, left_timeout_ms, maxRetryTimes, retryTimes);
+			} catch (MQClientException& e) {
+				LOG_ERROR("retry send exception:%s, opaque:%d, retryTimes:%d, msg:%s, not retry send again", e.what(), opaque, retryTimes, m_msg.toString().data());
+				exception_flag = true;
+			}
+
+			if (exception_flag == false) {
+				return; //send retry again, here need return
+			}
+		}
+	  
+		if (pCallback) {
+			MQException exception("process send response error", -1, __FILE__,
+			                      __LINE__);
+			pCallback->onException(exception);
+		}
     }
   }
   if (pCallback && pCallback->getSendCallbackType() == autoDeleteSendCallback) {
diff --git a/src/common/AsyncCallbackWrap.h b/src/common/AsyncCallbackWrap.h
index 50968c9c..02ae07fc 100755
--- a/src/common/AsyncCallbackWrap.h
+++ b/src/common/AsyncCallbackWrap.h
@@ -21,11 +21,14 @@
 #include "AsyncCallback.h"
 #include "MQMessage.h"
 #include "UtilAll.h"
+#include "RemotingCommand.h"
 
 namespace rocketmq {
 
 class ResponseFuture;
 class MQClientAPIImpl;
+class DefaultMQProducer;
+class SendMessageRequestHeader;
 //<!***************************************************************************
 enum asyncCallBackType {
   asyncCallbackWrap = 0,
diff --git a/src/producer/DefaultMQProducer.cpp b/src/producer/DefaultMQProducer.cpp
index 1411ebdf..748ffcab 100755
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -39,7 +39,8 @@ DefaultMQProducer::DefaultMQProducer(const string& groupname)
       m_maxMessageSize(1024 * 128),
       m_retryAnotherBrokerWhenNotStoreOK(false),
       m_compressLevel(5),
-      m_retryTimes(5) {
+      m_retryTimes(5),
+      m_retryTimes4Async(1) {
   //<!set default group name;
   string gname = groupname.empty() ? DEFAULT_PRODUCER_GROUP : groupname;
   setGroupName(gname);
@@ -284,7 +285,7 @@ SendResult DefaultMQProducer::sendDefaultImpl(MQMessage& msg,
       }
 
       try {
-        LOG_DEBUG("send to brokerName:%s", mq.getBrokerName().c_str());
+        LOG_DEBUG("send to mq:%s", mq.toString().data());
         sendResult = sendKernelImpl(msg, mq, communicationMode, pSendCallback);
         switch (communicationMode) {
           case ComMode_ASYNC:
@@ -315,7 +316,8 @@ SendResult DefaultMQProducer::sendDefaultImpl(MQMessage& msg,
     }  // end of for
     LOG_WARN("Retry many times, still failed");
   }
-  THROW_MQEXCEPTION(MQClientException, "No route info of this topic, ", -1);
+  string info = "No route info of this topic: " + msg.getTopic();
+  THROW_MQEXCEPTION(MQClientException, info, -1);
 }
 
 SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
@@ -355,7 +357,7 @@ SendResult DefaultMQProducer::sendKernelImpl(MQMessage& msg,
 
       return getFactory()->getMQClientAPIImpl()->sendMessage(
           brokerAddr, mq.getBrokerName(), msg, requestHeader,
-          getSendMsgTimeout(), communicationMode, sendCallback,
+          getSendMsgTimeout(), getRetryTimes4Async(), communicationMode, sendCallback,
           getSessionCredentials());
     } catch (MQException& e) {
       throw e;
@@ -493,5 +495,27 @@ void DefaultMQProducer::setRetryTimes(int times) {
   LOG_WARN("set retry times to:%d", times);
   m_retryTimes = times;
 }
+
+int DefaultMQProducer::getRetryTimes4Async() const 
+{ 
+  return m_retryTimes4Async; 
+}
+void DefaultMQProducer::setRetryTimes4Async(int times) 
+{
+  if (times <= 0) {
+    LOG_WARN("set retry times illegal, use default value:1");
+	m_retryTimes4Async = 1;
+    return;
+  }
+
+  if (times > 15) {
+    LOG_WARN("set retry times illegal, use max value:15");
+    m_retryTimes4Async = 15;
+    return;
+  }
+  LOG_INFO("set retry times to:%d", times);
+  m_retryTimes4Async = times;
+}
+
 //<!***************************************************************************
 }  //<!end namespace;
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index 83bb1d0d..d4c67565 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -50,8 +50,39 @@ RemotingCommand::RemotingCommand(int code, string language, int version,
       m_remark(remark),
       m_pExtHeader(pExtHeader) {}
 
+RemotingCommand::RemotingCommand(const RemotingCommand& command) {
+    Assign(command);
+}
+
+RemotingCommand& RemotingCommand::operator=(const RemotingCommand& command) {
+  if (this != &command) {
+    Assign(command);
+  }
+  return *this;
+}
+
 RemotingCommand::~RemotingCommand() { m_pExtHeader = NULL; }
 
+void RemotingCommand::Assign(const RemotingCommand& command)
+{
+    m_code = command.m_code;
+    m_language = command.m_language;
+    m_version = command.m_version;
+    m_opaque = command.m_opaque;
+    m_flag = command.m_flag;
+    m_remark = command.m_remark;
+    m_msgBody = command.m_msgBody;
+    
+    for (auto& it : command.m_extFields) {
+      m_extFields[it.first] = it.second;
+    }
+    m_head = command.m_head;
+    m_body = command.m_body;
+    s_seqNumber.store(command.s_seqNumber.load());
+    m_parsedJson = command.m_parsedJson;
+    //m_pExtHeader = command.m_pExtHeader; //ignore this filed at this moment, if need please add it
+}
+
 void RemotingCommand::Encode() {
   Json::Value root;
   root["code"] = m_code;
@@ -250,4 +281,15 @@ void RemotingCommand::addExtField(const string& key, const string& value) {
   m_extFields[key] = value;
 }
 
+std::string RemotingCommand::ToString() const {
+	 std::stringstream ss;
+	 ss << "code:" << m_code
+	  <<",opaque:"<< m_opaque
+	  <<",flag:"<< m_flag
+	  <<",seqNumber:" << s_seqNumber
+	  <<",body.size:" << m_body.getSize()
+	  <<",header.size:" << m_head.getSize();
+	 return ss.str();
+}
+
 }  //<!end namespace;
diff --git a/src/protocol/RemotingCommand.h b/src/protocol/RemotingCommand.h
index 633a5113..53f20140 100755
--- a/src/protocol/RemotingCommand.h
+++ b/src/protocol/RemotingCommand.h
@@ -31,18 +31,18 @@ const int RPC_ONEWAY = 1;  // 0, RPC // 1, Oneway;
 //<!***************************************************************************
 class RemotingCommand {
  public:
+  RemotingCommand() : m_code(0) {};
   RemotingCommand(int code, CommandHeader* pCustomHeader = NULL);
   RemotingCommand(int code, string language, int version, int opaque, int flag,
                   string remark, CommandHeader* pCustomHeader);
+  RemotingCommand(const RemotingCommand& command);
+  RemotingCommand& operator=(const RemotingCommand& command);
   virtual ~RemotingCommand();
-
   const MemoryBlock* GetHead() const;
   const MemoryBlock* GetBody() const;
-
   void SetBody(const char* pData, int len);
   void setOpaque(const int opa);
   void SetExtHeader(int code);
-
   void setCode(int code);
   int getCode() const;
   int getOpaque() const;
@@ -53,19 +53,18 @@ class RemotingCommand {
   void markOnewayRPC();
   bool isOnewayRPC();
   void setParsedJson(Json::Value json);
-
   CommandHeader* getCommandHeader() const;
   const int getFlag() const;
   const int getVersion() const;
-
   void addExtField(const string& key, const string& value);
   string getMsgBody() const;
   void setMsgBody(const string& body);
-
  public:
   void Encode();
   static RemotingCommand* Decode(const MemoryBlock& mem);
-
+  std::string ToString() const;
+ private:
+ void Assign(const RemotingCommand& command);
  private:
   int m_code;
   string m_language;
diff --git a/src/transport/ResponseFuture.cpp b/src/transport/ResponseFuture.cpp
index 05cef84d..9307616c 100755
--- a/src/transport/ResponseFuture.cpp
+++ b/src/transport/ResponseFuture.cpp
@@ -31,6 +31,9 @@ ResponseFuture::ResponseFuture(int requestCode, int opaque,
   m_pCallbackWrap = pcall;
   m_pResponseCommand = NULL;
   m_sendRequestOK = false;
+  m_maxRetrySendTimes = 1;
+  m_retrySendTimes = 1;
+  m_brokerAddr = "";
   m_beginTimestamp = UtilAll::currentTimeMillis();
   m_asyncCallbackStatus = asyncCallBackStatus_init;
   if (getASyncFlag()) {
@@ -148,6 +151,10 @@ void ResponseFuture::executeInvokeCallbackException() {
     return;
   } else {
     if (m_asyncCallbackStatus == asyncCallBackStatus_timeout) {
+
+	//here no need retrySendTimes process because of it have timeout
+	LOG_ERROR("send msg, callback timeout, opaque:%d, sendTimes:%d, maxRetryTimes:%d", getOpaque(), getRetrySendTimes(), getMaxRetrySendTimes());
+
       m_pCallbackWrap->onException();
     } else {
       LOG_WARN(
@@ -164,6 +171,39 @@ bool ResponseFuture::isTimeOut() const {
   return m_bAsync.load() == 1 && diff > m_timeout;
 }
 
+int ResponseFuture::getMaxRetrySendTimes() const {
+	return m_maxRetrySendTimes;
+} 
+int ResponseFuture::getRetrySendTimes() const {
+	return m_retrySendTimes;
+}
+
+void ResponseFuture::setMaxRetrySendTimes(int maxRetryTimes) {
+	m_maxRetrySendTimes = maxRetryTimes;
+}
+void ResponseFuture::setRetrySendTimes(int retryTimes) {
+	m_retrySendTimes = retryTimes;
+}
+
+void ResponseFuture::setBrokerAddr(const std::string& brokerAddr) {
+	m_brokerAddr = brokerAddr;
+}
+void ResponseFuture::setRequestCommand(const RemotingCommand& requestCommand) {
+	m_requestCommand = requestCommand;
+}
+
+const RemotingCommand& ResponseFuture::getRequestCommand() {
+	return m_requestCommand;
+}
+std::string ResponseFuture::getBrokerAddr() const {
+	return m_brokerAddr;
+}
+
+int64 ResponseFuture::leftTime() const {
+	int64 diff = UtilAll::currentTimeMillis() - m_beginTimestamp;
+	return m_timeout - diff;
+}
+
 RemotingCommand* ResponseFuture::getCommand() const {
   return m_pResponseCommand;
 }
diff --git a/src/transport/ResponseFuture.h b/src/transport/ResponseFuture.h
index 92fa7722..6564f537 100755
--- a/src/transport/ResponseFuture.h
+++ b/src/transport/ResponseFuture.h
@@ -51,7 +51,10 @@ class ResponseFuture {
   //<!callback;
   void executeInvokeCallback();
   void executeInvokeCallbackException();
-  bool isTimeOut() const;
+  bool isTimeOut() const; 
+  int getMaxRetrySendTimes() const; 
+  int getRetrySendTimes() const;   
+  int64 leftTime() const;
   // bool    isTimeOutMoreThan30s() const;
   const bool getASyncFlag();
   void setAsyncResponseFlag();
@@ -59,7 +62,12 @@ class ResponseFuture {
   const bool getSyncResponseFlag();
   AsyncCallbackWrap* getAsyncCallbackWrap();
   void setAsyncCallBackStatus(asyncCallBackStatus asyncCallbackStatus);
-
+  void setMaxRetrySendTimes(int maxRetryTimes); 
+  void setRetrySendTimes(int retryTimes);   
+  void setBrokerAddr(const std::string& brokerAddr); 
+  void setRequestCommand(const RemotingCommand& requestCommand);
+  const RemotingCommand& getRequestCommand();
+  std::string getBrokerAddr() const;
  private:
   int m_requestCode;
   int m_opaque;
@@ -75,6 +83,11 @@ class ResponseFuture {
   asyncCallBackStatus m_asyncCallbackStatus;
   boost::atomic<bool> m_asyncResponse;
   boost::atomic<bool> m_syncResponse;
+
+  int   m_maxRetrySendTimes;
+  int   m_retrySendTimes; 
+  std::string m_brokerAddr;
+  RemotingCommand m_requestCommand;
   // TcpRemotingClient*    m_tcpRemoteClient;
 };
 //<!************************************************************************
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 5cd70eb4..551df244 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -207,7 +207,9 @@ RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
 bool TcpRemotingClient::invokeAsync(const string& addr,
                                     RemotingCommand& request,
                                     AsyncCallbackWrap* cbw,
-                                    int64 timeoutMilliseconds) {
+                                    int64 timeoutMilliseconds,
+                                    int maxRetrySendTimes,
+                                    int retrySendTimes) {
   boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
   if (pTcp != NULL) {
     //<!not delete, for callback to delete;
@@ -215,6 +217,10 @@ bool TcpRemotingClient::invokeAsync(const string& addr,
     int opaque = request.getOpaque();
     boost::shared_ptr<ResponseFuture> responseFuture(
         new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
+	responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
+	responseFuture->setRetrySendTimes(retrySendTimes);
+	responseFuture->setBrokerAddr(addr);
+	responseFuture->setRequestCommand(request);	
     addAsyncResponseFuture(opaque, responseFuture);
     if (cbw) {
       boost::asio::deadline_timer* t = new boost::asio::deadline_timer(
@@ -577,7 +583,7 @@ void TcpRemotingClient::processResponseCommand(
     RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
   int code = pfuture->getRequestCode();
   int opaque = pCmd->getOpaque();
-  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d", code, opaque);
+  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque, pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
   pCmd->SetExtHeader(code);  // set head , for response use
 
   pfuture->setResponse(pCmd);
@@ -586,8 +592,8 @@ void TcpRemotingClient::processResponseCommand(
     if (!pfuture->getAsyncResponseFlag()) {
       pfuture->setAsyncResponseFlag();
       pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
-      pfuture->executeInvokeCallback();
-      cancelTimerCallback(opaque);
+	  cancelTimerCallback(opaque);
+      pfuture->executeInvokeCallback();	  
     }
   }
 }
@@ -636,9 +642,11 @@ TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
 void TcpRemotingClient::handleAsyncPullForResponseTimeout(
     const boost::system::error_code& e, int opaque) {
   if (e == boost::asio::error::operation_aborted) {
+    LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
     return;
   }
 
+  LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
   boost::shared_ptr<ResponseFuture> pFuture(
       findAndDeleteAsyncResponseFuture(opaque));
   if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) {
@@ -686,7 +694,7 @@ void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
                                          int opaque) {
   boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
   if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    // AGENT_INFO("addTimerCallback:erase timerCallback opaque:%lld", opaque);
+    LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
     boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
     old_t->cancel();
     delete old_t;
@@ -699,6 +707,7 @@ void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
 void TcpRemotingClient::eraseTimerCallback(int opaque) {
   boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
   if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
+  	LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
     boost::asio::deadline_timer* t = m_async_timer_map[opaque];
     delete t;
     t = NULL;
@@ -709,7 +718,7 @@ void TcpRemotingClient::eraseTimerCallback(int opaque) {
 void TcpRemotingClient::cancelTimerCallback(int opaque) {
   boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
   if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    // AGENT_INFO("cancel timerCallback opaque:%lld", opaque);
+    LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);    
     boost::asio::deadline_timer* t = m_async_timer_map[opaque];
     t->cancel();
     delete t;
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index 832b49ad..fcdd8a83 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -46,11 +46,10 @@ class TcpRemotingClient {
 
   bool invokeHeartBeat(const string& addr, RemotingCommand& request);
 
-  bool invokeAsync(const string& addr, RemotingCommand& request,
-                   AsyncCallbackWrap* cbw, int64 timeoutMilliseconds);
-
+  bool invokeAsync(const string& addr, RemotingCommand& request, AsyncCallbackWrap* cbw, 
+                   int64 timeoutMilliseconds, int maxRetrySendTimes=1, int retrySendTimes=1);
   void invokeOneway(const string& addr, RemotingCommand& request);
-
+  
   void ProcessData(const MemoryBlock& mem, const string& addr);
 
   void registerProcessor(MQRequestCode requestCode,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services