You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/09/29 02:40:30 UTC
[pulsar-client-go] branch master updated: Parameterize the reconnection option (#853)
This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 6a8847f Parameterize the reconnection option (#853)
6a8847f is described below
commit 6a8847f34dfdef27740ac619ff9aeb3fd1e85afb
Author: xiaolong ran <xi...@tencent.com>
AuthorDate: Thu Sep 29 10:40:24 2022 +0800
Parameterize the reconnection option (#853)
---
pulsar/consumer.go | 6 ++++++
pulsar/consumer_impl.go | 1 +
pulsar/consumer_partition.go | 24 ++++++++++++++++--------
pulsar/dlq_router.go | 2 +-
pulsar/internal/backoff.go | 25 +++++++++++++++----------
pulsar/internal/backoff_test.go | 6 +++---
pulsar/internal/http_client.go | 2 +-
pulsar/internal/rpc_client.go | 2 +-
pulsar/negative_backoff_policy.go | 2 +-
pulsar/producer.go | 6 ++++++
pulsar/producer_partition.go | 23 ++++++++++++++---------
pulsar/retry_router.go | 2 +-
12 files changed, 66 insertions(+), 35 deletions(-)
diff --git a/pulsar/consumer.go b/pulsar/consumer.go
index 70d67ba..3f756f8 100644
--- a/pulsar/consumer.go
+++ b/pulsar/consumer.go
@@ -20,6 +20,8 @@ package pulsar
import (
"context"
"time"
+
+ "github.com/apache/pulsar-client-go/pulsar/internal"
)
// ConsumerMessage represents a pair of a Consumer and Message.
@@ -171,6 +173,10 @@ type ConsumerOptions struct {
// MaxReconnectToBroker sets the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
+ // BackoffPolicy parameterizes the reconnection logic, allowing users to customize
+ // the delay between reconnection attempts (minBackoff, maxBackoff and jitterPercentage)
+ BackoffPolicy internal.BackoffPolicy
+
// Decryption represents the encryption related fields required by the consumer to decrypt a message.
Decryption *MessageDecryptionInfo
diff --git a/pulsar/consumer_impl.go b/pulsar/consumer_impl.go
index e6135bf..517824d 100644
--- a/pulsar/consumer_impl.go
+++ b/pulsar/consumer_impl.go
@@ -361,6 +361,7 @@ func (c *consumer) internalTopicSubscribeToPartitions() error {
readCompacted: c.options.ReadCompacted,
interceptors: c.options.Interceptors,
maxReconnectToBroker: c.options.MaxReconnectToBroker,
+ backoffPolicy: c.options.BackoffPolicy,
keySharedPolicy: c.options.KeySharedPolicy,
schema: c.options.Schema,
decryption: c.options.Decryption,
diff --git a/pulsar/consumer_partition.go b/pulsar/consumer_partition.go
index cc9e710..7ddff5e 100644
--- a/pulsar/consumer_partition.go
+++ b/pulsar/consumer_partition.go
@@ -102,6 +102,7 @@ type partitionConsumerOpts struct {
disableForceTopicCreation bool
interceptors ConsumerInterceptors
maxReconnectToBroker *uint
+ backoffPolicy internal.BackoffPolicy
keySharedPolicy *KeySharedPolicy
schema Schema
decryption *MessageDecryptionInfo
@@ -1143,10 +1144,7 @@ func (pc *partitionConsumer) internalClose(req *closeRequest) {
}
func (pc *partitionConsumer) reconnectToBroker() {
- var (
- maxRetry int
- backoff = internal.Backoff{}
- )
+ var maxRetry int
if pc.options.maxReconnectToBroker == nil {
maxRetry = -1
@@ -1161,9 +1159,19 @@ func (pc *partitionConsumer) reconnectToBroker() {
return
}
- d := backoff.Next()
- pc.log.Info("Reconnecting to broker in ", d)
- time.Sleep(d)
+ var (
+ delayReconnectTime time.Duration
+ defaultBackoff = internal.DefaultBackoff{}
+ )
+
+ if pc.options.backoffPolicy == nil {
+ delayReconnectTime = defaultBackoff.Next()
+ } else {
+ delayReconnectTime = pc.options.backoffPolicy.Next()
+ }
+
+ pc.log.Info("Reconnecting to broker in ", delayReconnectTime)
+ time.Sleep(delayReconnectTime)
err := pc.grabConn()
if err == nil {
@@ -1183,7 +1191,7 @@ func (pc *partitionConsumer) reconnectToBroker() {
maxRetry--
}
pc.metrics.ConsumersReconnectFailure.Inc()
- if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+ if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
pc.metrics.ConsumersReconnectMaxRetry.Inc()
}
}
diff --git a/pulsar/dlq_router.go b/pulsar/dlq_router.go
index 966bff1..000faaa 100644
--- a/pulsar/dlq_router.go
+++ b/pulsar/dlq_router.go
@@ -133,7 +133,7 @@ func (r *dlqRouter) getProducer(schema Schema) Producer {
}
// Retry to create producer indefinitely
- backoff := &internal.Backoff{}
+ backoff := &internal.DefaultBackoff{}
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.DeadLetterTopic
diff --git a/pulsar/internal/backoff.go b/pulsar/internal/backoff.go
index ff9b0bc..3284fb7 100644
--- a/pulsar/internal/backoff.go
+++ b/pulsar/internal/backoff.go
@@ -26,20 +26,25 @@ func init() {
rand.Seed(time.Now().UnixNano())
}
-// Backoff computes the delay before retrying an action.
+// BackoffPolicy parameterizes the reconnection logic, allowing users to customize
+// the delay between reconnection attempts (minBackoff, maxBackoff and jitterPercentage)
+type BackoffPolicy interface {
+ Next() time.Duration
+}
+
+// DefaultBackoff computes the delay before retrying an action.
// It uses an exponential backoff with jitter. The jitter represents up to 20 percents of the delay.
-type Backoff struct {
+type DefaultBackoff struct {
backoff time.Duration
}
-const (
- minBackoff = 100 * time.Millisecond
- maxBackoff = 60 * time.Second
- jitterPercentage = 0.2
-)
+const maxBackoff = 60 * time.Second
// Next returns the delay to wait before next retry
-func (b *Backoff) Next() time.Duration {
+func (b *DefaultBackoff) Next() time.Duration {
+ minBackoff := 100 * time.Millisecond
+ jitterPercentage := 0.2
+
// Double the delay each time
b.backoff += b.backoff
if b.backoff.Nanoseconds() < minBackoff.Nanoseconds() {
@@ -52,7 +57,7 @@ func (b *Backoff) Next() time.Duration {
return b.backoff + time.Duration(jitter)
}
-// IsMaxBackReached evaluates if the max number of retries is reached
-func (b *Backoff) IsMaxBackoffReached() bool {
+// IsMaxBackoffReached reports whether the current backoff delay has reached maxBackoff
+func (b *DefaultBackoff) IsMaxBackoffReached() bool {
return b.backoff >= maxBackoff
}
diff --git a/pulsar/internal/backoff_test.go b/pulsar/internal/backoff_test.go
index ad6e764..e05ea29 100644
--- a/pulsar/internal/backoff_test.go
+++ b/pulsar/internal/backoff_test.go
@@ -25,14 +25,14 @@ import (
)
func TestBackoff_NextMinValue(t *testing.T) {
- backoff := &Backoff{}
+ backoff := &DefaultBackoff{}
delay := backoff.Next()
assert.GreaterOrEqual(t, int64(delay), int64(100*time.Millisecond))
assert.LessOrEqual(t, int64(delay), int64(120*time.Millisecond))
}
func TestBackoff_NextExponentialBackoff(t *testing.T) {
- backoff := &Backoff{}
+ backoff := &DefaultBackoff{}
previousDelay := backoff.Next()
// the last value before capping to the max value is 51.2 s (.1, .2, .4, .8, 1.6, 3.2, 6.4, 12.8, 25.6, 51.2)
for previousDelay < 51*time.Second {
@@ -47,7 +47,7 @@ func TestBackoff_NextExponentialBackoff(t *testing.T) {
}
func TestBackoff_NextMaxValue(t *testing.T) {
- backoff := &Backoff{}
+ backoff := &DefaultBackoff{}
var delay time.Duration
for delay < maxBackoff {
delay = backoff.Next()
diff --git a/pulsar/internal/http_client.go b/pulsar/internal/http_client.go
index c160623..8c494dc 100644
--- a/pulsar/internal/http_client.go
+++ b/pulsar/internal/http_client.go
@@ -148,7 +148,7 @@ func (c *httpClient) Get(endpoint string, obj interface{}, params map[string]str
if _, ok := err.(*url.Error); ok {
// We can retry this kind of requests over a connection error because they're
// not specific to a particular broker.
- backoff := Backoff{100 * time.Millisecond}
+ backoff := DefaultBackoff{100 * time.Millisecond}
startTime := time.Now()
var retryTime time.Duration
diff --git a/pulsar/internal/rpc_client.go b/pulsar/internal/rpc_client.go
index 24506ef..4ef2995 100644
--- a/pulsar/internal/rpc_client.go
+++ b/pulsar/internal/rpc_client.go
@@ -91,7 +91,7 @@ func (c *rpcClient) RequestToAnyBroker(requestID uint64, cmdType pb.BaseCommand_
var host *url.URL
var rpcResult *RPCResult
startTime := time.Now()
- backoff := Backoff{100 * time.Millisecond}
+ backoff := DefaultBackoff{100 * time.Millisecond}
// we can retry these requests because this kind of request is
// not specific to any particular broker
for time.Since(startTime) < c.requestTimeout {
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
index be72bfa..5cd35bc 100644
--- a/pulsar/negative_backoff_policy.go
+++ b/pulsar/negative_backoff_policy.go
@@ -29,7 +29,7 @@ import (
// > NackBackoffPolicy, which means the message might get redelivered earlier than the delay time
// > from the backoff.
type NackBackoffPolicy interface {
- // The redeliveryCount indicates the number of times the message was redelivered.
+ // Next param redeliveryCount indicates the number of times the message was redelivered.
// We can get the redeliveryCount from the CommandMessage.
Next(redeliveryCount uint32) time.Duration
}
diff --git a/pulsar/producer.go b/pulsar/producer.go
index fd68631..b4e43bd 100644
--- a/pulsar/producer.go
+++ b/pulsar/producer.go
@@ -20,6 +20,8 @@ package pulsar
import (
"context"
"time"
+
+ "github.com/apache/pulsar-client-go/pulsar/internal"
)
type HashingScheme int
@@ -155,6 +157,10 @@ type ProducerOptions struct {
// MaxReconnectToBroker specifies the maximum retry number of reconnectToBroker. (default: ultimate)
MaxReconnectToBroker *uint
+ // BackoffPolicy parameterizes the reconnection logic, allowing users to customize
+ // the delay between reconnection attempts (minBackoff, maxBackoff and jitterPercentage)
+ BackoffPolicy internal.BackoffPolicy
+
// BatcherBuilderType sets the batch builder type (default DefaultBatchBuilder)
// This will be used to create batch container when batching is enabled.
// Options:
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 45d2aa9..922d89d 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -380,11 +380,7 @@ func (p *partitionProducer) getOrCreateSchema(schemaInfo *SchemaInfo) (schemaVer
}
func (p *partitionProducer) reconnectToBroker() {
- var (
- maxRetry int
- backoff = internal.Backoff{}
- )
-
+ var maxRetry int
if p.options.MaxReconnectToBroker == nil {
maxRetry = -1
} else {
@@ -398,9 +394,18 @@ func (p *partitionProducer) reconnectToBroker() {
return
}
- d := backoff.Next()
- p.log.Info("Reconnecting to broker in ", d)
- time.Sleep(d)
+ var (
+ delayReconnectTime time.Duration
+ defaultBackoff = internal.DefaultBackoff{}
+ )
+
+ if p.options.BackoffPolicy == nil {
+ delayReconnectTime = defaultBackoff.Next()
+ } else {
+ delayReconnectTime = p.options.BackoffPolicy.Next()
+ }
+ p.log.Info("Reconnecting to broker in ", delayReconnectTime)
+ time.Sleep(delayReconnectTime)
atomic.AddUint64(&p.epoch, 1)
err := p.grabCnx()
if err == nil {
@@ -420,7 +425,7 @@ func (p *partitionProducer) reconnectToBroker() {
maxRetry--
}
p.metrics.ProducersReconnectFailure.Inc()
- if maxRetry == 0 || backoff.IsMaxBackoffReached() {
+ if maxRetry == 0 || defaultBackoff.IsMaxBackoffReached() {
p.metrics.ProducersReconnectMaxRetry.Inc()
}
}
diff --git a/pulsar/retry_router.go b/pulsar/retry_router.go
index 4d19ce2..7b5f6b8 100644
--- a/pulsar/retry_router.go
+++ b/pulsar/retry_router.go
@@ -123,7 +123,7 @@ func (r *retryRouter) getProducer() Producer {
}
// Retry to create producer indefinitely
- backoff := &internal.Backoff{}
+ backoff := &internal.DefaultBackoff{}
for {
opt := r.policy.ProducerOptions
opt.Topic = r.policy.RetryLetterTopic