You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 01:59:43 UTC

[rocketmq-client-go] branch master updated: Fix msg lost if consumer crash when send msg back failed. (#860)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 4efd770  Fix msg lost if consumer crash when send msg back failed. (#860)
4efd770 is described below

commit 4efd7709df3f295f2c15aee87385aefbde90e07d
Author: dinglei <li...@163.com>
AuthorDate: Mon Jul 25 09:59:39 2022 +0800

    Fix msg lost if consumer crash when send msg back failed. (#860)
    
    Co-authored-by: shannon.dl <sh...@alibaba-inc.com>
---
 consumer/push_consumer.go | 8 ++++++--
 1 file changed, 6 insertions(+), 2 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3221c6c..111a0a8 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1046,8 +1046,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 
 			if !pq.IsDroppd() {
 				msgBackFailed := make([]*primitive.MessageExt, 0)
+				msgBackSucceed := make([]*primitive.MessageExt, 0)
 				if result == ConsumeSuccess {
 					pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+					msgBackSucceed = subMsgs
 				} else {
 					pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
 					if pc.model == BroadCasting {
@@ -1059,7 +1061,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 					} else {
 						for i := 0; i < len(subMsgs); i++ {
 							msg := subMsgs[i]
-							if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
+							if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
+								msgBackSucceed = append(msgBackSucceed, msg)
+							} else {
 								msg.ReconsumeTimes += 1
 								msgBackFailed = append(msgBackFailed, msg)
 							}
@@ -1067,7 +1071,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 					}
 				}
 
-				offset := pq.removeMessage(subMsgs...)
+				offset := pq.removeMessage(msgBackSucceed...)
 
 				if offset >= 0 && !pq.IsDroppd() {
 					pc.storage.update(mq, int64(offset), true)