You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2022/09/14 22:36:53 UTC

[pulsar-client-go] branch master updated: NackBackoffPolicy.Next return time.Duration (#834)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new edd5c71  NackBackoffPolicy.Next return time.Duration (#834)
edd5c71 is described below

commit edd5c71651b79bd35358a51ae3925905ed9f17e1
Author: Huanghy <h-...@h-hy.com>
AuthorDate: Thu Sep 15 06:36:48 2022 +0800

    NackBackoffPolicy.Next return time.Duration (#834)
    
    Co-authored-by: tevinhuang <te...@tencent.com>
---
 pulsar/negative_acks_tracker.go        |  4 ++--
 pulsar/negative_acks_tracker_test.go   | 14 +++++++++-----
 pulsar/negative_backoff_policy.go      | 17 ++++++++++-------
 pulsar/negative_backoff_policy_test.go |  5 +++--
 4 files changed, 24 insertions(+), 16 deletions(-)

diff --git a/pulsar/negative_acks_tracker.go b/pulsar/negative_acks_tracker.go
index 3485e1b..79ed694 100644
--- a/pulsar/negative_acks_tracker.go
+++ b/pulsar/negative_acks_tracker.go
@@ -54,7 +54,7 @@ func newNegativeAcksTracker(rc redeliveryConsumer, delay time.Duration,
 
 	if nackBackoffPolicy != nil {
 		firstDelayForNackBackoff := nackBackoffPolicy.Next(1)
-		t.delay = time.Duration(firstDelayForNackBackoff)
+		t.delay = firstDelayForNackBackoff
 	} else {
 		t.delay = delay
 	}
@@ -109,7 +109,7 @@ func (t *negativeAcksTracker) AddMessage(msg Message) {
 		return
 	}
 
-	targetTime := time.Now().Add(time.Duration(nackBackoffDelay))
+	targetTime := time.Now().Add(nackBackoffDelay)
 	t.negativeAcks[batchMsgID] = targetTime
 }
 
diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 537f0da..5faa947 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -35,14 +35,18 @@ type nackMockedConsumer struct {
 	lock   sync.Mutex
 }
 
-func newNackMockedConsumer() *nackMockedConsumer {
+func newNackMockedConsumer(nackBackoffPolicy NackBackoffPolicy) *nackMockedConsumer {
 	t := &nackMockedConsumer{
 		ch: make(chan messageID, 10),
 	}
 	go func() {
 		// since the client ticks at an interval of delay / 3
 		// wait another interval to ensure we get all messages
-		time.Sleep(testNackDelay + 101*time.Millisecond)
+		if nackBackoffPolicy == nil {
+			time.Sleep(testNackDelay + 101*time.Millisecond)
+		} else {
+			time.Sleep(nackBackoffPolicy.Next(1) + 101*time.Millisecond)
+		}
 		t.lock.Lock()
 		defer t.lock.Unlock()
 		t.closed = true
@@ -74,7 +78,7 @@ func (nmc *nackMockedConsumer) Wait() <-chan messageID {
 }
 
 func TestNacksTracker(t *testing.T) {
-	nmc := newNackMockedConsumer()
+	nmc := newNackMockedConsumer(nil)
 	nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
 
 	nacks.Add(messageID{
@@ -107,7 +111,7 @@ func TestNacksTracker(t *testing.T) {
 }
 
 func TestNacksWithBatchesTracker(t *testing.T) {
-	nmc := newNackMockedConsumer()
+	nmc := newNackMockedConsumer(nil)
 	nacks := newNegativeAcksTracker(nmc, testNackDelay, nil, log.DefaultNopLogger())
 
 	nacks.Add(messageID{
@@ -150,7 +154,7 @@ func TestNacksWithBatchesTracker(t *testing.T) {
 }
 
 func TestNackBackoffTracker(t *testing.T) {
-	nmc := newNackMockedConsumer()
+	nmc := newNackMockedConsumer(new(defaultNackBackoffPolicy))
 	nacks := newNegativeAcksTracker(nmc, testNackDelay, new(defaultNackBackoffPolicy), log.DefaultNopLogger())
 
 	nacks.AddMessage(new(mockMessage1))
diff --git a/pulsar/negative_backoff_policy.go b/pulsar/negative_backoff_policy.go
index cf080ad..be72bfa 100644
--- a/pulsar/negative_backoff_policy.go
+++ b/pulsar/negative_backoff_policy.go
@@ -17,7 +17,10 @@
 
 package pulsar
 
-import "math"
+import (
+	"math"
+	"time"
+)
 
 // NackBackoffPolicy is a interface for custom message negativeAcked policy, users can specify a NackBackoffPolicy
 // for a consumer.
@@ -28,19 +31,19 @@ import "math"
 type NackBackoffPolicy interface {
 	// The redeliveryCount indicates the number of times the message was redelivered.
 	// We can get the redeliveryCount from the CommandMessage.
-	Next(redeliveryCount uint32) int64
+	Next(redeliveryCount uint32) time.Duration
 }
 
 // defaultNackBackoffPolicy is default impl for NackBackoffPolicy.
 type defaultNackBackoffPolicy struct{}
 
-func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) int64 {
-	minNackTimeMs := int64(1000 * 30) // 30sec
-	maxNackTimeMs := 1000 * 60 * 10   // 10min
+func (nbp *defaultNackBackoffPolicy) Next(redeliveryCount uint32) time.Duration {
+	minNackTime := 1 * time.Second  // 1sec
+	maxNackTime := 10 * time.Minute // 10min
 
 	if redeliveryCount < 0 {
-		return minNackTimeMs
+		return minNackTime
 	}
 
-	return int64(math.Min(math.Abs(float64(minNackTimeMs<<redeliveryCount)), float64(maxNackTimeMs)))
+	return time.Duration(math.Min(math.Abs(float64(minNackTime<<redeliveryCount)), float64(maxNackTime)))
 }
diff --git a/pulsar/negative_backoff_policy_test.go b/pulsar/negative_backoff_policy_test.go
index bfbb6a8..f37cf7c 100644
--- a/pulsar/negative_backoff_policy_test.go
+++ b/pulsar/negative_backoff_policy_test.go
@@ -19,6 +19,7 @@ package pulsar
 
 import (
 	"testing"
+	"time"
 
 	"github.com/stretchr/testify/assert"
 )
@@ -27,8 +28,8 @@ func TestDefaultNackBackoffPolicy_Next(t *testing.T) {
 	defaultNackBackoff := new(defaultNackBackoffPolicy)
 
 	res0 := defaultNackBackoff.Next(0)
-	assert.Equal(t, int64(1000*30), res0)
+	assert.Equal(t, 1*time.Second, res0)
 
 	res5 := defaultNackBackoff.Next(5)
-	assert.Equal(t, int64(600000), res5)
+	assert.Equal(t, 32*time.Second, res5)
 }