You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/04/17 02:23:55 UTC

[rocketmq-client-cpp] branch master updated: [ISSUE #293] reset pull call back by new pull request event time

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 5629226  [ISSUE #293] reset pull call back by new pull request event time
5629226 is described below

commit 5629226997fea8bc6ac91e9481d2140d983106a7
Author: dinglei <li...@163.com>
AuthorDate: Fri Apr 17 10:23:47 2020 +0800

    [ISSUE #293] reset pull call back by new pull request event time
    
    [ISSUE #293] reset pull call back by new pull request event time
---
 src/consumer/DefaultMQPushConsumerImpl.cpp | 3 ++-
 src/consumer/PullRequest.cpp               | 3 ++-
 src/consumer/Rebalance.cpp                 | 1 +
 3 files changed, 5 insertions(+), 2 deletions(-)

diff --git a/src/consumer/DefaultMQPushConsumerImpl.cpp b/src/consumer/DefaultMQPushConsumerImpl.cpp
index 12fddd3..39b3ccd 100644
--- a/src/consumer/DefaultMQPushConsumerImpl.cpp
+++ b/src/consumer/DefaultMQPushConsumerImpl.cpp
@@ -790,7 +790,8 @@ AsyncPullCallback* DefaultMQPushConsumerImpl::getAsyncPullCallBack(boost::weak_p
       m_PullCallback[msgQueue] = new AsyncPullCallback(this, request);
     }
     AsyncPullCallback* asyncPullCallback = m_PullCallback[msgQueue];
-    if (asyncPullCallback && asyncPullCallback->getPullRequest().expired()) {
+    if (asyncPullCallback) {
+      // The previous pull request may have been dropped; always rebind the callback to the new request to replace its event time.
       asyncPullCallback->setPullRequest(pullRequest);
     }
     return asyncPullCallback;
diff --git a/src/consumer/PullRequest.cpp b/src/consumer/PullRequest.cpp
index e578840..9795880 100644
--- a/src/consumer/PullRequest.cpp
+++ b/src/consumer/PullRequest.cpp
@@ -208,7 +208,8 @@ bool PullRequest::isPullRequestExpired() const {
   uint64 interval = m_lastPullTimestamp + MAX_PULL_IDLE_TIME;
   if (interval <= UtilAll::currentTimeMillis()) {
     LOG_WARN("PullRequest for [%s] has been expired %lld ms,m_lastPullTimestamp = %lld ms",
-             m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - interval, m_lastPullTimestamp);
+             m_messageQueue.toString().c_str(), UtilAll::currentTimeMillis() - m_lastPullTimestamp,
+             m_lastPullTimestamp);
     return true;
   }
   return false;
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 6dbb35c..18c8b2f 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -522,6 +522,7 @@ bool RebalancePush::updateRequestTableInRebalance(const string& topic, vector<MQ
     int64 nextOffset = computePullFromWhere(*itAdd);
     if (nextOffset >= 0) {
       pullRequest->setNextOffset(nextOffset);
+      pullRequest->setDropped(false);
       changed = true;
       addPullRequest(*itAdd, pullRequest);
       pullRequestsToAdd.push_back(pullRequest);