You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/07/08 05:03:03 UTC
[rocketmq-client-go] branch master updated: fix(reconsume): subMsgs
should be used instead of msgs in consume goroutine (#504)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 80b3acc fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)
80b3acc is described below
commit 80b3acc5580a9a512597b94a734feeee3001883a
Author: hzjiangjian <ji...@126.com>
AuthorDate: Wed Jul 8 13:02:55 2020 +0800
fix(reconsume): subMsgs should be used instead of msgs in consume goroutine (#504)
* fix(consume): subMsgs should be used instead of msgs in consuming goroutine
---
consumer/push_consumer.go | 10 +++++-----
1 file changed, 5 insertions(+), 5 deletions(-)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index d2aee5b..25b8ce8 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -913,7 +913,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
Properties: make(map[string]string),
ConsumerGroup: pc.consumerGroup,
MQ: mq,
- Msgs: msgs,
+ Msgs: subMsgs,
}
ctx := context.Background()
ctx = primitive.WithConsumerCtx(ctx, msgCtx)
@@ -944,14 +944,14 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
} else {
increaseConsumeFailedTPS(pc.consumerGroup, mq.Topic, len(subMsgs))
if pc.model == BroadCasting {
- for i := 0; i < len(msgs); i++ {
+ for i := 0; i < len(subMsgs); i++ {
rlog.Warning("BROADCASTING, the message consume failed, drop it", map[string]interface{}{
"message": subMsgs[i],
})
}
} else {
- for i := 0; i < len(msgs); i++ {
- msg := msgs[i]
+ for i := 0; i < len(subMsgs); i++ {
+ msg := subMsgs[i]
if !pc.sendMessageBack(mq.BrokerName, msg, concurrentCtx.DelayLevelWhenNextConsume) {
msg.ReconsumeTimes += 1
msgBackFailed = append(msgBackFailed, msg)
@@ -973,7 +973,7 @@ func (pc *pushConsumer) consumeMessageCurrently(pq *processQueue, mq *primitive.
} else {
rlog.Warning("processQueue is dropped without process consume result.", map[string]interface{}{
rlog.LogKeyMessageQueue: mq,
- "message": msgs,
+ "message": subMsgs,
})
}
})