You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/10/21 02:51:24 UTC
[pulsar-client-go] branch master updated: feat: support limit the
retry number of reconnectToBroker (#360)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 44b8b4e feat: support limit the retry number of reconnectToBroker (#360)
44b8b4e is described below
commit 44b8b4efc18b15d823e316ed0b7d92768dfa7ea7
Author: jony montana <hy...@gmail.com>
AuthorDate: Wed Oct 21 10:51:16 2020 +0800
feat: support limit the retry number of reconnectToBroker (#360)
Signed-off-by: jonyhy96 <hy...@gmail.com>
Fixes #257
### Motivation
Once the connection is closed the reconnectToBroker logic of both producer and consumer will try to reconnect to the broker infinatly.
### Modifications
Add a field in the Options represent the max number of retries and not break current behavior if this field is not fullfilled.
---
pulsar/consumer.go | 3 +++
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 19 +++++++++++++++++--
pulsar/producer.go | 3 +++
pulsar/producer_partition.go | 18 ++++++++++++++++--
5 files changed, 40 insertions(+), 4 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 41ccb91..e9f803d 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -150,6 +150,9 @@ type ConsumerOptions struct {
// A chain of interceptors, These interceptors will be called at some points defined in ConsumerInterceptor interface.
Interceptors ConsumerInterceptors
+
+ // MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
+ MaxReconnectToBroker *uint
}
// Consumer is an interface that abstracts behavior of Pulsar's consumer
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index 5cb1dd2..ed2af4b 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -309,6 +309,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
subscriptionMode: durable,
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
+ maxReconnectToBroker: c.options.MaxReconnectToBroker,
keySharedPolicy: c.options.KeySharedPolicy,
}
cons, err := newPartitionConsumer(c, c.client, opts, c.messageCh, c.dlq)
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index 8de3f38..2d5ff6b 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -120,6 +120,7 @@ type partitionConsumerOpts struct {
readCompacted bool
disableForceTopicCreation bool
interceptors ConsumerInterceptors
+ maxReconnectToBroker *uint
keySharedPolicy *KeySharedPolicy
}
@@ -815,8 +816,18 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}
func (pc *partitionConsumer) reconnectToBroker() {
- backoff := internal.Backoff{}
- for {
+ var (
+ maxRetry int
+ backoff = internal.Backoff{}
+ )
+
+ if pc.options.maxReconnectToBroker == nil {
+ maxRetry = -1
+ } else {
+ maxRetry = int(*pc.options.maxReconnectToBroker)
+ }
+
+ for maxRetry != 0 {
if pc.state != consumerReady {
// Consumer is already closing
return
@@ -832,6 +843,10 @@ func (pc *partitionConsumer) reconnectToBroker() {
pc.log.Info("Reconnected consumer to broker")
return
}
+
+ if maxRetry > 0 {
+ maxRetry--
+ }
}
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index cb38e3e..a6cee38 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -138,6 +138,9 @@ type ProducerOptions struct {
// A chain of interceptors, These interceptors will be called at some points defined in ProducerInterceptor interface
Interceptors ProducerInterceptors
+
+ // MaxReconnectToBroker set the maximum retry number of reconnectToBroker. (default: ultimate)
+ MaxReconnectToBroker *uint
}
// Producer is used to publish messages on a topic
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index d245c33..399bcfe 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -243,8 +243,18 @@ func (p *partitionProducer) ConnectionClosed() {
}
func (p *partitionProducer) reconnectToBroker() {
- backoff := internal.Backoff{}
- for {
+ var (
+ maxRetry int
+ backoff = internal.Backoff{}
+ )
+
+ if p.options.MaxReconnectToBroker == nil {
+ maxRetry = -1
+ } else {
+ maxRetry = int(*p.options.MaxReconnectToBroker)
+ }
+
+ for maxRetry != 0 {
if atomic.LoadInt32(&p.state) != producerReady {
// Producer is already closing
return
@@ -260,6 +270,10 @@ func (p *partitionProducer) reconnectToBroker() {
p.log.WithField("cnx", p.cnx.ID()).Info("Reconnected producer to broker")
return
}
+
+ if maxRetry > 0 {
+ maxRetry--
+ }
}
}