You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/21 02:51:24 UTC

[pulsar-client-go] branch master updated: feat: support limit the retry number of reconnectToBroker (#360)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 44b8b4e  feat: support limit the retry number of reconnectToBroker (#360)
44b8b4e is described below

commit 44b8b4efc18b15d823e316ed0b7d92768dfa7ea7
Author: jony montana <hy...@gmail.com>
AuthorDate: Wed Oct 21 10:51:16 2020 +0800

    feat: support limit the retry number of reconnectToBroker (#360)
    
    Signed-off-by: jonyhy96 <hy...@gmail.com>
    
    Fixes #257
    
    ### Motivation
    
    Once the connection is closed, the reconnectToBroker logic of both the producer and the consumer will try to reconnect to the broker indefinitely.
    
    ### Modifications
    
    Add a field to the Options that represents the maximum number of retries; the current behavior is unchanged if this field is not set.
---
 pulsar/consumer.go           |  3 +++
 pulsar/consumer_impl.go      |  1 +
 pulsar/consumer_partition.go | 19 +++++++++++++++++--
 pulsar/producer.go           |  3 +++
 pulsar/producer_partition.go | 18 ++++++++++++++++--
 5 files changed, 40 insertions(+), 4 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 41ccb91..e9f803d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -150,6 +150,9 @@ type ConsumerOptions struct {
 
 	// A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface.
 	Interceptors ConsumerInterceptors
+
+	// MaxReconnectToBroker sets the maximum number of reconnectToBroker retries. (default: unlimited)
+	MaxReconnectToBroker *uint
 }
 
 // Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5cb1dd2..ed2af4b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -309,6 +309,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				subscriptionMode:           durable,
 				readCompacted:              c.options.ReadCompacted,
 				interceptors:               c.options.Interceptors,
+				maxReconnectToBroker:       c.options.MaxReconnectToBroker,
 				keySharedPolicy:            c.options.KeySharedPolicy,
 			}
 			cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 8de3f38..2d5ff6b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -120,6 +120,7 @@ type partitionConsumerOpts struct {
 	readCompacted              bool
 	disableForceTopicCreation  bool
 	interceptors               ConsumerInterceptors
+	maxReconnectToBroker       *uint
 	keySharedPolicy            *KeySharedPolicy
 }
 
@@ -815,8 +816,18 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
-	backoff := internal.Backoff{}
-	for {
+	var (
+		maxRetry int
+		backoff  = internal.Backoff{}
+	)
+
+	if pc.options.maxReconnectToBroker == nil {
+		maxRetry = -1
+	} else {
+		maxRetry = int(*pc.options.maxReconnectToBroker)
+	}
+
+	for maxRetry != 0 {
 		if pc.state != consumerReady {
 			// Consumer is already closing
 			return
@@ -832,6 +843,10 @@ func (pc *partitionConsumer) reconnectToBroker() {
 			pc.log.Info("Reconnected consumer to broker")
 			return
 		}
+
+		if maxRetry > 0 {
+			maxRetry--
+		}
 	}
 }
 
diff --git a/pulsar/producer.go b/pulsar/producer.go
index cb38e3e..a6cee38 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -138,6 +138,9 @@ type ProducerOptions struct {
 
 	// A chain of interceptors, These interceptors will be called at some points defined in ProducerInterceptor interface
 	Interceptors ProducerInterceptors
+
+	// MaxReconnectToBroker sets the maximum number of reconnectToBroker retries. (default: unlimited)
+	MaxReconnectToBroker *uint
 }
 
 // Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d245c33..399bcfe 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -243,8 +243,18 @@ func (p *partitionProducer) ConnectionClosed() {
 }
 
 func (p *partitionProducer) reconnectToBroker() {
-	backoff := internal.Backoff{}
-	for {
+	var (
+		maxRetry int
+		backoff  = internal.Backoff{}
+	)
+
+	if p.options.MaxReconnectToBroker == nil {
+		maxRetry = -1
+	} else {
+		maxRetry = int(*p.options.MaxReconnectToBroker)
+	}
+
+	for maxRetry != 0 {
 		if atomic.LoadInt32(&p.state) != producerReady {
 			// Producer is already closing
 			return
@@ -260,6 +270,10 @@ func (p *partitionProducer) reconnectToBroker() {
 			p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
 			return
 		}
+
+		if maxRetry > 0 {
+			maxRetry--
+		}
 	}
 }