You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/29 02:40:30 UTC

[pulsar-client-go] branch master updated: Parameterize the reconnection option (#853)

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 6a8847f  Parameterize the reconnection option (#853)
6a8847f is described below

commit 6a8847f34dfdef27740ac619ff9aeb3fd1e85afb
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Thu Sep 29 10:40:24 2022 +0800

    Parameterize the reconnection option (#853)
---
 pulsar/consumer.go                |  6 ++++++
 pulsar/consumer_impl.go           |  1 +
 pulsar/consumer_partition.go      | 24 ++++++++++++++++--------
 pulsar/dlq_router.go              |  2 +-
 pulsar/internal/backoff.go        | 25 +++++++++++++++----------
 pulsar/internal/backoff_test.go   |  6 +++---
 pulsar/internal/http_client.go    |  2 +-
 pulsar/internal/rpc_client.go     |  2 +-
 pulsar/negative_backoff_policy.go |  2 +-
 pulsar/producer.go                |  6 ++++++
 pulsar/producer_partition.go      | 23 ++++++++++++++---------
 pulsar/retry_router.go            |  2 +-
 12 files changed, 66 insertions(+), 35 deletions(-)

diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 70d67ba..3f756f8 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -20,6 +20,8 @@ package pulsar
 import (
 	"context"
 	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 // ConsumerMessage represents a pair of a Consumer and Message.
@@ -171,6 +173,10 @@ type ConsumerOptions struct {
 	// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
 	MaxReconnectToBroker *uint
 
+	// BackoffPolicy parameterize the following options in the reconnection logic to
+	// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+	BackoffPolicy internal.BackoffPolicy
+
 	// Decryption represents the encryption related fields required by the consumer to decrypt a message.
 	Decryption *MessageDecryptionInfo
 
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e6135bf..517824d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
 				readCompacted:              c.options.ReadCompacted,
 				interceptors:               c.options.Interceptors,
 				maxReconnectToBroker:       c.options.MaxReconnectToBroker,
+				backoffPolicy:              c.options.BackoffPolicy,
 				keySharedPolicy:            c.options.KeySharedPolicy,
 				schema:                     c.options.Schema,
 				decryption:                 c.options.Decryption,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cc9e710..7ddff5e 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -102,6 +102,7 @@ type partitionConsumerOpts struct {
 	disableForceTopicCreation  bool
 	interceptors               ConsumerInterceptors
 	maxReconnectToBroker       *uint
+	backoffPolicy              internal.BackoffPolicy
 	keySharedPolicy            *KeySharedPolicy
 	schema                     Schema
 	decryption                 *MessageDecryptionInfo
@@ -1143,10 +1144,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
 }
 
 func (pc *partitionConsumer) reconnectToBroker() {
-	var (
-		maxRetry int
-		backoff  = internal.Backoff{}
-	)
+	var maxRetry int
 
 	if pc.options.maxReconnectToBroker == nil {
 		maxRetry = -1
@@ -1161,9 +1159,19 @@ func (pc *partitionConsumer) reconnectToBroker() {
 			return
 		}
 
-		d := backoff.Next()
-		pc.log.Info("Reconnecting to broker in ", d)
-		time.Sleep(d)
+		var (
+			delayReconnectTime time.Duration
+			defaultBackoff     = internal.DefaultBackoff{}
+		)
+
+		if pc.options.backoffPolicy == nil {
+			delayReconnectTime = defaultBackoff.Next()
+		} else {
+			delayReconnectTime = pc.options.backoffPolicy.Next()
+		}
+
+		pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
+		time.Sleep(delayReconnectTime)
 
 		err := pc.grabConn()
 		if err == nil {
@@ -1183,7 +1191,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
 			maxRetry--
 		}
 		pc.metrics.ConsumersReconnectFailure.Inc()
-		if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+		if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
 			pc.metrics.ConsumersReconnectMaxRetry.Inc()
 		}
 	}
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 966bff1..000faaa 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -133,7 +133,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
 	}
 
 	// Retry to create producer indefinitely
-	backoff := &internal.Backoff{}
+	backoff := &internal.DefaultBackoff{}
 	for {
 		opt := r.policy.ProducerOptions
 		opt.Topic = r.policy.DeadLetterTopic
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index ff9b0bc..3284fb7 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -26,20 +26,25 @@ func init() {
 	rand.Seed(time.Now().UnixNano())
 }
 
-// Backoff computes the delay before retrying an action.
+// BackoffPolicy parameterize the following options in the reconnection logic to
+// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+type BackoffPolicy interface {
+	Next() time.Duration
+}
+
+// DefaultBackoff computes the delay before retrying an action.
 // It uses an exponential backoff with jitter. The jitter represents up to 20 percents of the delay.
-type Backoff struct {
+type DefaultBackoff struct {
 	backoff time.Duration
 }
 
-const (
-	minBackoff       = 100 * time.Millisecond
-	maxBackoff       = 60 * time.Second
-	jitterPercentage = 0.2
-)
+const maxBackoff = 60 * time.Second
 
 // Next returns the delay to wait before next retry
-func (b *Backoff) Next() time.Duration {
+func (b *DefaultBackoff) Next() time.Duration {
+	minBackoff := 100 * time.Millisecond
+	jitterPercentage := 0.2
+
 	// Double the delay each time
 	b.backoff += b.backoff
 	if b.backoff.Nanoseconds() < minBackoff.Nanoseconds() {
@@ -52,7 +57,7 @@ func (b *Backoff) Next() time.Duration {
 	return b.backoff + time.Duration(jitter)
 }
 
-// IsMaxBackReached evaluates if the max number of retries is reached
-func (b *Backoff) IsMaxBackoffReached() bool {
+// IsMaxBackoffReached evaluates if the max number of retries is reached
+func (b *DefaultBackoff) IsMaxBackoffReached() bool {
 	return b.backoff >= maxBackoff
 }
diff --git a/pulsar/internal/backoff_test.go b/pulsar/internal/backoff_test.go
index ad6e764..e05ea29 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/internal/backoff_test.go
@@ -25,14 +25,14 @@ import (
 )
 
 func TestBackoff_NextMinValue(t *testing.T) {
-	backoff := &Backoff{}
+	backoff := &DefaultBackoff{}
 	delay := backoff.Next()
 	assert.GreaterOrEqual(t, int64(delay), int64(100*time.Millisecond))
 	assert.LessOrEqual(t, int64(delay), int64(120*time.Millisecond))
 }
 
 func TestBackoff_NextExponentialBackoff(t *testing.T) {
-	backoff := &Backoff{}
+	backoff := &DefaultBackoff{}
 	previousDelay := backoff.Next()
 	// the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2)
 	for previousDelay < 51*time.Second {
@@ -47,7 +47,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
 }
 
 func TestBackoff_NextMaxValue(t *testing.T) {
-	backoff := &Backoff{}
+	backoff := &DefaultBackoff{}
 	var delay time.Duration
 	for delay < maxBackoff {
 		delay = backoff.Next()
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index c160623..8c494dc 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -148,7 +148,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str
 	if _, ok := err.(*url.Error); ok {
 		// We can retry this kind of requests over a connection error because they're
 		// not specific to a particular broker.
-		backoff := Backoff{100 * time.Millisecond}
+		backoff := DefaultBackoff{100 * time.Millisecond}
 		startTime := time.Now()
 		var retryTime time.Duration
 
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 24506ef..4ef2995 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -91,7 +91,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
 	var host *url.URL
 	var rpcResult *RPCResult
 	startTime := time.Now()
-	backoff := Backoff{100 * time.Millisecond}
+	backoff := DefaultBackoff{100 * time.Millisecond}
 	// we can retry these requests because this kind of request is
 	// not specific to any particular broker
 	for time.Since(startTime) < c.requestTimeout {
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
index be72bfa..5cd35bc 100644
--- a/pulsar/negative_backoff_policy.go
+++ b/pulsar/negative_backoff_policy.go
@@ -29,7 +29,7 @@ import (
 // > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
 // > from the backoff.
 type NackBackoffPolicy interface {
-	// The redeliveryCount indicates the number of times the message was redelivered.
+	// Next param redeliveryCount indicates the number of times the message was redelivered.
 	// We can get the redeliveryCount from the CommandMessage.
 	Next(redeliveryCount uint32) time.Duration
 }
diff --git a/pulsar/producer.go b/pulsar/producer.go
index fd68631..b4e43bd 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -20,6 +20,8 @@ package pulsar
 import (
 	"context"
 	"time"
+
+	"github.com/apache/pulsar-client-go/pulsar/internal"
 )
 
 type HashingScheme int
@@ -155,6 +157,10 @@ type ProducerOptions struct {
 	// MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate)
 	MaxReconnectToBroker *uint
 
+	// BackoffPolicy parameterize the following options in the reconnection logic to
+	// allow users to customize the reconnection logic (minBackoff, maxBackoff and jitterPercentage)
+	BackoffPolicy internal.BackoffPolicy
+
 	// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
 	// This will be used to create batch container when batching is enabled.
 	// Options:
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 45d2aa9..922d89d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -380,11 +380,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer
 }
 
 func (p *partitionProducer) reconnectToBroker() {
-	var (
-		maxRetry int
-		backoff  = internal.Backoff{}
-	)
-
+	var maxRetry int
 	if p.options.MaxReconnectToBroker == nil {
 		maxRetry = -1
 	} else {
@@ -398,9 +394,18 @@ func (p *partitionProducer) reconnectToBroker() {
 			return
 		}
 
-		d := backoff.Next()
-		p.log.Info("Reconnecting to broker in ", d)
-		time.Sleep(d)
+		var (
+			delayReconnectTime time.Duration
+			defaultBackoff     = internal.DefaultBackoff{}
+		)
+
+		if p.options.BackoffPolicy == nil {
+			delayReconnectTime = defaultBackoff.Next()
+		} else {
+			delayReconnectTime = p.options.BackoffPolicy.Next()
+		}
+		p.log.Info("Reconnecting to broker in ", delayReconnectTime)
+		time.Sleep(delayReconnectTime)
 		atomic.AddUint64(&p.epoch, 1)
 		err := p.grabCnx()
 		if err == nil {
@@ -420,7 +425,7 @@ func (p *partitionProducer) reconnectToBroker() {
 			maxRetry--
 		}
 		p.metrics.ProducersReconnectFailure.Inc()
-		if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+		if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
 			p.metrics.ProducersReconnectMaxRetry.Inc()
 		}
 	}
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 4d19ce2..7b5f6b8 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -123,7 +123,7 @@ func (r *retryRouter) getProducer() Producer {
 	}
 
 	// Retry to create producer indefinitely
-	backoff := &internal.Backoff{}
+	backoff := &internal.DefaultBackoff{}
 	for {
 		opt := r.policy.ProducerOptions
 		opt.Topic = r.policy.RetryLetterTopic