You are viewing a plain text version of this content. The canonical link for it was not preserved in this plain text copy.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/07/08 05:03:03 UTC

[rocketmq-client-go] branch master updated: fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 80b3acc  fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)
80b3acc is described below

commit 80b3acc5580a9a512597b94a734feeee3001883a
Author: hzjiangjian <ji...@126.com>
AuthorDate: Wed Jul 8 13:02:55 2020 +0800

    fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)
    
    * fix(consume): subMsgs should be used instead of msgs in consuming goroutine
---
 consumer/push_consumer.go | 10 +++++-----
 1 file changed, 5 insertions(+), 5 deletions(-)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d2aee5b..25b8ce8 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -913,7 +913,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 				Properties:    make(map[string]string),
 				ConsumerGroup: pc.consumerGroup,
 				MQ:            mq,
-				Msgs:          msgs,
+				Msgs:          subMsgs,
 			}
 			ctx := context.Background()
 			ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -944,14 +944,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 				} else {
 					increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
 					if pc.model == BroadCasting {
-						for i := 0; i < len(msgs); i++ {
+						for i := 0; i < len(subMsgs); i++ {
 							rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
 								"message": subMsgs[i],
 							})
 						}
 					} else {
-						for i := 0; i < len(msgs); i++ {
-							msg := msgs[i]
+						for i := 0; i < len(subMsgs); i++ {
+							msg := subMsgs[i]
 							if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
 								msg.ReconsumeTimes += 1
 								msgBackFailed = append(msgBackFailed, msg)
@@ -973,7 +973,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
 			} else {
 				rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
 					rlog.LogKeyMessageQueue: mq,
-					"message":               msgs,
+					"message":               subMsgs,
 				})
 			}
 		})