You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/12/12 01:44:13 UTC

[pulsar-client-go] branch master updated: Make nack tracker tests more robust. (#122)

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new eb9b69e  Make nack tracker tests more robust. (#122)
eb9b69e is described below

commit eb9b69e254693df8c922b361106a6c1885c1019d
Author: cckellogg <cc...@gmail.com>
AuthorDate: Wed Dec 11 17:44:06 2019 -0800

    Make nack tracker tests more robust. (#122)
    
    * Make nack tracker tests more robust.
    
    * Fix time corner cases in tests.
    
    * Fix test race condition.
    
    * Fix comment.
---
 pulsar/negative_acks_tracker_test.go | 77 ++++++++++++++++++++++++------------
 1 file changed, 52 insertions(+), 25 deletions(-)

diff --git a/pulsar/negative_acks_tracker_test.go b/pulsar/negative_acks_tracker_test.go
index 733114a..4dac66e 100644
--- a/pulsar/negative_acks_tracker_test.go
+++ b/pulsar/negative_acks_tracker_test.go
@@ -18,44 +18,64 @@
 package pulsar
 
 import (
-	"github.com/stretchr/testify/assert"
 	"sort"
 	"sync"
 	"testing"
 	"time"
+
+	"github.com/stretchr/testify/assert"
 )
 
+const testNackDelay = 300 * time.Millisecond
+
 type nackMockedConsumer struct {
-	sync.Mutex
-	cond   *sync.Cond
+	ch chan messageID
+	closed bool
+	lock sync.Mutex
 	msgIds []messageID
 }
 
-func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
-	nmc.Lock()
-	if nmc.msgIds == nil {
-		nmc.msgIds = msgIds
-		sort.Slice(msgIds, func(i, j int) bool {
-			return msgIds[i].ledgerID < msgIds[j].entryID
-		})
-		nmc.cond.Signal()
+func newNackMockedConsumer() *nackMockedConsumer {
+	t := &nackMockedConsumer{
+		ch: make(chan messageID, 10),
 	}
+	go func() {
+		// since the client ticks at an interval of delay / 3
+		// wait another interval to ensure we get all messages
+		time.Sleep(testNackDelay + 101 * time.Millisecond)
+		t.lock.Lock()
+		defer t.lock.Unlock()
+		t.closed = true
+		close(t.ch)
+	}()
+	return t
+}
 
-	nmc.Unlock()
+func (nmc *nackMockedConsumer) Redeliver(msgIds []messageID) {
+	nmc.lock.Lock()
+	defer nmc.lock.Unlock()
+	if nmc.closed {
+		return
+	}
+	for _, id := range msgIds {
+		nmc.ch <- id
+	}
 }
 
-func (nmc *nackMockedConsumer) Wait() []messageID {
-	nmc.Lock()
-	defer nmc.Unlock()
-	nmc.cond.Wait()
+func sortMessageIds(msgIds []messageID) []messageID {
+	sort.Slice(msgIds, func(i, j int) bool {
+		return msgIds[i].ledgerID < msgIds[j].entryID
+	})
+	return msgIds
+}
 
-	return nmc.msgIds
+func (nmc *nackMockedConsumer) Wait() <- chan messageID {
+	return nmc.ch
 }
 
 func TestNacksTracker(t *testing.T) {
-	nmc := &nackMockedConsumer{}
-	nmc.cond = sync.NewCond(nmc)
-	nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+	nmc := newNackMockedConsumer()
+	nacks := newNegativeAcksTracker(nmc, testNackDelay)
 
 	nacks.Add(&messageID{
 		ledgerID: 1,
@@ -69,7 +89,11 @@ func TestNacksTracker(t *testing.T) {
 		batchIdx: 1,
 	})
 
-	msgIds := nmc.Wait()
+	msgIds := make([]messageID, 0)
+	for id := range nmc.Wait() {
+		msgIds = append(msgIds, id)
+	}
+	msgIds = sortMessageIds(msgIds)
 
 	assert.Equal(t, 2, len(msgIds))
 	assert.Equal(t, int64(1), msgIds[0].ledgerID)
@@ -81,9 +105,8 @@ func TestNacksTracker(t *testing.T) {
 }
 
 func TestNacksWithBatchesTracker(t *testing.T) {
-	nmc := &nackMockedConsumer{}
-	nmc.cond = sync.NewCond(nmc)
-	nacks := newNegativeAcksTracker(nmc, 1*time.Second)
+	nmc := newNackMockedConsumer()
+	nacks := newNegativeAcksTracker(nmc, testNackDelay)
 
 	nacks.Add(&messageID{
 		ledgerID: 1,
@@ -109,7 +132,11 @@ func TestNacksWithBatchesTracker(t *testing.T) {
 		batchIdx: 1,
 	})
 
-	msgIds := nmc.Wait()
+	msgIds := make([]messageID, 0)
+	for id := range nmc.Wait() {
+		msgIds = append(msgIds, id)
+	}
+	msgIds = sortMessageIds(msgIds)
 
 	assert.Equal(t, 2, len(msgIds))
 	assert.Equal(t, int64(1), msgIds[0].ledgerID)