You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by ma...@apache.org on 2021/11/28 04:23:27 UTC

[rocketmq-client-go] branch master updated: fix: correctly mark messages to be reconsumed (#693)

This is an automated email from the ASF dual-hosted git repository.

maixiaohai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new a8f01df  fix: correctly mark messages to be reconsumed (#693)
a8f01df is described below

commit a8f01df7f70d6c86e0e3751198a0ae78bea2f31f
Author: fluyu <fl...@users.noreply.github.com>
AuthorDate: Sun Nov 28 12:23:21 2021 +0800

    fix: correctly mark messages to be reconsumed (#693)
---
 consumer/push_consumer.go | 2 +-
 1 file changed, 1 insertion(+), 1 deletion(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 9b38d0f..3e4377b 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1162,7 +1162,7 @@ func (pc *pushConsumer) consumeMessageOrderly(pq *processQueue, mq *primitive.Me
 					commitOffset = pq.commit()
 				case SuspendCurrentQueueAMoment:
 					if pc.checkReconsumeTimes(msgs) {
-						pq.putMessage(msgs...)
+						pq.makeMessageToCosumeAgain(msgs...)
 						time.Sleep(time.Duration(orderlyCtx.SuspendCurrentQueueTimeMillis) * time.Millisecond)
 						continueConsume = false
 					} else {