You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/11/28 04:23:27 UTC
[rocketmq-client-go] branch master updated: fix: correctly mark messages to be reconsumed (#693)
This is an automated email from the ASF dual-hosted git repository.
maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new a8f01df fix: correctly mark messages to be reconsumed (#693)
a8f01df is described below
commit a8f01df7f70d6c86e0e3751198a0ae78bea2f31f
Author: fluyu <fl...@users.noreply.github.com>
AuthorDate: Sun Nov 28 12:23:21 2021 +0800
fix: correctly mark messages to be reconsumed (#693)
---
consumer/push_consumer.go | 2 +-
1 file changed, 1 insertion(+), 1 deletion(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9b38d0f..3e4377b 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1162,7 +1162,7 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
commitOffset = pq.commit()
case SuspendCurrentQueueAMoment:
if pc.checkReconsumeTimes(msgs) {
- pq.putMessage(msgs...)
+ pq.makeMessageToCosumeAgain(msgs...)
time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
continueConsume = false
} else {