You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@rocketmq.apache.org by GitBox <gi...@apache.org> on 2019/01/11 16:08:53 UTC

[GitHub] ShannonDing closed pull request #61: [ISSUE#43] One Consumer Consume One Message Twice at the same time

ShannonDing closed pull request #61: [ISSUE#43] One Consumer Consume One Message Twice at the same time
URL: https://github.com/apache/rocketmq-client-cpp/pull/61
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 110601dd..045bfab2 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -451,6 +451,10 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
   }
 }
 
+void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+    m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
+}
+
 PullResult* MQClientAPIImpl::pullMessage(
     const string& addr, PullMessageRequestHeader* pRequestHeader,
     int timeoutMillis, int communicationMode, PullCallback* pullCallback,
@@ -480,9 +484,21 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
                                        PullCallback* pullCallback, void* pArg) {
   //<!delete in future;
   AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+  MQMessageQueue mq;
+  AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
+  if (pAsyncArg && pAsyncArg->pPullRequest) {
+    mq = pAsyncArg->mq;
+    pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
+    LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", 
+        pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),mq.toString().c_str());
+  }
+
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) ==
       false) {
-    LOG_ERROR("pullMessageAsync failed of addr:%s", addr.c_str());
+    LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(), mq.toString().data());
+    if (pAsyncArg && pAsyncArg->pPullRequest) {
+        pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
+    }
     deleteAndZero(cbw);
     THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
   }
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9f3ebb46..f2ac2736 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,6 +167,8 @@ class MQClientAPIImpl {
                         SendCallback* pSendCallback, int64 timeoutMilliseconds,
                         int maxRetryTimes=1,
                         int retrySendTimes=1);
+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
+
  private:
   SendResult sendMessageSync(const string& addr, const string& brokerName,
                              const MQMessage& msg, RemotingCommand& request,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index e128077d..606ccc11 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1040,6 +1040,17 @@ void MQClientFactory::findConsumerIds(
   }
 }
 
+void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
+  //delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
+  if (!pullRequest) return;
+  MQMessageQueue mq = pullRequest->m_messageQueue;
+  int opaque = pullRequest->getLatestPullRequestOpaque();
+  if (opaque > 0) {
+      LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+      getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
+  }
+}
+
 void MQClientFactory::resetOffset(
     const string& group, const string& topic,
     const map<MQMessageQueue, int64>& offsetTable) {
@@ -1052,6 +1063,9 @@ void MQClientFactory::resetOffset(
       PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
       if (pullreq) {
         pullreq->setDroped(true);
+        LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(), mq.toString().data());
+        //delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
+        removeDropedPullRequestOpaque(pullreq);
         pullreq->clearAllMsgs();
         pullreq->updateQueueMaxOffset(it->second);
       } else {
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index 34ac2a94..7da2e211 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -113,7 +113,7 @@ class MQClientFactory {
                           map<int, string>& brokerAddrs);
   map<string, map<int, string>> getBrokerAddrMap();
   void clearBrokerAddrMap();
-
+  void removeDropedPullRequestOpaque(PullRequest* pullRequest);
  private:
   void unregisterClient(const string& producerGroup,
                         const string& consumerGroup,
diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h
index fc358cb0..4e237436 100755
--- a/src/common/AsyncArg.h
+++ b/src/common/AsyncArg.h
@@ -21,7 +21,7 @@
 #include "MQMessageQueue.h"
 #include "PullAPIWrapper.h"
 #include "SubscriptionData.h"
-
+#include "../consumer/PullRequest.h"
 namespace rocketmq {
 //<!***************************************************************************
 
@@ -29,6 +29,7 @@ struct AsyncArg {
   MQMessageQueue mq;
   SubscriptionData subData;
   PullAPIWrapper* pPullWrapper;
+  PullRequest* pPullRequest;
 };
 
 //<!***************************************************************************
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 073a8017..bfc4e9d1 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -263,6 +263,7 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
   arg.mq = mq;
   arg.subData = *pSData;
   arg.pPullWrapper = m_pPullAPIWrapper;
+  arg.pPullRequest = NULL;
 
   try {
     unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 2e78ebcd..11e16edc 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -840,7 +840,7 @@ namespace rocketmq {
         arg.mq = messageQueue;
         arg.subData = *pSdata;
         arg.pPullWrapper = m_pPullAPIWrapper;
-
+        arg.pPullRequest = request;
         try {
             request->setLastPullTimestamp(UtilAll::currentTimeMillis());
             m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index bb578f87..8510e430 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -28,7 +28,8 @@ PullRequest::PullRequest(const string& groupname)
       m_queueOffsetMax(0),
       m_bDroped(false),
       m_bLocked(false),
-      m_bPullMsgEventInprogress(false) {}
+      m_bPullMsgEventInprogress(false),
+      m_latestPullRequestOpaque(0) {}
 
 PullRequest::~PullRequest() {
   m_msgTreeMapTemp.clear();
@@ -45,6 +46,7 @@ PullRequest& PullRequest::operator=(const PullRequest& other) {
     m_messageQueue = other.m_messageQueue;
     m_msgTreeMap = other.m_msgTreeMap;
     m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+    m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
   }
   return *this;
 }
@@ -260,5 +262,15 @@ bool PullRequest::addPullMsgEvent() {
   return false;
 }
 
+int PullRequest::getLatestPullRequestOpaque() const {
+    boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+    return m_latestPullRequestOpaque;
+}
+
+void PullRequest::setLatestPullRequestOpaque(int opaque) {
+    boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+    m_latestPullRequestOpaque = opaque;
+}
+
 //<!***************************************************************************
 }  //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index f7abfaf8..80c286ae 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -69,6 +69,8 @@ class PullRequest {
   boost::timed_mutex& getPullRequestCriticalSection();
   void removePullMsgEvent();
   bool addPullMsgEvent();
+  int getLatestPullRequestOpaque() const;
+  void setLatestPullRequestOpaque(int opaque);
 
  public:
   MQMessageQueue m_messageQueue;
@@ -88,6 +90,7 @@ class PullRequest {
   //uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
+  int    m_latestPullRequestOpaque;
   boost::timed_mutex m_consumeLock;
   boost::atomic<bool> m_bPullMsgEventInprogress;
 };
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 4a356737..f23b727c 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -471,11 +471,13 @@ bool RebalancePush::updateRequestTableInRebalance(
           (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
         if (!(it->second->isDroped())) {
           it->second->setDroped(true);
+          //delete the lastest pull request for this mq, which hasn't been response
+          m_pClientFactory->removeDropedPullRequestOpaque(it->second);
           removeUnnecessaryMessageQueue(mqtemp);
           it->second->clearAllMsgs();  // add clear operation to avoid bad state
                                        // when dropped pullRequest returns
                                        // normal
-          LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
+          LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
         }
         changed = true;
       }
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 551df244..603c17ff 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -592,7 +592,7 @@ void TcpRemotingClient::processResponseCommand(
     if (!pfuture->getAsyncResponseFlag()) {
       pfuture->setAsyncResponseFlag();
       pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
-	  cancelTimerCallback(opaque);
+      cancelTimerCallback(opaque);
       pfuture->executeInvokeCallback();	  
     }
   }
@@ -707,7 +707,7 @@ void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
 void TcpRemotingClient::eraseTimerCallback(int opaque) {
   boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
   if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-  	LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
+    LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
     boost::asio::deadline_timer* t = m_async_timer_map[opaque];
     delete t;
     t = NULL;
@@ -739,5 +739,20 @@ void TcpRemotingClient::removeAllTimerCallback() {
   m_async_timer_map.clear();
 }
 
+void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+  //delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it later
+  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  if (!pFuture) {
+    pFuture = findAndDeleteResponseFuture(opaque);
+    if (pFuture) {
+      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+    }
+  } else {
+    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); 
+  }
+  //delete the timeout timer for opaque for pullrequest
+  cancelTimerCallback(opaque);
+}
+
 //<!************************************************************************
 }  //<!end namespace;
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index fcdd8a83..c338d535 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -58,6 +58,7 @@ class TcpRemotingClient {
   void boost_asio_work();
   void handleAsyncPullForResponseTimeout(const boost::system::error_code& e,
                                          int opaque);
+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
 
  private:
   static void static_messageReceived(void* context, const MemoryBlock& mem,


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services