You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/04/17 02:23:55 UTC
[rocketmq-client-cpp] branch master updated: [ISSUE #293] reset
pull call back by new pull request event time
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 5629226 [ISSUE #293] reset pull call back by new pull request event time
5629226 is described below
commit 5629226997fea8bc6ac91e9481d2140d983106a7
Author: dinglei <li...@163.com>
AuthorDate: Fri Apr 17 10:23:47 2020 +0800
[ISSUE #293] reset pull call back by new pull request event time
[ISSUE #293] reset pull call back by new pull request event time
---
src/consumer/DefaultMQPushConsumerImpl.cpp | 3 ++-
src/consumer/PullRequest.cpp | 3 ++-
src/consumer/Rebalance.cpp | 1 +
3 files changed, 5 insertions(+), 2 deletions(-)
diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 12fddd3..39b3ccd 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -790,7 +790,8 @@ AsyncPullCallback* DefaultMQPushConsumerImpl::getAsyncPullCallBack(boost::weak_p
m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
}
AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue];
- if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) {
+ if (asyncPullCallback) {
+ // maybe the pull request has dropped before, replace event time.
asyncPullCallback->setPullRequest(pullRequest);
}
return asyncPullCallback;
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index e578840..9795880 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -208,7 +208,8 @@ bool PullRequest::isPullRequestExpired() const {
uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
if (interval <= UtilAll::currentTimeMillis()) {
LOG_WARN("PullRequest for [%s] has been expired %lld ms,m_lastPullTimestamp = %lld ms",
- m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - interval, m_lastPullTimestamp);
+ m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - m_lastPullTimestamp,
+ m_lastPullTimestamp);
return true;
}
return false;
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 6dbb35c..18c8b2f 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -522,6 +522,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQ
int64 nextOffset = computePullFromWhere(*itAdd);
if (nextOffset >= 0) {
pullRequest->setNextOffset(nextOffset);
+ pullRequest->setDropped(false);
changed = true;
addPullRequest(*itAdd, pullRequest);
pullRequestsToAdd.push_back(pullRequest);