You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/07/24 21:58:58 UTC

[rocketmq-client-cpp] branch master updated: fix: orderly pusher consumer may commit queue offset too early

This is an automated email from the ASF dual-hosted git repository.

ifplusor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new ecfd7d3  fix: orderly pusher consumer may commit queue offset too early
ecfd7d3 is described below

commit ecfd7d3a598a54288817c038cb13f25c26bd370f
Author: fluyu <gi...@fluyu.com>
AuthorDate: Fri Jul 24 19:51:16 2020 +0800

    fix: orderly pusher consumer may commit queue offset too early
---
 src/consumer/DefaultMQPushConsumerImpl.cpp | 16 ++++------------
 src/consumer/PullRequest.cpp               |  2 +-
 2 files changed, 5 insertions(+), 13 deletions(-)

diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 1aa0b76..4a79595 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -96,9 +96,7 @@ class AsyncPullCallback : public PullCallback {
         }
         pullRequest->setNextOffset(result.nextBeginOffset);
 
-        vector<MQMessageExt> msgs;
-        pullRequest->getMessage(msgs);
-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+        if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
           m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
         }
         if (bProducePullRequest) {
@@ -118,9 +116,7 @@ class AsyncPullCallback : public PullCallback {
         }
         pullRequest->setNextOffset(result.nextBeginOffset);
 
-        vector<MQMessageExt> msgs;
-        pullRequest->getMessage(msgs);
-        if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+        if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
           m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
         }
         if (bProducePullRequest) {
@@ -740,9 +736,7 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
           break;
         }
         request->setNextOffset(pullResult.nextBeginOffset);
-        vector<MQMessageExt> msgs;
-        request->getMessage(msgs);
-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
           updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
         }
         producePullMsgTask(request);
@@ -756,9 +750,7 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
           break;
         }
         request->setNextOffset(pullResult.nextBeginOffset);
-        vector<MQMessageExt> msgs;
-        request->getMessage(msgs);
-        if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+        if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
           updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
         }
         producePullMsgTask(request);
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 9795880..d86ee7f 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -92,7 +92,7 @@ int64 PullRequest::getCacheMaxOffset() {
 
 int PullRequest::getCacheMsgCount() {
   boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
-  return m_msgTreeMap.size();
+  return m_msgTreeMap.size() + m_msgTreeMapTemp.size();
 }
 
 void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset) {