You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/14 22:36:53 UTC
[pulsar-client-go] branch master updated: NackBackoffPolicy.Next return time.Duration (#834)
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new edd5c71 NackBackoffPolicy.Next return time.Duration (#834)
edd5c71 is described below
commit edd5c71651b79bd35358a51ae3925905ed9f17e1
Author: Huanghy <h-...@h-hy.com>
AuthorDate: Thu Sep 15 06:36:48 2022 +0800
NackBackoffPolicy.Next return time.Duration (#834)
Co-authored-by: tevinhuang <te...@tencent.com>
---
pulsar/negative_acks_tracker.go | 4 ++--
pulsar/negative_acks_tracker_test.go | 14 +++++++++-----
pulsar/negative_backoff_policy.go | 17 ++++++++++-------
pulsar/negative_backoff_policy_test.go | 5 +++--
4 files changed, 24 insertions(+), 16 deletions(-)
diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 3485e1b..79ed694 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -54,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
if nackBackoffPolicy != nil {
firstDelayForNackBackoff := nackBackoffPolicy.Next(1)
- t.delay = time.Duration(firstDelayForNackBackoff)
+ t.delay = firstDelayForNackBackoff
} else {
t.delay = delay
}
@@ -109,7 +109,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
return
}
- targetTime := time.Now().Add(time.Duration(nackBackoffDelay))
+ targetTime := time.Now().Add(nackBackoffDelay)
t.negativeAcks[batchMsgID] = targetTime
}
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 537f0da..5faa947 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -35,14 +35,18 @@ type nackMockedConsumer struct {
lock sync.Mutex
}
-func newNackMockedConsumer() *nackMockedConsumer {
+func newNackMockedConsumer(nackBackoffPolicy NackBackoffPolicy) *nackMockedConsumer {
t := &nackMockedConsumer{
ch: make(chan messageID, 10),
}
go func() {
// since the client ticks at an interval of delay / 3
// wait another interval to ensure we get all messages
- time.Sleep(testNackDelay + 101*time.Millisecond)
+ if nackBackoffPolicy == nil {
+ time.Sleep(testNackDelay + 101*time.Millisecond)
+ } else {
+ time.Sleep(nackBackoffPolicy.Next(1) + 101*time.Millisecond)
+ }
t.lock.Lock()
defer t.lock.Unlock()
t.closed = true
@@ -74,7 +78,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
}
func TestNacksTracker(t *testing.T) {
- nmc := newNackMockedConsumer()
+ nmc := newNackMockedConsumer(nil)
nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
nacks.Add(messageID{
@@ -107,7 +111,7 @@ func TestNacksTracker(t *testing.T) {
}
func TestNacksWithBatchesTracker(t *testing.T) {
- nmc := newNackMockedConsumer()
+ nmc := newNackMockedConsumer(nil)
nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
nacks.Add(messageID{
@@ -150,7 +154,7 @@ func TestNacksWithBatchesTracker(t *testing.T) {
}
func TestNackBackoffTracker(t *testing.T) {
- nmc := newNackMockedConsumer()
+ nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy))
nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger())
nacks.AddMessage(new(mockMessage1))
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
index cf080ad..be72bfa 100644
--- a/pulsar/negative_backoff_policy.go
+++ b/pulsar/negative_backoff_policy.go
@@ -17,7 +17,10 @@
package pulsar
-import "math"
+import (
+ "math"
+ "time"
+)
// NackBackoffPolicy is an interface for a custom message negativeAcked policy; users can specify a NackBackoffPolicy
// for a consumer.
@@ -28,19 +31,19 @@ import "math"
type NackBackoffPolicy interface {
// The redeliveryCount indicates the number of times the message was redelivered.
// We can get the redeliveryCount from the CommandMessage.
- Next(redeliveryCount uint32) int64
+ Next(redeliveryCount uint32) time.Duration
}
// defaultNackBackoffPolicy is the default implementation of NackBackoffPolicy.
type defaultNackBackoffPolicy struct{}
-func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
- minNackTimeMs := int64(1000 * 30) // 30sec
- maxNackTimeMs := 1000 * 60 * 10 // 10min
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) time.Duration {
+ minNackTime := 1 * time.Second // 1sec
+ maxNackTime := 10 * time.Minute // 10min
if redeliveryCount < 0 {
- return minNackTimeMs
+ return minNackTime
}
- return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))
+ return time.Duration(math.Min(math.Abs(float64(minNackTime<<redeliveryCount)), float64(maxNackTime)))
}
diff --git a/pulsar/negative_backoff_policy_test.go b/pulsar/negative_backoff_policy_test.go
index bfbb6a8..f37cf7c 100644
--- a/pulsar/negative_backoff_policy_test.go
+++ b/pulsar/negative_backoff_policy_test.go
@@ -19,6 +19,7 @@ package pulsar
import (
"testing"
+ "time"
"github.com/stretchr/testify/assert"
)
@@ -27,8 +28,8 @@ func TestDefaultNackBackoffPolicy_Next(t *testing.T) {
defaultNackBackoff := new(defaultNackBackoffPolicy)
res0 := defaultNackBackoff.Next(0)
- assert.Equal(t, int64(1000*30), res0)
+ assert.Equal(t, 1*time.Second, res0)
res5 := defaultNackBackoff.Next(5)
- assert.Equal(t, int64(600000), res5)
+ assert.Equal(t, 32*time.Second, res5)
}