You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:50 UTC

[pulsar-client-go] 13/38: Added blocking queue iterator

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit ecab9cf16eed5095bf39cbb376ca80248417db1c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 14:46:44 2019 -0700

    Added blocking queue iterator
---
 pulsar/impl/util/blocking_queue.go      | 44 +++++++++++++++++++++++++++++++++
 pulsar/impl/util/blocking_queue_test.go | 15 +++++++++++
 2 files changed, 59 insertions(+)

diff --git a/pulsar/impl/util/blocking_queue.go b/pulsar/impl/util/blocking_queue.go
index 281d6ba..1b6a609 100644
--- a/pulsar/impl/util/blocking_queue.go
+++ b/pulsar/impl/util/blocking_queue.go
@@ -1,6 +1,7 @@
 package util
 
 import (
+	log "github.com/sirupsen/logrus"
 	"sync"
 )
 
@@ -19,6 +20,14 @@ type BlockingQueue interface {
 
 	// Return the current size of the queue
 	Size() int
+
+	// Return an iterator for the queue
+	Iterator() BlockingQueueIterator
+}
+
+type BlockingQueueIterator interface {
+	HasNext() bool
+	Next() interface{}
 }
 
 type blockingQueue struct {
@@ -33,6 +42,12 @@ type blockingQueue struct {
 	isNotFull  *sync.Cond
 }
 
+type blockingQueueIterator struct {
+	bq      *blockingQueue
+	readIdx int
+	toRead  int
+}
+
 func NewBlockingQueue(maxSize int) BlockingQueue {
 	bq := &blockingQueue{
 		items:   make([]interface{}, maxSize),
@@ -123,3 +138,32 @@ func (bq *blockingQueue) Size() int {
 
 	return bq.size
 }
+
+func (bq *blockingQueue) Iterator() BlockingQueueIterator {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	return &blockingQueueIterator{
+		bq:      bq,
+		readIdx: bq.headIdx,
+		toRead:  bq.size,
+	}
+}
+
+func (bqi *blockingQueueIterator) HasNext() bool {
+	return bqi.toRead > 0
+}
+
+func (bqi *blockingQueueIterator) Next() interface{} {
+	if bqi.toRead == 0 {
+		log.Panic("Trying to read past the end of the iterator")
+	}
+
+	item := bqi.bq.items[bqi.readIdx]
+	bqi.toRead--
+	bqi.readIdx++
+	if bqi.readIdx == bqi.bq.maxSize {
+		bqi.readIdx = 0
+	}
+	return item
+}
diff --git a/pulsar/impl/util/blocking_queue_test.go b/pulsar/impl/util/blocking_queue_test.go
index 0ecc8d1..dd17db4 100644
--- a/pulsar/impl/util/blocking_queue_test.go
+++ b/pulsar/impl/util/blocking_queue_test.go
@@ -95,3 +95,18 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
 
 	close(ch)
 }
+
+func TestBlockingQueueIterator(t *testing.T) {
+	q := NewBlockingQueue(10)
+
+	q.Put(1)
+	q.Put(2)
+	q.Put(3)
+	assert.Equal(t, 3, q.Size())
+
+	i := 1
+	for it := q.Iterator(); it.HasNext(); {
+		assert.Equal(t, i, it.Next())
+		i++
+	}
+}