You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2022/07/25 01:59:43 UTC
[rocketmq-client-go] branch master updated: Fix msg lost if consumer crash when send msg back failed. (#860)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 4efd770 Fix msg lost if consumer crash when send msg back failed. (#860)
4efd770 is described below
commit 4efd7709df3f295f2c15aee87385aefbde90e07d
Author: dinglei <li...@163.com>
AuthorDate: Mon Jul 25 09:59:39 2022 +0800
Fix msg lost if consumer crash when send msg back failed. (#860)
Co-authored-by: shannon.dl <sh...@alibaba-inc.com>
---
consumer/push_consumer.go | 8 ++++++--
1 file changed, 6 insertions(+), 2 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index 3221c6c..111a0a8 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -1046,8 +1046,10 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
if !pq.IsDroppd() {
msgBackFailed := make([]*primitive.MessageExt, 0)
+ msgBackSucceed := make([]*primitive.MessageExt, 0)
if result == ConsumeSuccess {
pc.stat.increaseConsumeOKTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
+ msgBackSucceed = subMsgs
} else {
pc.stat.increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
@@ -1059,7 +1061,9 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
} else {
for i := 0; i < len(subMsgs); i++ {
msg := subMsgs[i]
- if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
+ if pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
+ msgBackSucceed = append(msgBackSucceed, msg)
+ } else {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
}
@@ -1067,7 +1071,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
}
}
- offset := pq.removeMessage(subMsgs...)
+ offset := pq.removeMessage(msgBackSucceed...)
if offset >= 0 && !pq.IsDroppd() {
pc.storage.update(mq, int64(offset), true)