You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by if...@apache.org on 2020/07/24 21:58:58 UTC
[rocketmq-client-cpp] branch master updated: fix: orderly pusher
consumer may commit queue offset too early
This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new ecfd7d3 fix: orderly pusher consumer may commit queue offset too early
ecfd7d3 is described below
commit ecfd7d3a598a54288817c038cb13f25c26bd370f
Author: fluyu <gi...@fluyu.com>
AuthorDate: Fri Jul 24 19:51:16 2020 +0800
fix: orderly pusher consumer may commit queue offset too early
---
src/consumer/DefaultMQPushConsumerImpl.cpp | 16 ++++------------
src/consumer/PullRequest.cpp | 2 +-
2 files changed, 5 insertions(+), 13 deletions(-)
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 1aa0b76..4a79595 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -96,9 +96,7 @@ class AsyncPullCallback : public PullCallback {
}
pullRequest->setNextOffset(result.nextBeginOffset);
- vector<MQMessageExt> msgs;
- pullRequest->getMessage(msgs);
- if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+ if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest) {
@@ -118,9 +116,7 @@ class AsyncPullCallback : public PullCallback {
}
pullRequest->setNextOffset(result.nextBeginOffset);
- vector<MQMessageExt> msgs;
- pullRequest->getMessage(msgs);
- if ((msgs.size() == 0) && (result.nextBeginOffset > 0)) {
+ if ((pullRequest->getCacheMsgCount() == 0) && (result.nextBeginOffset > 0)) {
m_callbackOwner->updateConsumeOffset(pullRequest->m_messageQueue, result.nextBeginOffset);
}
if (bProducePullRequest) {
@@ -740,9 +736,7 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
break;
}
request->setNextOffset(pullResult.nextBeginOffset);
- vector<MQMessageExt> msgs;
- request->getMessage(msgs);
- if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
@@ -756,9 +750,7 @@ void DefaultMQPushConsumerImpl::pullMessage(boost::weak_ptr<PullRequest> pullReq
break;
}
request->setNextOffset(pullResult.nextBeginOffset);
- vector<MQMessageExt> msgs;
- request->getMessage(msgs);
- if ((msgs.size() == 0) && (pullResult.nextBeginOffset > 0)) {
+ if ((request->getCacheMsgCount() == 0) && (pullResult.nextBeginOffset > 0)) {
updateConsumeOffset(messageQueue, pullResult.nextBeginOffset);
}
producePullMsgTask(request);
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index 9795880..d86ee7f 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -92,7 +92,7 @@ int64 PullRequest::getCacheMaxOffset() {
int PullRequest::getCacheMsgCount() {
boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
- return m_msgTreeMap.size();
+ return m_msgTreeMap.size() + m_msgTreeMapTemp.size();
}
void PullRequest::getMessageByQueueOffset(vector<MQMessageExt>& msgs, int64 minQueueOffset, int64 maxQueueOffset) {