You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/06/28 02:04:36 UTC

[pulsar-client-go] branch master updated: Removed blocking queue iterator (#301)

This is an automated email from the ASF dual-hosted git repository.

rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git


The following commit(s) were added to refs/heads/master by this push:
     new 975eb37  Removed blocking queue iterator (#301)
975eb37 is described below

commit 975eb3781644ebe588fc142e53eadf39fe50341a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Jun 27 19:04:27 2020 -0700

    Removed blocking queue iterator (#301)
    
    ## Motivation
    The blocking queue iterator method is not really thread safe in the presence of other read/write operations on the queue. While it makes a snapshot of what's readable at one point, if there are Poll() operations, it might be ending up reading new items before the new ones.
    
    For that, it's better to make a copy of the items in the queue, also considering that iterating over this queue is only done on reconnections and it's not performance sensitive.
---
 pulsar/internal/blocking_queue.go      | 50 ++++++++--------------------------
 pulsar/internal/blocking_queue_test.go | 29 +++++++++++++++-----
 pulsar/producer_partition.go           |  9 +++---
 3 files changed, 39 insertions(+), 49 deletions(-)

diff --git a/pulsar/internal/blocking_queue.go b/pulsar/internal/blocking_queue.go
index 37317a1..8162301 100644
--- a/pulsar/internal/blocking_queue.go
+++ b/pulsar/internal/blocking_queue.go
@@ -19,8 +19,6 @@ package internal
 
 import (
 	"sync"
-
-	log "github.com/sirupsen/logrus"
 )
 
 // BlockingQueue is a interface of block queue
@@ -43,14 +41,8 @@ type BlockingQueue interface {
 	// Size return the current size of the queue
 	Size() int
 
-	// Iterator return an iterator for the queue
-	Iterator() BlockingQueueIterator
-}
-
-// BlockingQueueIterator abstract a interface of block queue iterator.
-type BlockingQueueIterator interface {
-	HasNext() bool
-	Next() interface{}
+	// ReadableSlice returns a new view of the readable items in the queue
+	ReadableSlice() []interface{}
 }
 
 type blockingQueue struct {
@@ -65,12 +57,6 @@ type blockingQueue struct {
 	isNotFull  *sync.Cond
 }
 
-type blockingQueueIterator struct {
-	bq      *blockingQueue
-	readIdx int
-	toRead  int
-}
-
 // NewBlockingQueue init block queue and returns a BlockingQueue
 func NewBlockingQueue(maxSize int) BlockingQueue {
 	bq := &blockingQueue{
@@ -173,31 +159,19 @@ func (bq *blockingQueue) Size() int {
 	return bq.size
 }
 
-func (bq *blockingQueue) Iterator() BlockingQueueIterator {
+func (bq *blockingQueue) ReadableSlice() []interface{} {
 	bq.mutex.Lock()
 	defer bq.mutex.Unlock()
 
-	return &blockingQueueIterator{
-		bq:      bq,
-		readIdx: bq.headIdx,
-		toRead:  bq.size,
-	}
-}
-
-func (bqi *blockingQueueIterator) HasNext() bool {
-	return bqi.toRead > 0
-}
-
-func (bqi *blockingQueueIterator) Next() interface{} {
-	if bqi.toRead == 0 {
-		log.Panic("Trying to read past the end of the iterator")
+	res := make([]interface{}, bq.size)
+	readIdx := bq.headIdx
+	for i := 0; i < bq.size; i++ {
+		res[i] = bq.items[readIdx]
+		readIdx++
+		if readIdx == bq.maxSize {
+			readIdx = 0
+		}
 	}
 
-	item := bqi.bq.items[bqi.readIdx]
-	bqi.toRead--
-	bqi.readIdx++
-	if bqi.readIdx == bqi.bq.maxSize {
-		bqi.readIdx = 0
-	}
-	return item
+	return res
 }
diff --git a/pulsar/internal/blocking_queue_test.go b/pulsar/internal/blocking_queue_test.go
index 12bb8fc..c93b1a6 100644
--- a/pulsar/internal/blocking_queue_test.go
+++ b/pulsar/internal/blocking_queue_test.go
@@ -119,17 +119,32 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
 	close(ch)
 }
 
-func TestBlockingQueueIterator(t *testing.T) {
-	q := NewBlockingQueue(10)
+func TestBlockingQueue_ReadableSlice(t *testing.T) {
+	q := NewBlockingQueue(3)
 
 	q.Put(1)
 	q.Put(2)
 	q.Put(3)
 	assert.Equal(t, 3, q.Size())
 
-	i := 1
-	for it := q.Iterator(); it.HasNext(); {
-		assert.Equal(t, i, it.Next())
-		i++
-	}
+	items := q.ReadableSlice()
+	assert.Equal(t, len(items), 3)
+	assert.Equal(t, items[0], 1)
+	assert.Equal(t, items[1], 2)
+	assert.Equal(t, items[2], 3)
+
+	q.Poll()
+
+	items = q.ReadableSlice()
+	assert.Equal(t, len(items), 2)
+	assert.Equal(t, items[0], 2)
+	assert.Equal(t, items[1], 3)
+
+	q.Put(4)
+
+	items = q.ReadableSlice()
+	assert.Equal(t, len(items), 3)
+	assert.Equal(t, items[0], 2)
+	assert.Equal(t, items[1], 3)
+	assert.Equal(t, items[2], 4)
 }
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 368119f..e4765fc 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -176,10 +176,11 @@ func (p *partitionProducer) grabCnx() error {
 	p.cnx.RegisterListener(p.producerID, p)
 	p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
 
-	if p.pendingQueue.Size() > 0 {
-		p.log.Infof("Resending %d pending batches", p.pendingQueue.Size())
-		for it := p.pendingQueue.Iterator(); it.HasNext(); {
-			p.cnx.WriteData(it.Next().(*pendingItem).batchData)
+	pendingItems := p.pendingQueue.ReadableSlice()
+	if len(pendingItems) > 0 {
+		p.log.Infof("Resending %d pending batches", len(pendingItems))
+		for _, pi := range pendingItems {
+			p.cnx.WriteData(pi.(*pendingItem).batchData)
 		}
 	}
 	return nil