You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2019/01/11 16:08:55 UTC
[rocketmq-client-cpp] branch master updated: [ISSUE#43] One
Consumer Consume One Message Twice at the same time (#61)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 938e773 [ISSUE#43] One Consumer Consume One Message Twice at the same time (#61)
938e773 is described below
commit 938e773b133c2eb4a4d920160f378d7189541969
Author: Jonnxu <jo...@163.com>
AuthorDate: Sat Jan 12 00:08:50 2019 +0800
[ISSUE#43] One Consumer Consume One Message Twice at the same time (#61)
Resolve one Consumer consume one message twice at the same time
---
src/MQClientAPIImpl.cpp | 18 +++++++++++++++++-
src/MQClientAPIImpl.h | 2 ++
src/MQClientFactory.cpp | 14 ++++++++++++++
src/MQClientFactory.h | 2 +-
src/common/AsyncArg.h | 3 ++-
src/consumer/DefaultMQPullConsumer.cpp | 1 +
src/consumer/DefaultMQPushConsumer.cpp | 2 +-
src/consumer/PullRequest.cpp | 14 +++++++++++++-
src/consumer/PullRequest.h | 3 +++
src/consumer/Rebalance.cpp | 4 +++-
src/transport/TcpRemotingClient.cpp | 19 +++++++++++++++++--
src/transport/TcpRemotingClient.h | 1 +
12 files changed, 75 insertions(+), 8 deletions(-)
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 110601d..045bfab 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -451,6 +451,10 @@ void MQClientAPIImpl::sendMessageAsync(const string& addr,
}
}
+void MQClientAPIImpl::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+ m_pRemotingClient->deleteOpaqueForDropPullRequest(mq, opaque);
+}
+
PullResult* MQClientAPIImpl::pullMessage(
const string& addr, PullMessageRequestHeader* pRequestHeader,
int timeoutMillis, int communicationMode, PullCallback* pullCallback,
@@ -480,9 +484,21 @@ void MQClientAPIImpl::pullMessageAsync(const string& addr,
PullCallback* pullCallback, void* pArg) {
//<!delete in future;
AsyncCallbackWrap* cbw = new PullCallbackWarp(pullCallback, this, pArg);
+ MQMessageQueue mq;
+ AsyncArg* pAsyncArg = static_cast<AsyncArg*>(pArg);
+ if (pAsyncArg && pAsyncArg->pPullRequest) {
+ mq = pAsyncArg->mq;
+ pAsyncArg->pPullRequest->setLatestPullRequestOpaque(request.getOpaque());
+ LOG_DEBUG("pullMessageAsync set opaque:%d, mq:%s",
+ pAsyncArg->pPullRequest->getLatestPullRequestOpaque(),mq.toString().c_str());
+ }
+
if (m_pRemotingClient->invokeAsync(addr, request, cbw, timeoutMillis) ==
false) {
- LOG_ERROR("pullMessageAsync failed of addr:%s", addr.c_str());
+ LOG_ERROR("pullMessageAsync failed of addr:%s, opaque:%d, mq:%s", addr.c_str(), request.getOpaque(), mq.toString().data());
+ if (pAsyncArg && pAsyncArg->pPullRequest) {
+ pAsyncArg->pPullRequest->setLatestPullRequestOpaque(0);
+ }
deleteAndZero(cbw);
THROW_MQEXCEPTION(MQClientException, "pullMessageAsync failed", -1);
}
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 9f3ebb4..f2ac273 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,6 +167,8 @@ class MQClientAPIImpl {
SendCallback* pSendCallback, int64 timeoutMilliseconds,
int maxRetryTimes=1,
int retrySendTimes=1);
+ void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
+
private:
SendResult sendMessageSync(const string& addr, const string& brokerName,
const MQMessage& msg, RemotingCommand& request,
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index e128077..606ccc1 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -1040,6 +1040,17 @@ void MQClientFactory::findConsumerIds(
}
}
+void MQClientFactory::removeDropedPullRequestOpaque(PullRequest* pullRequest) {
+ //delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
+ if (!pullRequest) return;
+ MQMessageQueue mq = pullRequest->m_messageQueue;
+ int opaque = pullRequest->getLatestPullRequestOpaque();
+ if (opaque > 0) {
+ LOG_INFO("####### need delete the pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+ getMQClientAPIImpl()->deleteOpaqueForDropPullRequest(mq, opaque);
+ }
+}
+
void MQClientFactory::resetOffset(
const string& group, const string& topic,
const map<MQMessageQueue, int64>& offsetTable) {
@@ -1052,6 +1063,9 @@ void MQClientFactory::resetOffset(
PullRequest* pullreq = pConsumer->getRebalance()->getPullRequest(mq);
if (pullreq) {
pullreq->setDroped(true);
+ LOG_INFO("resetOffset setDroped for opaque:%d, mq:%s", pullreq->getLatestPullRequestOpaque(), mq.toString().data());
+ //delete the opaque record that's ignore the response of this pullrequest when drop pullrequest
+ removeDropedPullRequestOpaque(pullreq);
pullreq->clearAllMsgs();
pullreq->updateQueueMaxOffset(it->second);
} else {
diff --git a/src/MQClientFactory.h b/src/MQClientFactory.h
index 34ac2a9..7da2e21 100644
--- a/src/MQClientFactory.h
+++ b/src/MQClientFactory.h
@@ -113,7 +113,7 @@ class MQClientFactory {
map<int, string>& brokerAddrs);
map<string, map<int, string>> getBrokerAddrMap();
void clearBrokerAddrMap();
-
+ void removeDropedPullRequestOpaque(PullRequest* pullRequest);
private:
void unregisterClient(const string& producerGroup,
const string& consumerGroup,
diff --git a/src/common/AsyncArg.h b/src/common/AsyncArg.h
index fc358cb..4e23743 100755
--- a/src/common/AsyncArg.h
+++ b/src/common/AsyncArg.h
@@ -21,7 +21,7 @@
#include "MQMessageQueue.h"
#include "PullAPIWrapper.h"
#include "SubscriptionData.h"
-
+#include "../consumer/PullRequest.h"
namespace rocketmq {
//<!***************************************************************************
@@ -29,6 +29,7 @@ struct AsyncArg {
MQMessageQueue mq;
SubscriptionData subData;
PullAPIWrapper* pPullWrapper;
+ PullRequest* pPullRequest;
};
//<!***************************************************************************
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index 073a801..bfc4e9d 100755
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -263,6 +263,7 @@ void DefaultMQPullConsumer::pullAsyncImpl(const MQMessageQueue& mq,
arg.mq = mq;
arg.subData = *pSData;
arg.pPullWrapper = m_pPullAPIWrapper;
+ arg.pPullRequest = NULL;
try {
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 2e78ebc..11e16ed 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -840,7 +840,7 @@ namespace rocketmq {
arg.mq = messageQueue;
arg.subData = *pSdata;
arg.pPullWrapper = m_pPullAPIWrapper;
-
+ arg.pPullRequest = request;
try {
request->setLastPullTimestamp(UtilAll::currentTimeMillis());
m_pPullAPIWrapper->pullKernelImpl(
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index bb578f8..8510e43 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -28,7 +28,8 @@ PullRequest::PullRequest(const string& groupname)
m_queueOffsetMax(0),
m_bDroped(false),
m_bLocked(false),
- m_bPullMsgEventInprogress(false) {}
+ m_bPullMsgEventInprogress(false),
+ m_latestPullRequestOpaque(0) {}
PullRequest::~PullRequest() {
m_msgTreeMapTemp.clear();
@@ -45,6 +46,7 @@ PullRequest& PullRequest::operator=(const PullRequest& other) {
m_messageQueue = other.m_messageQueue;
m_msgTreeMap = other.m_msgTreeMap;
m_msgTreeMapTemp = other.m_msgTreeMapTemp;
+ m_latestPullRequestOpaque = other.m_latestPullRequestOpaque;
}
return *this;
}
@@ -260,5 +262,15 @@ bool PullRequest::addPullMsgEvent() {
return false;
}
+int PullRequest::getLatestPullRequestOpaque() const {
+ boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+ return m_latestPullRequestOpaque;
+}
+
+void PullRequest::setLatestPullRequestOpaque(int opaque) {
+ boost::lock_guard<boost::mutex> lock(m_pullRequestLock);
+ m_latestPullRequestOpaque = opaque;
+}
+
//<!***************************************************************************
} //<!end namespace;
diff --git a/src/consumer/PullRequest.h b/src/consumer/PullRequest.h
index f7abfaf..80c286a 100644
--- a/src/consumer/PullRequest.h
+++ b/src/consumer/PullRequest.h
@@ -69,6 +69,8 @@ class PullRequest {
boost::timed_mutex& getPullRequestCriticalSection();
void removePullMsgEvent();
bool addPullMsgEvent();
+ int getLatestPullRequestOpaque() const;
+ void setLatestPullRequestOpaque(int opaque);
public:
MQMessageQueue m_messageQueue;
@@ -88,6 +90,7 @@ class PullRequest {
//uint64 m_tryUnlockTimes;
uint64 m_lastPullTimestamp;
uint64 m_lastConsumeTimestamp;
+ int m_latestPullRequestOpaque;
boost::timed_mutex m_consumeLock;
boost::atomic<bool> m_bPullMsgEventInprogress;
};
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 4a35673..f23b727 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -471,11 +471,13 @@ bool RebalancePush::updateRequestTableInRebalance(
(find(mqsSelf.begin(), mqsSelf.end(), mqtemp) == mqsSelf.end())) {
if (!(it->second->isDroped())) {
it->second->setDroped(true);
+ //delete the lastest pull request for this mq, which hasn't been response
+ m_pClientFactory->removeDropedPullRequestOpaque(it->second);
removeUnnecessaryMessageQueue(mqtemp);
it->second->clearAllMsgs(); // add clear operation to avoid bad state
// when dropped pullRequest returns
// normal
- LOG_INFO("drop mq:%s", mqtemp.toString().c_str());
+ LOG_INFO("drop mq:%s, delete opaque:%d", mqtemp.toString().c_str(), it->second->getLatestPullRequestOpaque());
}
changed = true;
}
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 551df24..603c17f 100755
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -592,7 +592,7 @@ void TcpRemotingClient::processResponseCommand(
if (!pfuture->getAsyncResponseFlag()) {
pfuture->setAsyncResponseFlag();
pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
- cancelTimerCallback(opaque);
+ cancelTimerCallback(opaque);
pfuture->executeInvokeCallback();
}
}
@@ -707,7 +707,7 @@ void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t,
void TcpRemotingClient::eraseTimerCallback(int opaque) {
boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
- LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
+ LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
boost::asio::deadline_timer* t = m_async_timer_map[opaque];
delete t;
t = NULL;
@@ -739,5 +739,20 @@ void TcpRemotingClient::removeAllTimerCallback() {
m_async_timer_map.clear();
}
+void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+ //delete the map record of opaque<->ResponseFuture, so the answer for the pull request will discard when receive it later
+ boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+ if (!pFuture) {
+ pFuture = findAndDeleteResponseFuture(opaque);
+ if (pFuture) {
+ LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+ }
+ } else {
+ LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+ }
+ //delete the timeout timer for opaque for pullrequest
+ cancelTimerCallback(opaque);
+}
+
//<!************************************************************************
} //<!end namespace;
diff --git a/src/transport/TcpRemotingClient.h b/src/transport/TcpRemotingClient.h
index fcdd8a8..c338d53 100755
--- a/src/transport/TcpRemotingClient.h
+++ b/src/transport/TcpRemotingClient.h
@@ -58,6 +58,7 @@ class TcpRemotingClient {
void boost_asio_work();
void handleAsyncPullForResponseTimeout(const boost::system::error_code& e,
int opaque);
+ void deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque);
private:
static void static_messageReceived(void* context, const MemoryBlock& mem,