You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/03/03 06:49:31 UTC
[rocketmq-client-go] branch native updated: fix(retry): can not consume the messages in the retry topic (#437)
This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git
The following commit(s) were added to refs/heads/native by this push:
new 2c78dae fix(retry): can not consume the messages in the retry topic (#437)
2c78dae is described below
commit 2c78dae5b5e00d20b376dc39b49364c8d90bf49d
Author: 秦宁 <q3...@163.com>
AuthorDate: Tue Mar 3 14:49:22 2020 +0800
fix(retry): can not consume the messages in the retry topic (#437)
fix(retry): can not consume the messages in the retry topic (#437)
---
consumer/push_consumer.go | 7 +++++++
1 file changed, 7 insertions(+)
diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index cb14688..321c59f 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
"fmt"
"math"
"strconv"
+ "strings"
"sync"
"sync/atomic"
"time"
@@ -803,6 +804,12 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M
}
f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
+
+ // fix lost retry message
+ if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
+ f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
+ }
+
if !exist {
return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
}