Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/09/09 14:03:14 UTC

[pulsar-client-go] branch master updated: Support retry letter topic (#359)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 3c523ba  Support retry letter topic  (#359)
3c523ba is described below

commit 3c523ba8288929ec4299b5a362fe8fc9088858e2
Author: wuYin <wu...@gmail.com>
AuthorDate: Wed Sep 9 22:00:43 2020 +0800

    Support retry letter topic  (#359)
    
    ### Motivation
    
    Follow [pulsar#6449](https://github.com/apache/pulsar/pull/6449) to support the retry letter topic in the Go client
    
    ### Modifications
    
    - Add `retryRouter` for routing reconsumed messages to the retry letter topic
    - Add `ReconsumeLater(msg Message, delay time.Duration)` to the `Consumer` interface
    - Add a configurable retry letter topic name in `DLQPolicy`
      ```go
      type DLQPolicy struct {
          // ...
          // Name of the topic where the retry messages will be sent.
          RetryLetterTopic string
      }
      ```
      Enable it explicitly when creating the consumer; it is disabled by default (see the usage sketch after this list):
    
      ```go
      type ConsumerOptions struct {
          // ...
          // RetryEnable automatically routes reconsumed messages through the
          // retry letter topic, filling DLQPolicy defaults when unset
          RetryEnable bool
      }
      ```
    - Add 2 `TestRLQ*` test cases
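    
    For reference, a minimal usage sketch (adapted from the new `TestRLQ` test; the
    broker URL, topic name, and `process` helper are illustrative, and most error
    handling is elided):
    
    ```go
    client, _ := pulsar.NewClient(pulsar.ClientOptions{URL: "pulsar://localhost:6650"})
    defer client.Close()
    
    consumer, _ := client.Subscribe(pulsar.ConsumerOptions{
    	Topic:            "persistent://public/default/my-topic",
    	SubscriptionName: "my-sub",
    	Type:             pulsar.Shared,
    	RetryEnable:      true,
    	// Unset topic names default to <domain>://<namespace>/<sub>-RETRY and ...-DLQ
    	DLQ: &pulsar.DLQPolicy{MaxDeliveries: 3},
    })
    defer consumer.Close()
    
    for {
    	msg, err := consumer.Receive(context.Background())
    	if err != nil {
    		break
    	}
    	if err := process(msg); err != nil {
    		// Redeliver via the retry letter topic after 10s; once MaxDeliveries is
    		// exceeded the message is routed to the dead letter topic instead.
    		consumer.ReconsumeLater(msg, 10*time.Second)
    	} else {
    		consumer.Ack(msg)
    	}
    }
    ```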
---
 pulsar/consumer.go                        |  12 +-
 pulsar/consumer_impl.go                   | 104 ++++++++++++++-
 pulsar/consumer_multitopic.go             |  16 ++-
 pulsar/consumer_regex.go                  |  21 ++-
 pulsar/consumer_regex_test.go             |   6 +-
 pulsar/consumer_test.go                   | 207 +++++++++++++++++++++++++++++-
 pulsar/dlq_router.go                      |   6 +-
 pulsar/impl_message.go                    |   5 +
 pulsar/internal/connection.go             |  32 ++++-
 pulsar/internal/topic_name.go             |   4 +-
 pulsar/producer_impl.go                   |   4 +-
 pulsar/{dlq_router.go => retry_router.go} |  91 ++++++-------
 12 files changed, 423 insertions(+), 85 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index c1fe454..a99c030 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -66,7 +66,10 @@ type DLQPolicy struct {
 	MaxDeliveries uint32
 
 	// Name of the topic where the failing messages will be sent.
-	Topic string
+	DeadLetterTopic string
+
+	// Name of the topic where the retry messages will be sent.
+	RetryLetterTopic string
 }
 
 // ConsumerOptions is used to configure and create instances of Consumer
@@ -107,6 +110,10 @@ type ConsumerOptions struct {
 	// By default is nil and there's no DLQ
 	DLQ *DLQPolicy
 
+	// RetryEnable automatically routes reconsumed messages through the retry
+	// letter topic, filling DLQPolicy with default topic names when unset.
+	// Default is false
+	RetryEnable bool
+
 	// Sets a `MessageChannel` for the consumer
 	// When a message is received, it will be pushed to the channel for consumption
 	MessageChannel chan ConsumerMessage
@@ -163,6 +170,9 @@ type Consumer interface {
 	// AckID the consumption of a single message, identified by its MessageID
 	AckID(MessageID)
 
+	// ReconsumeLater marks a message for redelivery after a custom delay
+	ReconsumeLater(msg Message, delay time.Duration)
+
 	// Acknowledge the failure to process a single message.
 	//
 	// When a message is "negatively acked" it will be marked for redelivery after
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index b5de1f9..7e266a2 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -22,6 +22,7 @@ import (
 	"errors"
 	"fmt"
 	"math/rand"
+	"strconv"
 	"sync"
 	"time"
 
@@ -73,6 +74,7 @@ type consumer struct {
 	messageCh chan ConsumerMessage
 
 	dlq       *dlqRouter
+	rlq       *retryRouter
 	closeOnce sync.Once
 	closeCh   chan struct{}
 	errorCh   chan error
@@ -108,10 +110,50 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		messageCh = make(chan ConsumerMessage, 10)
 	}
 
+	if options.RetryEnable {
+		usingTopic := ""
+		if options.Topic != "" {
+			usingTopic = options.Topic
+		} else if len(options.Topics) > 0 {
+			usingTopic = options.Topics[0]
+		}
+		tn, err := internal.ParseTopicName(usingTopic)
+		if err != nil {
+			return nil, err
+		}
+
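+		// Default naming convention: <domain>://<namespace>/<subscription>-RETRY
+		// and <domain>://<namespace>/<subscription>-DLQ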
+		retryTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + RetryTopicSuffix
+		dlqTopic := tn.Domain + "://" + tn.Namespace + "/" + options.SubscriptionName + DlqTopicSuffix
+		if options.DLQ == nil {
+			options.DLQ = &DLQPolicy{
+				MaxDeliveries:    MaxReconsumeTimes,
+				DeadLetterTopic:  dlqTopic,
+				RetryLetterTopic: retryTopic,
+			}
+		} else {
+			if options.DLQ.DeadLetterTopic == "" {
+				options.DLQ.DeadLetterTopic = dlqTopic
+			}
+			if options.DLQ.RetryLetterTopic == "" {
+				options.DLQ.RetryLetterTopic = retryTopic
+			}
+		}
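+		// Also subscribe to the retry letter topic so that delayed retries are
+		// redelivered to this consumer.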
+		if options.Topic != "" && len(options.Topics) == 0 {
+			options.Topics = []string{options.Topic, options.DLQ.RetryLetterTopic}
+			options.Topic = ""
+		} else if options.Topic == "" && len(options.Topics) > 0 {
+			options.Topics = append(options.Topics, options.DLQ.RetryLetterTopic)
+		}
+	}
+
 	dlq, err := newDlqRouter(client, options.DLQ)
 	if err != nil {
 		return nil, err
 	}
+	rlq, err := newRetryRouter(client, options.DLQ, options.RetryEnable)
+	if err != nil {
+		return nil, err
+	}
 
 	// single topic consumer
 	if options.Topic != "" || len(options.Topics) == 1 {
@@ -124,7 +166,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 			return nil, err
 		}
 
-		return topicSubscribe(client, options, topic, messageCh, dlq)
+		return topicSubscribe(client, options, topic, messageCh, dlq, rlq)
 	}
 
 	if len(options.Topics) > 1 {
@@ -132,7 +174,7 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 			return nil, err
 		}
 
-		return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq)
+		return newMultiTopicConsumer(client, options, options.Topics, messageCh, dlq, rlq)
 	}
 
 	if options.TopicsPattern != "" {
@@ -145,14 +187,14 @@ func newConsumer(client *client, options ConsumerOptions) (Consumer, error) {
 		if err != nil {
 			return nil, err
 		}
-		return newRegexConsumer(client, options, tn, pattern, messageCh, dlq)
+		return newRegexConsumer(client, options, tn, pattern, messageCh, dlq, rlq)
 	}
 
 	return nil, newError(ResultInvalidTopicName, "topic name is required for consumer")
 }
 
 func newInternalConsumer(client *client, options ConsumerOptions, topic string,
-	messageCh chan ConsumerMessage, dlq *dlqRouter, disableForceTopicCreation bool) (*consumer, error) {
+	messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter, disableForceTopicCreation bool) (*consumer, error) {
 
 	consumer := &consumer{
 		topic:                     topic,
@@ -163,6 +205,7 @@ func newInternalConsumer(client *client, options ConsumerOptions, topic string,
 		closeCh:                   make(chan struct{}),
 		errorCh:                   make(chan error),
 		dlq:                       dlq,
+		rlq:                       rlq,
 		log:                       log.WithField("topic", topic),
 		consumerName:              options.Name,
 	}
@@ -306,8 +349,8 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 }
 
 func topicSubscribe(client *client, options ConsumerOptions, topic string,
-	messageCh chan ConsumerMessage, dlqRouter *dlqRouter) (Consumer, error) {
-	c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, false)
+	messageCh chan ConsumerMessage, dlqRouter *dlqRouter, retryRouter *retryRouter) (Consumer, error) {
+	c, err := newInternalConsumer(client, options, topic, messageCh, dlqRouter, retryRouter, false)
 	if err == nil {
 		consumersOpened.Inc()
 	}
@@ -375,6 +418,54 @@ func (c *consumer) AckID(msgID MessageID) {
 	c.consumers[mid.partitionIdx].AckID(mid)
 }
 
+// ReconsumeLater marks a message for redelivery after a custom delay
+func (c *consumer) ReconsumeLater(msg Message, delay time.Duration) {
+	if delay < 0 {
+		delay = 0
+	}
+	msgID, ok := c.messageID(msg.ID())
+	if !ok {
+		return
+	}
+	props := make(map[string]string)
+	for k, v := range msg.Properties() {
+		props[k] = v
+	}
+
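+	// Track the reconsume count; on the first ReconsumeLater, record the original
+	// topic and message ID so they survive republishing.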
+	reconsumeTimes := 1
+	if s, ok := props[SysPropertyReconsumeTimes]; ok {
+		reconsumeTimes, _ = strconv.Atoi(s)
+		reconsumeTimes++
+	} else {
+		props[SysPropertyRealTopic] = msg.Topic()
+		props[SysPropertyOriginMessageID] = msgID.messageID.String()
+	}
+	props[SysPropertyReconsumeTimes] = strconv.Itoa(reconsumeTimes)
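+	// DELAY_TIME is recorded in milliseconds (delay is in nanoseconds)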
+	props[SysPropertyDelayTime] = fmt.Sprintf("%d", int64(delay)/1e6)
+
+	consumerMsg := ConsumerMessage{
+		Consumer: c,
+		Message: &message{
+			payLoad:    msg.Payload(),
+			properties: props,
+			msgID:      msgID,
+		},
+	}
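+	// Once the reconsume count exceeds MaxDeliveries, route the message to the
+	// dead letter topic; otherwise republish it to the retry letter topic with
+	// the requested delay.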
+	if uint32(reconsumeTimes) > c.dlq.policy.MaxDeliveries {
+		c.dlq.Chan() <- consumerMsg
+	} else {
+		c.rlq.Chan() <- RetryMessage{
+			consumerMsg: consumerMsg,
+			producerMsg: ProducerMessage{
+				Payload:      msg.Payload(),
+				Key:          msg.Key(),
+				Properties:   props,
+				DeliverAfter: delay,
+			},
+		}
+	}
+}
+
 func (c *consumer) Nack(msg Message) {
 	c.NackID(msg.ID())
 }
@@ -411,6 +502,7 @@ func (c *consumer) Close() {
 		c.ticker.Stop()
 		c.client.handlers.Del(c)
 		c.dlq.close()
+		c.rlq.close()
 		consumersClosed.Inc()
 		consumersPartitions.Sub(float64(len(c.consumers)))
 	})
diff --git a/pulsar/consumer_multitopic.go b/pulsar/consumer_multitopic.go
index 8d34203..0e823ca 100644
--- a/pulsar/consumer_multitopic.go
+++ b/pulsar/consumer_multitopic.go
@@ -38,6 +38,7 @@ type multiTopicConsumer struct {
 	consumers map[string]Consumer
 
 	dlq       *dlqRouter
+	rlq       *retryRouter
 	closeOnce sync.Once
 	closeCh   chan struct{}
 
@@ -45,19 +46,20 @@ type multiTopicConsumer struct {
 }
 
 func newMultiTopicConsumer(client *client, options ConsumerOptions, topics []string,
-	messageCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
+	messageCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
 	mtc := &multiTopicConsumer{
 		options:      options,
 		messageCh:    messageCh,
 		consumers:    make(map[string]Consumer, len(topics)),
 		closeCh:      make(chan struct{}),
 		dlq:          dlq,
+		rlq:          rlq,
 		log:          &log.Entry{},
 		consumerName: options.Name,
 	}
 
 	var errs error
-	for ce := range subscriber(client, topics, options, messageCh, dlq) {
+	for ce := range subscriber(client, topics, options, messageCh, dlq, rlq) {
 		if ce.err != nil {
 			errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
 		} else {
@@ -134,6 +136,15 @@ func (c *multiTopicConsumer) AckID(msgID MessageID) {
 	mid.Ack()
 }
 
+func (c *multiTopicConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+	consumer, ok := c.consumers[msg.Topic()]
+	if !ok {
+		c.log.Warnf("consumer of topic %s not exist unexpectedly", msg.Topic())
+		return
+	}
+	consumer.ReconsumeLater(msg, delay)
+}
+
 func (c *multiTopicConsumer) Nack(msg Message) {
 	c.NackID(msg.ID())
 }
@@ -166,6 +177,7 @@ func (c *multiTopicConsumer) Close() {
 		wg.Wait()
 		close(c.closeCh)
 		c.dlq.close()
+		c.rlq.close()
 		consumersClosed.Inc()
 	})
 }
diff --git a/pulsar/consumer_regex.go b/pulsar/consumer_regex.go
index e0fdbcb..13ef600 100644
--- a/pulsar/consumer_regex.go
+++ b/pulsar/consumer_regex.go
@@ -40,6 +40,7 @@ const (
 type regexConsumer struct {
 	client *client
 	dlq    *dlqRouter
+	rlq    *retryRouter
 
 	options ConsumerOptions
 
@@ -64,10 +65,11 @@ type regexConsumer struct {
 }
 
 func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, pattern *regexp.Regexp,
-	msgCh chan ConsumerMessage, dlq *dlqRouter) (Consumer, error) {
+	msgCh chan ConsumerMessage, dlq *dlqRouter, rlq *retryRouter) (Consumer, error) {
 	rc := &regexConsumer{
 		client:    c,
 		dlq:       dlq,
+		rlq:       rlq,
 		options:   opts,
 		messageCh: msgCh,
 
@@ -90,7 +92,7 @@ func newRegexConsumer(c *client, opts ConsumerOptions, tn *internal.TopicName, p
 	}
 
 	var errs error
-	for ce := range subscriber(c, topics, opts, msgCh, dlq) {
+	for ce := range subscriber(c, topics, opts, msgCh, dlq, rlq) {
 		if ce.err != nil {
 			errs = pkgerrors.Wrapf(ce.err, "unable to subscribe to topic=%s", ce.topic)
 		} else {
@@ -163,6 +165,10 @@ func (c *regexConsumer) Ack(msg Message) {
 	c.AckID(msg.ID())
 }
 
+func (c *regexConsumer) ReconsumeLater(msg Message, delay time.Duration) {
+	c.log.Warnf("regexp consumer not support ReconsumeLater yet.")
+}
+
 // Ack the consumption of a single message, identified by its MessageID
 func (c *regexConsumer) AckID(msgID MessageID) {
 	mid, ok := toTrackingMessageID(msgID)
@@ -215,6 +221,7 @@ func (c *regexConsumer) Close() {
 		}
 		wg.Wait()
 		c.dlq.close()
+		c.rlq.close()
 		consumersClosed.Inc()
 	})
 }
@@ -253,7 +260,7 @@ func (c *regexConsumer) monitor() {
 			}
 		case topics := <-c.subscribeCh:
 			if len(topics) > 0 && !c.closed() {
-				c.subscribe(topics, c.dlq)
+				c.subscribe(topics, c.dlq, c.rlq)
 			}
 		case topics := <-c.unsubscribeCh:
 			if len(topics) > 0 && !c.closed() {
@@ -298,12 +305,12 @@ func (c *regexConsumer) knownTopics() []string {
 	return topics
 }
 
-func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter) {
+func (c *regexConsumer) subscribe(topics []string, dlq *dlqRouter, rlq *retryRouter) {
 	if log.GetLevel() == log.DebugLevel {
 		c.log.WithField("topics", topics).Debug("subscribe")
 	}
 	consumers := make(map[string]Consumer, len(topics))
-	for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq) {
+	for ce := range subscriber(c.client, topics, c.options, c.messageCh, dlq, rlq) {
 		if ce.err != nil {
 			c.log.Warnf("Failed to subscribe to topic=%s", ce.topic)
 		} else {
@@ -359,7 +366,7 @@ type consumerError struct {
 }
 
 func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan ConsumerMessage,
-	dlq *dlqRouter) <-chan consumerError {
+	dlq *dlqRouter, rlq *retryRouter) <-chan consumerError {
 	consumerErrorCh := make(chan consumerError, len(topics))
 	var wg sync.WaitGroup
 	wg.Add(len(topics))
@@ -371,7 +378,7 @@ func subscriber(c *client, topics []string, opts ConsumerOptions, ch chan Consum
 	for _, t := range topics {
 		go func(topic string) {
 			defer wg.Done()
-			c, err := newInternalConsumer(c, opts, topic, ch, dlq, true)
+			c, err := newInternalConsumer(c, opts, topic, ch, dlq, rlq, true)
 			consumerErrorCh <- consumerError{
 				err:      err,
 				topic:    topic,
diff --git a/pulsar/consumer_regex_test.go b/pulsar/consumer_regex_test.go
index c45c40a..e4acf5f 100644
--- a/pulsar/consumer_regex_test.go
+++ b/pulsar/consumer_regex_test.go
@@ -154,7 +154,8 @@ func runRegexConsumerDiscoverPatternAll(t *testing.T, c Client, namespace string
 	}
 
 	dlq, _ := newDlqRouter(c.(*client), nil)
-	consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
+	rlq, _ := newRetryRouter(c.(*client), nil, false)
+	consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -202,7 +203,8 @@ func runRegexConsumerDiscoverPatternFoo(t *testing.T, c Client, namespace string
 	}
 
 	dlq, _ := newDlqRouter(c.(*client), nil)
-	consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq)
+	rlq, _ := newRetryRouter(c.(*client), nil, false)
+	consumer, err := newRegexConsumer(c.(*client), opts, tn, pattern, make(chan ConsumerMessage, 1), dlq, rlq)
 	if err != nil {
 		t.Fatal(err)
 	}
diff --git a/pulsar/consumer_test.go b/pulsar/consumer_test.go
index a3a22b6..97d2de2 100644
--- a/pulsar/consumer_test.go
+++ b/pulsar/consumer_test.go
@@ -973,8 +973,8 @@ func TestDLQ(t *testing.T) {
 		NackRedeliveryDelay: 1 * time.Second,
 		Type:                Shared,
 		DLQ: &DLQPolicy{
-			MaxDeliveries: 3,
-			Topic:         dlqTopic,
+			MaxDeliveries:   3,
+			DeadLetterTopic: dlqTopic,
 		},
 	})
 	assert.Nil(t, err)
@@ -1076,8 +1076,8 @@ func TestDLQMultiTopics(t *testing.T) {
 		NackRedeliveryDelay: 1 * time.Second,
 		Type:                Shared,
 		DLQ: &DLQPolicy{
-			MaxDeliveries: 3,
-			Topic:         dlqTopic,
+			MaxDeliveries:   3,
+			DeadLetterTopic: dlqTopic,
 		},
 	})
 	assert.Nil(t, err)
@@ -1146,6 +1146,205 @@ func TestDLQMultiTopics(t *testing.T) {
 	assert.Nil(t, msg)
 }
 
+func TestRLQ(t *testing.T) {
+	topic := "persistent://public/default/" + newTopicName()
+	subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+	maxRedeliveries := 2
+	N := 100
+	ctx := context.Background()
+
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// 1. Pre-produce N messages
+	producer, err := client.CreateProducer(ProducerOptions{Topic: topic})
+	assert.Nil(t, err)
+	defer producer.Close()
+
+	for i := 0; i < N; i++ {
+		_, err = producer.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MESSAGE_%d", i))})
+		assert.Nil(t, err)
+	}
+
+	// 2. Create consumer on the Retry Topic to reconsume N messages (maxRedeliveries+1) times
+	rlqConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            subName,
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+		DLQ: &DLQPolicy{
+			MaxDeliveries: uint32(maxRedeliveries),
+		},
+		RetryEnable:         true,
+		NackRedeliveryDelay: 1 * time.Second,
+	})
+	assert.Nil(t, err)
+	defer rlqConsumer.Close()
+
+	rlqReceived := 0
+	for rlqReceived < N*(maxRedeliveries+1) {
+		msg, err := rlqConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+		rlqReceived++
+	}
+	fmt.Println("retry consumed:", rlqReceived) // 300
+
+	// No more messages on the Retry Topic
+	rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer rlqCancel()
+	msg, err := rlqConsumer.Receive(rlqCtx)
+	assert.Error(t, err)
+	assert.Nil(t, msg)
+
+	// 3. Create consumer on the DLQ topic to verify the routing
+	dlqConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "persistent://public/default/" + subName + "-DLQ",
+		SubscriptionName:            subName,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer dlqConsumer.Close()
+
+	dlqReceived := 0
+	for dlqReceived < N {
+		msg, err := dlqConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		dlqConsumer.Ack(msg)
+		dlqReceived++
+	}
+	fmt.Println("dlq received:", dlqReceived) // 100
+
+	// No more messages on the DLQ Topic
+	dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer dlqCancel()
+	msg, err = dlqConsumer.Receive(dlqCtx)
+	assert.Error(t, err)
+	assert.Nil(t, msg)
+
+	// 4. No more messages for same subscription
+	checkConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       topic,
+		SubscriptionName:            subName,
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer checkConsumer.Close()
+
+	checkCtx, checkCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer checkCancel()
+	checkMsg, err := checkConsumer.Receive(checkCtx)
+	assert.Error(t, err)
+	assert.Nil(t, checkMsg)
+}
+
+func TestRLQMultiTopics(t *testing.T) {
+	now := time.Now().Unix()
+	topic01 := fmt.Sprintf("persistent://public/default/topic-%d-1", now)
+	topic02 := fmt.Sprintf("persistent://public/default/topic-%d-2", now)
+	topics := []string{topic01, topic02}
+
+	subName := fmt.Sprintf("sub01-%d", time.Now().Unix())
+	maxRedeliveries := 2
+	N := 100
+	ctx := context.Background()
+
+	client, err := NewClient(ClientOptions{URL: lookupURL})
+	assert.Nil(t, err)
+	defer client.Close()
+
+	// subscribe to multiple topics with the retry topic enabled
+	rlqConsumer, err := client.Subscribe(ConsumerOptions{
+		Topics:                      topics,
+		SubscriptionName:            subName,
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+		DLQ:                         &DLQPolicy{MaxDeliveries: uint32(maxRedeliveries)},
+		RetryEnable:                 true,
+		NackRedeliveryDelay:         1 * time.Second,
+	})
+	assert.Nil(t, err)
+	defer rlqConsumer.Close()
+
+	// subscribe DLQ Topic
+	dlqConsumer, err := client.Subscribe(ConsumerOptions{
+		Topic:                       "persistent://public/default/" + subName + "-DLQ",
+		SubscriptionName:            subName,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer dlqConsumer.Close()
+
+	// create multi producers
+	producer01, err := client.CreateProducer(ProducerOptions{Topic: topic01})
+	assert.Nil(t, err)
+	defer producer01.Close()
+
+	producer02, err := client.CreateProducer(ProducerOptions{Topic: topic02})
+	assert.Nil(t, err)
+	defer producer02.Close()
+
+	// 1. Pre-produce N messages for every topic
+	for i := 0; i < N; i++ {
+		_, err = producer01.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_01_%d", i))})
+		assert.Nil(t, err)
+		_, err = producer02.Send(ctx, &ProducerMessage{Payload: []byte(fmt.Sprintf("MSG_02_%d", i))})
+		assert.Nil(t, err)
+	}
+
+	// 2. Reconsume each of the 2*N messages (maxRedeliveries+1) times via the retry topics
+	rlqReceived := 0
+	for rlqReceived < 2*N*(maxRedeliveries+1) {
+		msg, err := rlqConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		rlqConsumer.ReconsumeLater(msg, 1*time.Second)
+		rlqReceived++
+	}
+	fmt.Println("retry consumed:", rlqReceived) // 600
+
+	// No more messages on the Retry Topic
+	rlqCtx, rlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer rlqCancel()
+	msg, err := rlqConsumer.Receive(rlqCtx)
+	assert.Error(t, err)
+	assert.Nil(t, msg)
+
+	// 3. Verify the routing on the DLQ topic
+	dlqReceived := 0
+	for dlqReceived < 2*N {
+		msg, err := dlqConsumer.Receive(ctx)
+		assert.Nil(t, err)
+		dlqConsumer.Ack(msg)
+		dlqReceived++
+	}
+	fmt.Println("dlq received:", dlqReceived) // 200
+
+	// No more messages on the DLQ Topic
+	dlqCtx, dlqCancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer dlqCancel()
+	msg, err = dlqConsumer.Receive(dlqCtx)
+	assert.Error(t, err)
+	assert.Nil(t, msg)
+
+	// 4. No more messages for same subscription
+	checkConsumer, err := client.Subscribe(ConsumerOptions{
+		Topics:                      []string{topic01, topic02},
+		SubscriptionName:            subName,
+		Type:                        Shared,
+		SubscriptionInitialPosition: SubscriptionPositionEarliest,
+	})
+	assert.Nil(t, err)
+	defer checkConsumer.Close()
+
+	timeoutCtx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
+	defer cancel()
+	checkMsg, err := checkConsumer.Receive(timeoutCtx)
+	assert.Error(t, err)
+	assert.Nil(t, checkMsg)
+}
+
 func TestGetDeliveryCount(t *testing.T) {
 	client, err := NewClient(ClientOptions{
 		URL: lookupURL,
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index d6e7b30..68b263b 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -46,13 +46,13 @@ func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
 			return nil, errors.New("DLQPolicy.MaxDeliveries needs to be > 0")
 		}
 
-		if policy.Topic == "" {
+		if policy.DeadLetterTopic == "" {
 			return nil, errors.New("DLQPolicy.Topic needs to be set to a valid topic name")
 		}
 
 		r.messageCh = make(chan ConsumerMessage)
 		r.closeCh = make(chan interface{}, 1)
-		r.log = log.WithField("dlq-topic", policy.Topic)
+		r.log = log.WithField("dlq-topic", policy.DeadLetterTopic)
 		go r.run()
 	}
 	return r, nil
@@ -132,7 +132,7 @@ func (r *dlqRouter) getProducer() Producer {
 	backoff := &internal.Backoff{}
 	for {
 		producer, err := r.client.CreateProducer(ProducerOptions{
-			Topic:                   r.policy.Topic,
+			Topic:                   r.policy.DeadLetterTopic,
 			CompressionType:         LZ4,
 			BatchingMaxPublishDelay: 100 * time.Millisecond,
 		})
diff --git a/pulsar/impl_message.go b/pulsar/impl_message.go
index d670a37..d796969 100644
--- a/pulsar/impl_message.go
+++ b/pulsar/impl_message.go
@@ -18,6 +18,7 @@
 package pulsar
 
 import (
+	"fmt"
 	"math/big"
 	"strings"
 	"sync"
@@ -103,6 +104,10 @@ func (id messageID) Serialize() []byte {
 	return data
 }
 
+func (id messageID) String() string {
+	return fmt.Sprintf("%d:%d:%d", id.ledgerID, id.entryID, id.partitionIdx)
+}
+
 func deserializeMessageID(data []byte) (MessageID, error) {
 	msgID := &pb.MessageIdData{}
 	err := proto.Unmarshal(data, msgID)
diff --git a/pulsar/internal/connection.go b/pulsar/internal/connection.go
index 1bfec52..0d2baab 100644
--- a/pulsar/internal/connection.go
+++ b/pulsar/internal/connection.go
@@ -176,6 +176,7 @@ type connection struct {
 	closeCh            chan interface{}
 	writeRequestsCh    chan Buffer
 
+	// pendingLock guards pendingReqs, which is accessed from both the run loop
+	// and the request-sending goroutine
+	pendingLock sync.Mutex
 	pendingReqs map[uint64]*request
 	listeners   map[uint64]ConnectionListener
 
@@ -356,24 +357,35 @@ func (c *connection) run() {
 	defer func() {
-		// all the accesses to the pendingReqs should be happened in this run loop thread,
-		// including the final cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
+		// all accesses to pendingReqs must hold pendingLock, including this final
+		// cleanup, to avoid the issue https://github.com/apache/pulsar-client-go/issues/239
+		c.pendingLock.Lock()
 		for id, req := range c.pendingReqs {
 			req.callback(nil, errors.New("connection closed"))
 			delete(c.pendingReqs, id)
 		}
+		c.pendingLock.Unlock()
 		c.Close()
 	}()
 
+	// Handle outgoing requests in a dedicated goroutine, separate from the run
+	// loop that processes incoming commands and responses.
+	go func() {
+		for {
+			select {
+			case <-c.closeCh:
+				return
+
+			case req := <-c.incomingRequestsCh:
+				if req == nil {
+					return // TODO: this should never happen
+				}
+				c.internalSendRequest(req)
+			}
+		}
+	}()
+
 	for {
 		select {
 		case <-c.closeCh:
 			return
 
-		case req := <-c.incomingRequestsCh:
-			if req == nil {
-				return
-			}
-			c.internalSendRequest(req)
-
 		case cmd := <-c.incomingCmdCh:
 			c.internalReceivedCommand(cmd.cmd, cmd.headersAndPayload)
 
@@ -556,33 +568,41 @@ func (c *connection) SendRequestNoWait(req *pb.BaseCommand) {
 }
 
 func (c *connection) internalSendRequest(req *request) {
+	c.pendingLock.Lock()
 	if req.id != nil {
 		c.pendingReqs[*req.id] = req
 	}
+	c.pendingLock.Unlock()
 	c.writeCommand(req.cmd)
 }
 
 func (c *connection) handleResponse(requestID uint64, response *pb.BaseCommand) {
+	c.pendingLock.Lock()
 	request, ok := c.pendingReqs[requestID]
 	if !ok {
 		c.log.Warnf("Received unexpected response for request %d of type %s", requestID, response.Type)
+		c.pendingLock.Unlock()
 		return
 	}
 
 	delete(c.pendingReqs, requestID)
+	c.pendingLock.Unlock()
 	request.callback(response, nil)
 }
 
 func (c *connection) handleResponseError(serverError *pb.CommandError) {
 	requestID := serverError.GetRequestId()
+	c.pendingLock.Lock()
 	request, ok := c.pendingReqs[requestID]
 	if !ok {
 		c.log.Warnf("Received unexpected error response for request %d of type %s",
 			requestID, serverError.GetError())
+		c.pendingLock.Unlock()
 		return
 	}
 
 	delete(c.pendingReqs, requestID)
+	c.pendingLock.Unlock()
 
 	errMsg := fmt.Sprintf("server error: %s: %s", serverError.GetError(), serverError.GetMessage())
 	request.callback(nil, errors.New(errMsg))
diff --git a/pulsar/internal/topic_name.go b/pulsar/internal/topic_name.go
index 78d2abb..68b7ea1 100644
--- a/pulsar/internal/topic_name.go
+++ b/pulsar/internal/topic_name.go
@@ -26,8 +26,9 @@ import (
 
 // TopicName abstract a struct contained in a Topic
 type TopicName struct {
-	Name      string
+	Domain    string
 	Namespace string
+	Name      string
 	Partition int
 }
 
@@ -68,6 +69,7 @@ func ParseTopicName(topic string) (*TopicName, error) {
 	if domain != "persistent" && domain != "non-persistent" {
 		return nil, errors.New("Invalid topic domain: " + domain)
 	}
+	tn.Domain = domain
 
 	rest := parts[1]
 	var err error
diff --git a/pulsar/producer_impl.go b/pulsar/producer_impl.go
index 2a2b63e..458d2b5 100644
--- a/pulsar/producer_impl.go
+++ b/pulsar/producer_impl.go
@@ -286,8 +286,8 @@ func (p *producer) Flush() error {
 }
 
 func (p *producer) Close() {
-	p.RLock()
-	defer p.RUnlock()
+	p.Lock()
+	defer p.Unlock()
 	if p.ticker != nil {
 		p.ticker.Stop()
 		close(p.tickerStop)
diff --git a/pulsar/dlq_router.go b/pulsar/retry_router.go
similarity index 50%
copy from pulsar/dlq_router.go
copy to pulsar/retry_router.go
index d6e7b30..b16417d 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/retry_router.go
@@ -26,95 +26,84 @@ import (
 	log "github.com/sirupsen/logrus"
 )
 
-type dlqRouter struct {
+const (
+	DlqTopicSuffix    = "-DLQ"
+	RetryTopicSuffix  = "-RETRY"
+	MaxReconsumeTimes = 16
+
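+	// Properties attached to reconsumed messages to track origin and delivery count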
+	SysPropertyDelayTime       = "DELAY_TIME"
+	SysPropertyRealTopic       = "REAL_TOPIC"
+	SysPropertyRetryTopic      = "RETRY_TOPIC"
+	SysPropertyReconsumeTimes  = "RECONSUMETIMES"
+	SysPropertyOriginMessageID = "ORIGIN_MESSAGE_IDY_TIME"
+)
+
+type RetryMessage struct {
+	producerMsg ProducerMessage
+	consumerMsg ConsumerMessage
+}
+
+type retryRouter struct {
 	client    Client
 	producer  Producer
 	policy    *DLQPolicy
-	messageCh chan ConsumerMessage
+	messageCh chan RetryMessage
 	closeCh   chan interface{}
 	log       *log.Entry
 }
 
-func newDlqRouter(client Client, policy *DLQPolicy) (*dlqRouter, error) {
-	r := &dlqRouter{
+func newRetryRouter(client Client, policy *DLQPolicy, retryEnabled bool) (*retryRouter, error) {
+	r := &retryRouter{
 		client: client,
 		policy: policy,
 	}
 
-	if policy != nil {
+	if policy != nil && retryEnabled {
 		if policy.MaxDeliveries <= 0 {
 			return nil, errors.New("DLQPolicy.MaxDeliveries needs to be > 0")
 		}
 
-		if policy.Topic == "" {
-			return nil, errors.New("DLQPolicy.Topic needs to be set to a valid topic name")
+		if policy.RetryLetterTopic == "" {
+			return nil, errors.New("DLQPolicy.RetryLetterTopic needs to be set to a valid topic name")
 		}
 
-		r.messageCh = make(chan ConsumerMessage)
+		r.messageCh = make(chan RetryMessage)
 		r.closeCh = make(chan interface{}, 1)
-		r.log = log.WithField("dlq-topic", policy.Topic)
+		r.log = log.WithField("rlq-topic", policy.RetryLetterTopic)
 		go r.run()
 	}
 	return r, nil
 }
 
-func (r *dlqRouter) shouldSendToDlq(cm *ConsumerMessage) bool {
-	if r.policy == nil {
-		return false
-	}
-
-	msg := cm.Message.(*message)
-	r.log.WithField("count", msg.redeliveryCount).
-		WithField("max", r.policy.MaxDeliveries).
-		WithField("msgId", msg.msgID).
-		Debug("Should route to DLQ?")
-
-	// We use >= here because we're comparing the number of re-deliveries with
-	// the number of deliveries. So:
-	//  * the user specifies that wants to process a message up to 10 times.
-	//  * the first time, the redeliveryCount == 0, then 1 and so on
-	//  * when we receive the message and redeliveryCount == 10, it means
-	//    that the application has already got (and Nack())  the message 10
-	//    times, so this time we should just go to DLQ.
-
-	return msg.redeliveryCount >= r.policy.MaxDeliveries
-}
-
-func (r *dlqRouter) Chan() chan ConsumerMessage {
+func (r *retryRouter) Chan() chan RetryMessage {
 	return r.messageCh
 }
 
-func (r *dlqRouter) run() {
+func (r *retryRouter) run() {
 	for {
 		select {
-		case cm := <-r.messageCh:
-			r.log.WithField("msgID", cm.ID()).Debug("Got message for DLQ")
+		case rm := <-r.messageCh:
+			r.log.WithField("msgID", rm.consumerMsg.ID()).Debug("Got message for RLQ")
 			producer := r.getProducer()
 
-			msg := cm.Message.(*message)
-			msgID := msg.ID()
-			producer.SendAsync(context.Background(), &ProducerMessage{
-				Payload:             msg.Payload(),
-				Key:                 msg.Key(),
-				Properties:          msg.Properties(),
-				EventTime:           msg.EventTime(),
-				ReplicationClusters: msg.replicationClusters,
-			}, func(MessageID, *ProducerMessage, error) {
-				r.log.WithField("msgID", msgID).Debug("Sent message to DLQ")
-				cm.Consumer.AckID(msgID)
+			msgID := rm.consumerMsg.ID()
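+			// Republish through the retry producer; the original message is acked
+			// only once the republish callback fires.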
+			producer.SendAsync(context.Background(), &rm.producerMsg, func(MessageID, *ProducerMessage, error) {
+				// TODO: if the router fails to produce, this message should be Nacked
+				r.log.WithField("msgID", msgID).Debug("Sent message to RLQ")
+				rm.consumerMsg.Consumer.AckID(msgID)
 			})
 
 		case <-r.closeCh:
 			if r.producer != nil {
 				r.producer.Close()
 			}
-			r.log.Debug("Closed DLQ router")
+			r.log.Debug("Closed RLQ router")
 			return
 		}
 	}
 }
 
-func (r *dlqRouter) close() {
+func (r *retryRouter) close() {
 	// Attempt to write on the close channel, without blocking
 	select {
 	case r.closeCh <- nil:
@@ -122,7 +111,7 @@ func (r *dlqRouter) close() {
 	}
 }
 
-func (r *dlqRouter) getProducer() Producer {
+func (r *retryRouter) getProducer() Producer {
 	if r.producer != nil {
 		// Producer was already initialized
 		return r.producer
@@ -132,13 +121,13 @@ func (r *dlqRouter) getProducer() Producer {
 	backoff := &internal.Backoff{}
 	for {
 		producer, err := r.client.CreateProducer(ProducerOptions{
-			Topic:                   r.policy.Topic,
+			Topic:                   r.policy.RetryLetterTopic,
 			CompressionType:         LZ4,
 			BatchingMaxPublishDelay: 100 * time.Millisecond,
 		})
 
 		if err != nil {
-			r.log.WithError(err).Error("Failed to create DLQ producer")
+			r.log.WithError(err).Error("Failed to create RLQ producer")
 			time.Sleep(backoff.Next())
 			continue
 		} else {