You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/09/09 14:03:14 UTC
[pulsar-client-go] branch master updated: Support retry letter topic (#359)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 3c523ba Support retry letter topic (#359)
3c523ba is described below
commit 3c523ba8288929ec4299b5a362fe8fc9088858e2
Author: wuYin <wu...@gmail.com>
AuthorDate: Wed Sep 9 22:00:43 2020 +0800
Support retry letter topic (#359)
### Motivation
Follow [pulsar#6449](https://github.com/apache/pulsar/pull/6449) to support the retry letter topic in the Go client.
### Modifications
- Add `retryRouter` for sending messages that need to be reconsumed to the retry letter topic
- Add `ReconsumeLater(msg Message, delay time.Duration)` to the `Consumer` interface
- Add a configurable retry letter topic name in `DLQPolicy`
```go
type DLQPolicy struct {
// ...
// Name of the topic where the retry messages will be sent.
RetryLetterTopic string
}
```
Retry must be enabled explicitly when creating the consumer; it is disabled by default.
```go
type ConsumerOptions struct {
// ...
// Auto retry send messages to default filled DLQPolicy topics
RetryEnable bool
}
```
- Add 2 `TestRLQ*` test cases
---
pulsar/consumer.go | 12 +-
pulsar/consumer_impl.go | 104 ++++++++++++++-
pulsar/consumer_multitopic.go | 16 ++-
pulsar/consumer_regex.go | 21 ++-
pulsar/consumer_regex_test.go | 6 +-
pulsar/consumer_test.go | 207 +++++++++++++++++++++++++++++-
pulsar/dlq_router.go | 6 +-
pulsar/impl_message.go | 5 +
pulsar/internal/connection.go | 32 ++++-
pulsar/internal/topic_name.go | 4 +-
pulsar/producer_impl.go | 4 +-
pulsar/{dlq_router.go => retry_router.go} | 91 ++++++-------
12 files changed, 423 insertions(+), 85 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c1fe454..a99c030 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -66,7 +66,10 @@ type DLQPolicy struct {
MaxDeliveries uint32
// Name of the topic where the failing messages will be sent.
- Topic string
+ DeadLetterTopic string
+
+ // Name of the topic where the retry messages will be sent.
+ RetryLetterTopic string
}
// ConsumerOptions is used to configure and create instances of Consumer
@@ -107,6 +110,10 @@ type ConsumerOptions struct {
// By default is nil and there's no DLQ
DLQ *DLQPolicy
+ // Auto retry send messages to default filled DLQPolicy topics
+ // Default is false
+ RetryEnable bool
+
// Sets a `MessageChannel` for the consumer
// When a message is received, it will be pushed to the channel for consumption
MessageChannel chan ConsumerMessage
@@ -163,6 +170,9 @@ type Consumer interface {
// AckID the consumption of a single message, identified by its MessageID
AckID(MessageID)
+ // ReconsumeLater mark a message for redelivery after custom delay
+ ReconsumeLater(msg Message, delay time.Duration)
+
// Acknowledge the failure to process a single message.
//
// When a message is "negatively acked" it will be marked for redelivery after
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b5de1f9..7e266a2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -22,6 +22,7 @@ import (
"errors"
"fmt"
"math/rand"
+ "strconv"
"sync"
"time"
@@ -73,6 +74,7 @@ type consumer struct {
messageCh chan ConsumerMessage
dlq *dlqRouter
+ rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
errorCh chan error
@@ -108,10 +110,50 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
messageCh = make(chan ConsumerMessage, 10)
}
+ if options.RetryEnable {
+ usingTopic := ""
+ if options.Topic != "" {
+ usingTopic = options.Topic
+ } else if len(options.Topics) > 0 {
+ usingTopic = options.Topics[0]
+ }
+ tn, err := internal.ParseTopicName(usingTopic)
+ if err != nil {
+ return nil, err
+ }
+
+ retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
+ dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
+ if options.DLQ == nil {
+ options.DLQ = &DLQPolicy{
+ MaxDeliveries: MaxReconsumeTimes,
+ DeadLetterTopic: dlqTopic,
+ RetryLetterTopic: retryTopic,
+ }
+ } else {
+ if options.DLQ.DeadLetterTopic == "" {
+ options.DLQ.DeadLetterTopic = dlqTopic
+ }
+ if options.DLQ.RetryLetterTopic == "" {
+ options.DLQ.RetryLetterTopic = retryTopic
+ }
+ }
+ if options.Topic != "" && len(options.Topics) == 0 {
+ options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
+ options.Topic = ""
+ } else if options.Topic == "" && len(options.Topics) > 0 {
+ options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
+ }
+ }
+
dlq, err := newDlqRouter(client, options.DLQ)
if err != nil {
return nil, err
}
+ rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
+ if err != nil {
+ return nil, err
+ }
// single topic consumer
if options.Topic != "" || len(options.Topics) == 1 {
@@ -124,7 +166,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}
- return topicSubscribe(client, options, topic, messageCh, dlq)
+ return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
}
if len(options.Topics) > 1 {
@@ -132,7 +174,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
return nil, err
}
- return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq)
+ return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
}
if options.TopicsPattern != "" {
@@ -145,14 +187,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
if err != nil {
return nil, err
}
- return newRegexConsumer(client, options, tn, pattern, messageCh, dlq)
+ return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
}
return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
}
func newInternalConsumer(client *client, options ConsumerOptions, topic string,
- messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
+ messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
consumer := &consumer{
topic: topic,
@@ -163,6 +205,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
closeCh: make(chan struct{}),
errorCh: make(chan error),
dlq: dlq,
+ rlq: rlq,
log: log.WithField("topic", topic),
consumerName: options.Name,
}
@@ -306,8 +349,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
}
func topicSubscribe(client *client, options ConsumerOptions, topic string,
- messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
- c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
+ messageCh chan ConsumerMessage, dlqRouter *dlqRouter, retryRouter *retryRouter) (Consumer, error) {
+ c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, retryRouter, false)
if err == nil {
consumersOpened.Inc()
}
@@ -375,6 +418,54 @@ func (c *consumer) AckID(msgID MessageID) {
c.consumers[mid.partitionIdx].AckID(mid)
}
+// ReconsumeLater mark a message for redelivery after custom delay
+func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
+ if delay < 0 {
+ delay = 0
+ }
+ msgID, ok := c.messageID(msg.ID())
+ if !ok {
+ return
+ }
+ props := make(map[string]string)
+ for k, v := range msg.Properties() {
+ props[k] = v
+ }
+
+ reconsumeTimes := 1
+ if s, ok := props[SysPropertyReconsumeTimes]; ok {
+ reconsumeTimes, _ = strconv.Atoi(s)
+ reconsumeTimes++
+ } else {
+ props[SysPropertyRealTopic] = msg.Topic()
+ props[SysPropertyOriginMessageID] = msgID.messageID.String()
+ }
+ props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
+ props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
+
+ consumerMsg := ConsumerMessage{
+ Consumer: c,
+ Message: &message{
+ payLoad: msg.Payload(),
+ properties: props,
+ msgID: msgID,
+ },
+ }
+ if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
+ c.dlq.Chan() <- consumerMsg
+ } else {
+ c.rlq.Chan() <- RetryMessage{
+ consumerMsg: consumerMsg,
+ producerMsg: ProducerMessage{
+ Payload: msg.Payload(),
+ Key: msg.Key(),
+ Properties: props,
+ DeliverAfter: delay,
+ },
+ }
+ }
+}
+
func (c *consumer) Nack(msg Message) {
c.NackID(msg.ID())
}
@@ -411,6 +502,7 @@ func (c *consumer) Close() {
c.ticker.Stop()
c.client.handlers.Del(c)
c.dlq.close()
+ c.rlq.close()
consumersClosed.Inc()
consumersPartitions.Sub(float64(len(c.consumers)))
})
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 8d34203..0e823ca 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -38,6 +38,7 @@ type multiTopicConsumer struct {
consumers map[string]Consumer
dlq *dlqRouter
+ rlq *retryRouter
closeOnce sync.Once
closeCh chan struct{}
@@ -45,19 +46,20 @@ type multiTopicConsumer struct {
}
func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
- messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
+ messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
mtc := &multiTopicConsumer{
options: options,
messageCh: messageCh,
consumers: make(map[string]Consumer, len(topics)),
closeCh: make(chan struct{}),
dlq: dlq,
+ rlq: rlq,
log: &log.Entry{},
consumerName: options.Name,
}
var errs error
- for ce := range subscriber(client, topics, options, messageCh, dlq) {
+ for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
@@ -134,6 +136,15 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) {
mid.Ack()
}
+func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+ consumer, ok := c.consumers[msg.Topic()]
+ if !ok {
+ c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
+ return
+ }
+ consumer.ReconsumeLater(msg, delay)
+}
+
func (c *multiTopicConsumer) Nack(msg Message) {
c.NackID(msg.ID())
}
@@ -166,6 +177,7 @@ func (c *multiTopicConsumer) Close() {
wg.Wait()
close(c.closeCh)
c.dlq.close()
+ c.rlq.close()
consumersClosed.Inc()
})
}
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e0fdbcb..13ef600 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -40,6 +40,7 @@ const (
type regexConsumer struct {
client *client
dlq *dlqRouter
+ rlq *retryRouter
options ConsumerOptions
@@ -64,10 +65,11 @@ type regexConsumer struct {
}
func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
- msgCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
+ msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
rc := &regexConsumer{
client: c,
dlq: dlq,
+ rlq: rlq,
options: opts,
messageCh: msgCh,
@@ -90,7 +92,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
}
var errs error
- for ce := range subscriber(c, topics, opts, msgCh, dlq) {
+ for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
if ce.err != nil {
errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
} else {
@@ -163,6 +165,10 @@ func (c *regexConsumer) Ack(msg Message) {
c.AckID(msg.ID())
}
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+ c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+}
+
// Ack the consumption of a single message, identified by its MessageID
func (c *regexConsumer) AckID(msgID MessageID) {
mid, ok := toTrackingMessageID(msgID)
@@ -215,6 +221,7 @@ func (c *regexConsumer) Close() {
}
wg.Wait()
c.dlq.close()
+ c.rlq.close()
consumersClosed.Inc()
})
}
@@ -253,7 +260,7 @@ func (c *regexConsumer) monitor() {
}
case topics := <-c.subscribeCh:
if len(topics) > 0 && !c.closed() {
- c.subscribe(topics, c.dlq)
+ c.subscribe(topics, c.dlq, c.rlq)
}
case topics := <-c.unsubscribeCh:
if len(topics) > 0 && !c.closed() {
@@ -298,12 +305,12 @@ func (c *regexConsumer) knownTopics() []string {
return topics
}
-func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter) {
+func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
if log.GetLevel() == log.DebugLevel {
c.log.WithField("topics", topics).Debug("subscribe")
}
consumers := make(map[string]Consumer, len(topics))
- for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq) {
+ for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
if ce.err != nil {
c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
} else {
@@ -359,7 +366,7 @@ type consumerError struct {
}
func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
- dlq *dlqRouter) <-chan consumerError {
+ dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
consumerErrorCh := make(chan consumerError, len(topics))
var wg sync.WaitGroup
wg.Add(len(topics))
@@ -371,7 +378,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
for _, t := range topics {
go func(topic string) {
defer wg.Done()
- c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
+ c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true)
consumerErrorCh <- consumerError{
err: err,
topic: topic,
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index c45c40a..e4acf5f 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -154,7 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
}
dlq, _ := newDlqRouter(c.(*client), nil)
- consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
+ rlq, _ := newRetryRouter(c.(*client), nil, false)
+ consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
@@ -202,7 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
}
dlq, _ := newDlqRouter(c.(*client), nil)
- consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
+ rlq, _ := newRetryRouter(c.(*client), nil, false)
+ consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
if err != nil {
t.Fatal(err)
}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a3a22b6..97d2de2 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -973,8 +973,8 @@ func TestDLQ(t *testing.T) {
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &DLQPolicy{
- MaxDeliveries: 3,
- Topic: dlqTopic,
+ MaxDeliveries: 3,
+ DeadLetterTopic: dlqTopic,
},
})
assert.Nil(t, err)
@@ -1076,8 +1076,8 @@ func TestDLQMultiTopics(t *testing.T) {
NackRedeliveryDelay: 1 * time.Second,
Type: Shared,
DLQ: &DLQPolicy{
- MaxDeliveries: 3,
- Topic: dlqTopic,
+ MaxDeliveries: 3,
+ DeadLetterTopic: dlqTopic,
},
})
assert.Nil(t, err)
@@ -1146,6 +1146,205 @@ func TestDLQMultiTopics(t *testing.T) {
assert.Nil(t, msg)
}
+func TestRLQ(t *testing.T) {
+ topic := "persistent://public/default/" + newTopicName()
+ subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+ maxRedeliveries := 2
+ N := 100
+ ctx := context.Background()
+
+ client, err := NewClient(ClientOptions{URL: lookupURL})
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // 1. Pre-produce N messages
+ producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
+ assert.Nil(t, err)
+ defer producer.Close()
+
+ for i := 0; i < N; i++ {
+ _, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
+ assert.Nil(t, err)
+ }
+
+ // 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
+ rlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subName,
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ DLQ: &DLQPolicy{
+ MaxDeliveries: uint32(maxRedeliveries),
+ },
+ RetryEnable: true,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+ defer rlqConsumer.Close()
+
+ rlqReceived := 0
+ for rlqReceived < N*(maxRedeliveries+1) {
+ msg, err := rlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+ rlqReceived++
+ }
+ fmt.Println("retry consumed:", rlqReceived) // 300
+
+ // No more messages on the Retry Topic
+ rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer rlqCancel()
+ msg, err := rlqConsumer.Receive(rlqCtx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+ // 3. Create consumer on the DLQ topic to verify the routing
+ dlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "persistent://public/default/" + subName + "-DLQ",
+ SubscriptionName: subName,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer dlqConsumer.Close()
+
+ dlqReceived := 0
+ for dlqReceived < N {
+ msg, err := dlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ dlqConsumer.Ack(msg)
+ dlqReceived++
+ }
+ fmt.Println("dlq received:", dlqReceived) // 100
+
+ // No more messages on the DLQ Topic
+ dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer dlqCancel()
+ msg, err = dlqConsumer.Receive(dlqCtx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+ // 4. No more messages for same subscription
+ checkConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: topic,
+ SubscriptionName: subName,
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer checkConsumer.Close()
+
+ checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer checkCancel()
+ checkMsg, err := checkConsumer.Receive(checkCtx)
+ assert.Error(t, err)
+ assert.Nil(t, checkMsg)
+}
+
+func TestRLQMultiTopics(t *testing.T) {
+ now := time.Now().Unix()
+ topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
+ topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
+ topics := []string{topic01, topic02}
+
+ subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+ maxRedeliveries := 2
+ N := 100
+ ctx := context.Background()
+
+ client, err := NewClient(ClientOptions{URL: lookupURL})
+ assert.Nil(t, err)
+ defer client.Close()
+
+ // subscribe multi topics with Retry Topics
+ rlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topics: topics,
+ SubscriptionName: subName,
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ DLQ: &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)},
+ RetryEnable: true,
+ NackRedeliveryDelay: 1 * time.Second,
+ })
+ assert.Nil(t, err)
+ defer rlqConsumer.Close()
+
+ // subscribe DLQ Topic
+ dlqConsumer, err := client.Subscribe(ConsumerOptions{
+ Topic: "persistent://public/default/" + subName + "-DLQ",
+ SubscriptionName: subName,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer dlqConsumer.Close()
+
+ // create multi producers
+ producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
+ assert.Nil(t, err)
+ defer producer01.Close()
+
+ producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02})
+ assert.Nil(t, err)
+ defer producer02.Close()
+
+ // 1. Pre-produce N messages for every topic
+ for i := 0; i < N; i++ {
+ _, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
+ assert.Nil(t, err)
+ _, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))})
+ assert.Nil(t, err)
+ }
+
+ // 2. Create consumer on the Retry Topics to reconsume 2*N messages (maxRedeliveries+1) times
+ rlqReceived := 0
+ for rlqReceived < 2*N*(maxRedeliveries+1) {
+ msg, err := rlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+ rlqReceived++
+ }
+ fmt.Println("retry consumed:", rlqReceived) // 600
+
+ // No more messages on the Retry Topic
+ rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer rlqCancel()
+ msg, err := rlqConsumer.Receive(rlqCtx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+ // 3. Create consumer on the DLQ topic to verify the routing
+ dlqReceived := 0
+ for dlqReceived < 2*N {
+ msg, err := dlqConsumer.Receive(ctx)
+ assert.Nil(t, err)
+ dlqConsumer.Ack(msg)
+ dlqReceived++
+ }
+ fmt.Println("dlq received:", dlqReceived) // 200
+
+ // No more messages on the DLQ Topic
+ dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer dlqCancel()
+ msg, err = dlqConsumer.Receive(dlqCtx)
+ assert.Error(t, err)
+ assert.Nil(t, msg)
+
+ // 4. No more messages for same subscription
+ checkConsumer, err := client.Subscribe(ConsumerOptions{
+ Topics: []string{topic01, topic02},
+ SubscriptionName: subName,
+ Type: Shared,
+ SubscriptionInitialPosition: SubscriptionPositionEarliest,
+ })
+ assert.Nil(t, err)
+ defer checkConsumer.Close()
+
+ timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+ defer cancel()
+ checkMsg, err := checkConsumer.Receive(timeoutCtx)
+ assert.Error(t, err)
+ assert.Nil(t, checkMsg)
+}
+
func TestGetDeliveryCount(t *testing.T) {
client, err := NewClient(ClientOptions{
URL: lookupURL,
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index d6e7b30..68b263b 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -46,13 +46,13 @@ func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
return nil, errors.New("DLQPolicy.MaxDeliveries needs to be > 0")
}
- if policy.Topic == "" {
+ if policy.DeadLetterTopic == "" {
return nil, errors.New("DLQPolicy.Topic needs to be set to a valid topic name")
}
r.messageCh = make(chan ConsumerMessage)
r.closeCh = make(chan interface{}, 1)
- r.log = log.WithField("dlq-topic", policy.Topic)
+ r.log = log.WithField("dlq-topic", policy.DeadLetterTopic)
go r.run()
}
return r, nil
@@ -132,7 +132,7 @@ func (r *dlqRouter) getProducer() Producer {
backoff := &internal.Backoff{}
for {
producer, err := r.client.CreateProducer(ProducerOptions{
- Topic: r.policy.Topic,
+ Topic: r.policy.DeadLetterTopic,
CompressionType: LZ4,
BatchingMaxPublishDelay: 100 * time.Millisecond,
})
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d670a37..d796969 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,6 +18,7 @@
package pulsar
import (
+ "fmt"
"math/big"
"strings"
"sync"
@@ -103,6 +104,10 @@ func (id messageID) Serialize() []byte {
return data
}
+func (id messageID) String() string {
+ return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
+}
+
func deserializeMessageID(data []byte) (MessageID, error) {
msgID := &pb.MessageIdData{}
err := proto.Unmarshal(data, msgID)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1bfec52..0d2baab 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -176,6 +176,7 @@ type connection struct {
closeCh chan interface{}
writeRequestsCh chan Buffer
+ pendingLock sync.Mutex
pendingReqs map[uint64]*request
listeners map[uint64]ConnectionListener
@@ -356,24 +357,35 @@ func (c *connection) run() {
defer func() {
// all the accesses to the pendingReqs should be happened in this run loop thread,
// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
+ c.pendingLock.Lock()
for id, req := range c.pendingReqs {
req.callback(nil, errors.New("connection closed"))
delete(c.pendingReqs, id)
}
+ c.pendingLock.Unlock()
c.Close()
}()
+ go func() {
+ for {
+ select {
+ case <-c.closeCh:
+ return
+
+ case req := <-c.incomingRequestsCh:
+ if req == nil {
+ return // TODO: this never gonna be happen
+ }
+ c.internalSendRequest(req)
+ }
+ }
+ }()
+
for {
select {
case <-c.closeCh:
return
- case req := <-c.incomingRequestsCh:
- if req == nil {
- return
- }
- c.internalSendRequest(req)
-
case cmd := <-c.incomingCmdCh:
c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
@@ -556,33 +568,41 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
}
func (c *connection) internalSendRequest(req *request) {
+ c.pendingLock.Lock()
if req.id != nil {
c.pendingReqs[*req.id] = req
}
+ c.pendingLock.Unlock()
c.writeCommand(req.cmd)
}
func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
+ c.pendingLock.Lock()
request, ok := c.pendingReqs[requestID]
if !ok {
c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
+ c.pendingLock.Unlock()
return
}
delete(c.pendingReqs, requestID)
+ c.pendingLock.Unlock()
request.callback(response, nil)
}
func (c *connection) handleResponseError(serverError *pb.CommandError) {
requestID := serverError.GetRequestId()
+ c.pendingLock.Lock()
request, ok := c.pendingReqs[requestID]
if !ok {
c.log.Warnf("Received unexpected error response for request %d of type %s",
requestID, serverError.GetError())
+ c.pendingLock.Unlock()
return
}
delete(c.pendingReqs, requestID)
+ c.pendingLock.Unlock()
errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())
request.callback(nil, errors.New(errMsg))
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index 78d2abb..68b7ea1 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -26,8 +26,9 @@ import (
// TopicName abstract a struct contained in a Topic
type TopicName struct {
- Name string
+ Domain string
Namespace string
+ Name string
Partition int
}
@@ -68,6 +69,7 @@ func ParseTopicName(topic string) (*TopicName, error) {
if domain != "persistent" && domain != "non-persistent" {
return nil, errors.New("Invalid topic domain: " + domain)
}
+ tn.Domain = domain
rest := parts[1]
var err error
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 2a2b63e..458d2b5 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -286,8 +286,8 @@ func (p *producer) Flush() error {
}
func (p *producer) Close() {
- p.RLock()
- defer p.RUnlock()
+ p.Lock()
+ defer p.Unlock()
if p.ticker != nil {
p.ticker.Stop()
close(p.tickerStop)
diff --git a/pulsar/dlq_router.go b/pulsar/retry_router.go
similarity index 50%
copy from pulsar/dlq_router.go
copy to pulsar/retry_router.go
index d6e7b30..b16417d 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/retry_router.go
@@ -26,95 +26,84 @@ import (
log "github.com/sirupsen/logrus"
)
-type dlqRouter struct {
+const (
+ DlqTopicSuffix = "-DLQ"
+ RetryTopicSuffix = "-RETRY"
+ MaxReconsumeTimes = 16
+
+ SysPropertyDelayTime = "DELAY_TIME"
+ SysPropertyRealTopic = "REAL_TOPIC"
+ SysPropertyRetryTopic = "RETRY_TOPIC"
+ SysPropertyReconsumeTimes = "RECONSUMETIMES"
+ SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
+)
+
+type RetryMessage struct {
+ producerMsg ProducerMessage
+ consumerMsg ConsumerMessage
+}
+
+type retryRouter struct {
client Client
producer Producer
policy *DLQPolicy
- messageCh chan ConsumerMessage
+ messageCh chan RetryMessage
closeCh chan interface{}
log *log.Entry
}
-func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
- r := &dlqRouter{
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool) (*retryRouter, error) {
+ r := &retryRouter{
client: client,
policy: policy,
}
- if policy != nil {
+ if policy != nil && retryEnabled {
if policy.MaxDeliveries <= 0 {
return nil, errors.New("DLQPolicy.MaxDeliveries needs to be > 0")
}
- if policy.Topic == "" {
- return nil, errors.New("DLQPolicy.Topic needs to be set to a valid topic name")
+ if policy.RetryLetterTopic == "" {
+ return nil, errors.New("DLQPolicy.RetryLetterTopic needs to be set to a valid topic name")
}
- r.messageCh = make(chan ConsumerMessage)
+ r.messageCh = make(chan RetryMessage)
r.closeCh = make(chan interface{}, 1)
- r.log = log.WithField("dlq-topic", policy.Topic)
+ r.log = log.WithField("rlq-topic", policy.RetryLetterTopic)
go r.run()
}
return r, nil
}
-func (r *dlqRouter) shouldSendToDlq(cm *ConsumerMessage) bool {
- if r.policy == nil {
- return false
- }
-
- msg := cm.Message.(*message)
- r.log.WithField("count", msg.redeliveryCount).
- WithField("max", r.policy.MaxDeliveries).
- WithField("msgId", msg.msgID).
- Debug("Should route to DLQ?")
-
- // We use >= here because we're comparing the number of re-deliveries with
- // the number of deliveries. So:
- // * the user specifies that wants to process a message up to 10 times.
- // * the first time, the redeliveryCount == 0, then 1 and so on
- // * when we receive the message and redeliveryCount == 10, it means
- // that the application has already got (and Nack()) the message 10
- // times, so this time we should just go to DLQ.
-
- return msg.redeliveryCount >= r.policy.MaxDeliveries
-}
-
-func (r *dlqRouter) Chan() chan ConsumerMessage {
+func (r *retryRouter) Chan() chan RetryMessage {
return r.messageCh
}
-func (r *dlqRouter) run() {
+func (r *retryRouter) run() {
for {
select {
- case cm := <-r.messageCh:
- r.log.WithField("msgID", cm.ID()).Debug("Got message for DLQ")
+ case rm := <-r.messageCh:
+ r.log.WithField("msgID", rm.consumerMsg.ID()).Debug("Got message for RLQ")
producer := r.getProducer()
- msg := cm.Message.(*message)
- msgID := msg.ID()
- producer.SendAsync(context.Background(), &ProducerMessage{
- Payload: msg.Payload(),
- Key: msg.Key(),
- Properties: msg.Properties(),
- EventTime: msg.EventTime(),
- ReplicationClusters: msg.replicationClusters,
- }, func(MessageID, *ProducerMessage, error) {
- r.log.WithField("msgID", msgID).Debug("Sent message to DLQ")
- cm.Consumer.AckID(msgID)
+ msgID := rm.consumerMsg.ID()
+ producer.SendAsync(context.Background(), &rm.producerMsg, func(MessageID, *ProducerMessage, error) {
+ // TODO: if router produce failed, should Nack this message
+ r.log.WithField("msgID", msgID).Debug("Sent message to RLQ")
+ rm.consumerMsg.Consumer.AckID(msgID)
})
case <-r.closeCh:
if r.producer != nil {
r.producer.Close()
}
- r.log.Debug("Closed DLQ router")
+ r.log.Debug("Closed RLQ router")
return
}
}
}
-func (r *dlqRouter) close() {
+func (r *retryRouter) close() {
// Attempt to write on the close channel, without blocking
select {
case r.closeCh <- nil:
@@ -122,7 +111,7 @@ func (r *dlqRouter) close() {
}
}
-func (r *dlqRouter) getProducer() Producer {
+func (r *retryRouter) getProducer() Producer {
if r.producer != nil {
// Producer was already initialized
return r.producer
@@ -132,13 +121,13 @@ func (r *dlqRouter) getProducer() Producer {
backoff := &internal.Backoff{}
for {
producer, err := r.client.CreateProducer(ProducerOptions{
- Topic: r.policy.Topic,
+ Topic: r.policy.RetryLetterTopic,
CompressionType: LZ4,
BatchingMaxPublishDelay: 100 * time.Millisecond,
})
if err != nil {
- r.log.WithError(err).Error("Failed to create DLQ producer")
+ r.log.WithError(err).Error("Failed to create RLQ producer")
time.Sleep(backoff.Next())
continue
} else {