You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by rx...@apache.org on 2020/06/28 02:04:36 UTC
[pulsar-client-go] branch master updated: Removed blocking queue
iterator (#301)
This is an automated email from the ASF dual-hosted git repository.
rxl pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
The following commit(s) were added to refs/heads/master by this push:
new 975eb37 Removed blocking queue iterator (#301)
975eb37 is described below
commit 975eb3781644ebe588fc142e53eadf39fe50341a
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Sat Jun 27 19:04:27 2020 -0700
Removed blocking queue iterator (#301)
## Motivation
The blocking queue iterator method is not really thread safe in the presence of other read/write operations on the queue. While it makes a snapshot of what's readable at one point, if there are Poll() operations, it might be ending up reading new items before the new ones.
For that, it's better to make a copy of the items in the queue, also considering that iterating over this queue is only done on reconnections and it's not performance sensitive.
---
pulsar/internal/blocking_queue.go | 50 ++++++++--------------------------
pulsar/internal/blocking_queue_test.go | 29 +++++++++++++++-----
pulsar/producer_partition.go | 9 +++---
3 files changed, 39 insertions(+), 49 deletions(-)
diff --git a/pulsar/internal/blocking_queue.go b/pulsar/internal/blocking_queue.go
index 37317a1..8162301 100644
--- a/pulsar/internal/blocking_queue.go
+++ b/pulsar/internal/blocking_queue.go
@@ -19,8 +19,6 @@ package internal
import (
"sync"
-
- log "github.com/sirupsen/logrus"
)
// BlockingQueue is a interface of block queue
@@ -43,14 +41,8 @@ type BlockingQueue interface {
// Size return the current size of the queue
Size() int
- // Iterator return an iterator for the queue
- Iterator() BlockingQueueIterator
-}
-
-// BlockingQueueIterator abstract a interface of block queue iterator.
-type BlockingQueueIterator interface {
- HasNext() bool
- Next() interface{}
+ // ReadableSlice returns a new view of the readable items in the queue
+ ReadableSlice() []interface{}
}
type blockingQueue struct {
@@ -65,12 +57,6 @@ type blockingQueue struct {
isNotFull *sync.Cond
}
-type blockingQueueIterator struct {
- bq *blockingQueue
- readIdx int
- toRead int
-}
-
// NewBlockingQueue init block queue and returns a BlockingQueue
func NewBlockingQueue(maxSize int) BlockingQueue {
bq := &blockingQueue{
@@ -173,31 +159,19 @@ func (bq *blockingQueue) Size() int {
return bq.size
}
-func (bq *blockingQueue) Iterator() BlockingQueueIterator {
+func (bq *blockingQueue) ReadableSlice() []interface{} {
bq.mutex.Lock()
defer bq.mutex.Unlock()
- return &blockingQueueIterator{
- bq: bq,
- readIdx: bq.headIdx,
- toRead: bq.size,
- }
-}
-
-func (bqi *blockingQueueIterator) HasNext() bool {
- return bqi.toRead > 0
-}
-
-func (bqi *blockingQueueIterator) Next() interface{} {
- if bqi.toRead == 0 {
- log.Panic("Trying to read past the end of the iterator")
+ res := make([]interface{}, bq.size)
+ readIdx := bq.headIdx
+ for i := 0; i < bq.size; i++ {
+ res[i] = bq.items[readIdx]
+ readIdx++
+ if readIdx == bq.maxSize {
+ readIdx = 0
+ }
}
- item := bqi.bq.items[bqi.readIdx]
- bqi.toRead--
- bqi.readIdx++
- if bqi.readIdx == bqi.bq.maxSize {
- bqi.readIdx = 0
- }
- return item
+ return res
}
diff --git a/pulsar/internal/blocking_queue_test.go b/pulsar/internal/blocking_queue_test.go
index 12bb8fc..c93b1a6 100644
--- a/pulsar/internal/blocking_queue_test.go
+++ b/pulsar/internal/blocking_queue_test.go
@@ -119,17 +119,32 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
close(ch)
}
-func TestBlockingQueueIterator(t *testing.T) {
- q := NewBlockingQueue(10)
+func TestBlockingQueue_ReadableSlice(t *testing.T) {
+ q := NewBlockingQueue(3)
q.Put(1)
q.Put(2)
q.Put(3)
assert.Equal(t, 3, q.Size())
- i := 1
- for it := q.Iterator(); it.HasNext(); {
- assert.Equal(t, i, it.Next())
- i++
- }
+ items := q.ReadableSlice()
+ assert.Equal(t, len(items), 3)
+ assert.Equal(t, items[0], 1)
+ assert.Equal(t, items[1], 2)
+ assert.Equal(t, items[2], 3)
+
+ q.Poll()
+
+ items = q.ReadableSlice()
+ assert.Equal(t, len(items), 2)
+ assert.Equal(t, items[0], 2)
+ assert.Equal(t, items[1], 3)
+
+ q.Put(4)
+
+ items = q.ReadableSlice()
+ assert.Equal(t, len(items), 3)
+ assert.Equal(t, items[0], 2)
+ assert.Equal(t, items[1], 3)
+ assert.Equal(t, items[2], 4)
}
diff --git a/pulsar/producer_partition.go b/pulsar/producer_partition.go
index 368119f..e4765fc 100644
--- a/pulsar/producer_partition.go
+++ b/pulsar/producer_partition.go
@@ -176,10 +176,11 @@ func (p *partitionProducer) grabCnx() error {
p.cnx.RegisterListener(p.producerID, p)
p.log.WithField("cnx", res.Cnx.ID()).Debug("Connected producer")
- if p.pendingQueue.Size() > 0 {
- p.log.Infof("Resending %d pending batches", p.pendingQueue.Size())
- for it := p.pendingQueue.Iterator(); it.HasNext(); {
- p.cnx.WriteData(it.Next().(*pendingItem).batchData)
+ pendingItems := p.pendingQueue.ReadableSlice()
+ if len(pendingItems) > 0 {
+ p.log.Infof("Resending %d pending batches", len(pendingItems))
+ for _, pi := range pendingItems {
+ p.cnx.WriteData(pi.(*pendingItem).batchData)
}
}
return nil