You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/01/11 16:08:55 UTC

[rocketmq-client-cpp] branch master updated: [ISSUE#43] One Consumer Consume One Message Twice at the same time (#61)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 938e773  [ISSUE#43] One Consumer Consume One Message Twice at the same time (#61)
938e773 is described below

commit 938e773b133c2eb4a4d920160f378d7189541969
Author: Jonnxu <jo...@163.com>
AuthorDate: Sat Jan 12 00:08:50 2019 +0800

    [ISSUE#43] One Consumer Consume One Message Twice at the same time (#61)
    
    Resolve one Consumer consume one message twice at the same time
---
 src/MQClientAPIImpl.cpp                | 18 +++++++++++++++++-
 src/MQClientAPIImpl.h                  |  2 ++
 src/MQClientFactory.cpp                | 14 ++++++++++++++
 src/MQClientFactory.h                  |  2 +-
 src/common/AsyncArg.h                  |  3 ++-
 src/consumer/DefaultMQPullConsumer.cpp |  1 +
 src/consumer/DefaultMQPushConsumer.cpp |  2 +-
 src/consumer/PullRequest.cpp           | 14 +++++++++++++-
 src/consumer/PullRequest.h             |  3 +++
 src/consumer/Rebalance.cpp             |  4 +++-
 src/transport/TcpRemotingClient.cpp    | 19 +++++++++++++++++--
 src/transport/TcpRemotingClient.h      |  1 +
 12 files changed, 75 insertions(+), 8 deletions(-)

diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 110601d..045bfab 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -451,6 +451,10 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
   }
 }
 
+void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {  // thin forwarder: no logic of its own
+    m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);  // mq and opaque are passed through unchanged to the remoting client
+}
+
 PullResult* MQClientAPIImpl::pullMessage(
     const string& addr, PullMessageRequestHeader* pRequestHeader,
     int timeoutMillis, int communicationMode, PullCallback* pullCallback,
@@ -480,9 +484,21 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
                                        PullCallback* pullCallback, void* pArg) {
   //<!delete in future;
   AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+  MQMessageQueue mq;
+  AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
+  if (pAsyncArg && pAsyncArg->pPullRequest) {
+    mq = pAsyncArg->mq;
+    pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
+    LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s", 
+        pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),mq.toString().c_str());
+  }
+
   if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) ==
       false) {
-    LOG_ERROR("pullMessageAsync failed of addr:%s", addr.c_str());
+    LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(), mq.toString().data());
+    if (pAsyncArg && pAsyncArg->pPullRequest) {
+        pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
+    }
     deleteAndZero(cbw);
     THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
   }
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9f3ebb4..f2ac273 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,6 +167,8 @@ class MQClientAPIImpl {
                         SendCallback* pSendCallback, int64 timeoutMilliseconds,
                         int maxRetryTimes=1,
                         int retrySendTimes=1);
+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
+
  private:
   SendResult sendMessageSync(const string& addr, const string& brokerName,
                              const MQMessage& msg, RemotingCommand& request,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index e128077..606ccc1 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1040,6 +1040,17 @@ void MQClientFactory::findConsumerIds(
   }
 }
 
+void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
+  // Asks the client API to delete the record of this pull request's latest opaque when the request is dropped, so that its response is ignored.
+  if (!pullRequest) return;  // a null request is silently accepted
+  MQMessageQueue mq = pullRequest->m_messageQueue;
+  int opaque = pullRequest->getLatestPullRequestOpaque();
+  if (opaque > 0) {  // opaque is 0 initially and after a failed async send; only positive values are forwarded
+      LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+      getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
+  }
+}
+
 void MQClientFactory::resetOffset(
     const string& group, const string& topic,
     const map<MQMessageQueue, int64>& offsetTable) {
@@ -1052,6 +1063,9 @@ void MQClientFactory::resetOffset(
       PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
       if (pullreq) {
         pullreq->setDroped(true);
+        LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(), mq.toString().data());
+        // delete the opaque record of the dropped pull request so that its response is ignored
+        removeDropedPullRequestOpaque(pullreq);
         pullreq->clearAllMsgs();
         pullreq->updateQueueMaxOffset(it->second);
       } else {
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index 34ac2a9..7da2e21 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -113,7 +113,7 @@ class MQClientFactory {
                           map<int, string>& brokerAddrs);
   map<string, map<int, string>> getBrokerAddrMap();
   void clearBrokerAddrMap();
-
+  void removeDropedPullRequestOpaque(PullRequest* pullRequest);
  private:
   void unregisterClient(const string& producerGroup,
                         const string& consumerGroup,
diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h
index fc358cb..4e23743 100755
--- a/src/common/AsyncArg.h
+++ b/src/common/AsyncArg.h
@@ -21,7 +21,7 @@
 #include "MQMessageQueue.h"
 #include "PullAPIWrapper.h"
 #include "SubscriptionData.h"
-
+#include "../consumer/PullRequest.h"
 namespace rocketmq {
 //<!***************************************************************************
 
@@ -29,6 +29,7 @@ struct AsyncArg {
   MQMessageQueue mq;
   SubscriptionData subData;
   PullAPIWrapper* pPullWrapper;
+  PullRequest* pPullRequest;
 };
 
 //<!***************************************************************************
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 073a801..bfc4e9d 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -263,6 +263,7 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
   arg.mq = mq;
   arg.subData = *pSData;
   arg.pPullWrapper = m_pPullAPIWrapper;
+  arg.pPullRequest = NULL;
 
   try {
     unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 2e78ebc..11e16ed 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -840,7 +840,7 @@ namespace rocketmq {
         arg.mq = messageQueue;
         arg.subData = *pSdata;
         arg.pPullWrapper = m_pPullAPIWrapper;
-
+        arg.pPullRequest = request;
         try {
             request->setLastPullTimestamp(UtilAll::currentTimeMillis());
             m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index bb578f8..8510e43 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -28,7 +28,8 @@ PullRequest::PullRequest(const string& groupname)
       m_queueOffsetMax(0),
       m_bDroped(false),
       m_bLocked(false),
-      m_bPullMsgEventInprogress(false) {}
+      m_bPullMsgEventInprogress(false),
+      m_latestPullRequestOpaque(0) {}
 
 PullRequest::~PullRequest() {
   m_msgTreeMapTemp.clear();
@@ -45,6 +46,7 @@ PullRequest& PullRequest::operator=(const PullRequest& other) {
     m_messageQueue = other.m_messageQueue;
     m_msgTreeMap = other.m_msgTreeMap;
     m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+    m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
   }
   return *this;
 }
@@ -260,5 +262,15 @@ bool PullRequest::addPullMsgEvent() {
   return false;
 }
 
+int PullRequest::getLatestPullRequestOpaque() const {  // returns the opaque last stored by setLatestPullRequestOpaque (0 after construction)
+    boost::lock_guard<boost::mutex> lock(m_pullRequestLock);  // the read is done while holding m_pullRequestLock
+    return m_latestPullRequestOpaque;
+}
+
+void PullRequest::setLatestPullRequestOpaque(int opaque) {  // stores the given opaque as the latest one for this pull request
+    boost::lock_guard<boost::mutex> lock(m_pullRequestLock);  // the write is done while holding m_pullRequestLock
+    m_latestPullRequestOpaque = opaque;
+}
+
 //<!***************************************************************************
 }  //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index f7abfaf..80c286a 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -69,6 +69,8 @@ class PullRequest {
   boost::timed_mutex& getPullRequestCriticalSection();
   void removePullMsgEvent();
   bool addPullMsgEvent();
+  int getLatestPullRequestOpaque() const;
+  void setLatestPullRequestOpaque(int opaque);
 
  public:
   MQMessageQueue m_messageQueue;
@@ -88,6 +90,7 @@ class PullRequest {
   //uint64 m_tryUnlockTimes;
   uint64 m_lastPullTimestamp;
   uint64 m_lastConsumeTimestamp;
+  int    m_latestPullRequestOpaque;
   boost::timed_mutex m_consumeLock;
   boost::atomic<bool> m_bPullMsgEventInprogress;
 };
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 4a35673..f23b727 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -471,11 +471,13 @@ bool RebalancePush::updateRequestTableInRebalance(
           (find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
         if (!(it->second->isDroped())) {
           it->second->setDroped(true);
+          // delete the latest pull request for this mq, which has not been answered yet
+          m_pClientFactory->removeDropedPullRequestOpaque(it->second);
           removeUnnecessaryMessageQueue(mqtemp);
           it->second->clearAllMsgs();  // add clear operation to avoid bad state
                                        // when dropped pullRequest returns
                                        // normal
-          LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
+          LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
         }
         changed = true;
       }
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 551df24..603c17f 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -592,7 +592,7 @@ void TcpRemotingClient::processResponseCommand(
     if (!pfuture->getAsyncResponseFlag()) {
       pfuture->setAsyncResponseFlag();
       pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
-	  cancelTimerCallback(opaque);
+      cancelTimerCallback(opaque);
       pfuture->executeInvokeCallback();	  
     }
   }
@@ -707,7 +707,7 @@ void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
 void TcpRemotingClient::eraseTimerCallback(int opaque) {
   boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
   if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-  	LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
+    LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
     boost::asio::deadline_timer* t = m_async_timer_map[opaque];
     delete t;
     t = NULL;
@@ -739,5 +739,20 @@ void TcpRemotingClient::removeAllTimerCallback() {
   m_async_timer_map.clear();
 }
 
+void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+  // Remove the opaque -> ResponseFuture record so that the response to this pull request is discarded if it arrives later.
+  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  if (!pFuture) {  // no async future found: fall back to findAndDeleteResponseFuture (logged below as the sync case)
+    pFuture = findAndDeleteResponseFuture(opaque);
+    if (pFuture) {
+      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+    }
+  } else {
+    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data()); 
+  }
+  // Cancel the timeout timer callback for this opaque; this runs whether or not a future was found.
+  cancelTimerCallback(opaque);
+}
+
 //<!************************************************************************
 }  //<!end namespace;
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index fcdd8a8..c338d53 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -58,6 +58,7 @@ class TcpRemotingClient {
   void boost_asio_work();
   void handleAsyncPullForResponseTimeout(const boost::system::error_code& e,
                                          int opaque);
+  void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
 
  private:
   static void static_messageReceived(void* context, const MemoryBlock& mem,