You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@rocketmq.apache.org by di...@apache.org on 2020/03/03 06:49:31 UTC

[rocketmq-client-go] branch native updated: fix(retry): can not consume the messages in the retry topic (#437)

This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch native
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-go.git


The following commit(s) were added to refs/heads/native by this push:
     new 2c78dae  fix(retry): can not consume the messages in the retry topic (#437)
2c78dae is described below

commit 2c78dae5b5e00d20b376dc39b49364c8d90bf49d
Author: 秦宁 <q3...@163.com>
AuthorDate: Tue Mar 3 14:49:22 2020 +0800

    fix(retry): can not consume the messages in the retry topic (#437)
    
    fix(retry): can not consume the messages in the retry topic (#437)
---
 consumer/push_consumer.go | 7 +++++++
 1 file changed, 7 insertions(+)

diff --git a/consumer/push_consumer.go b/consumer/push_consumer.go
index cb14688..321c59f 100644
--- a/consumer/push_consumer.go
+++ b/consumer/push_consumer.go
@@ -22,6 +22,7 @@ import (
 	"fmt"
 	"math"
 	"strconv"
+	"strings"
 	"sync"
 	"sync/atomic"
 	"time"
@@ -803,6 +804,12 @@ func (pc *pushConsumer) consumeInner(ctx context.Context, subMsgs []*primitive.M
 	}
 
 	f, exist := pc.consumeFunc.Contains(subMsgs[0].Topic)
+
+	// fix lost retry message
+	if !exist && strings.HasPrefix(subMsgs[0].Topic, internal.RetryGroupTopicPrefix) {
+		f, exist = pc.consumeFunc.Contains(subMsgs[0].GetProperty(primitive.PropertyRetryTopic))
+	}
+
 	if !exist {
 		return ConsumeRetryLater, fmt.Errorf("the consume callback missing for topic: %s", subMsgs[0].Topic)
 	}