You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:50 UTC
[pulsar-client-go] 13/38: Added blocking queue iterator
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit ecab9cf16eed5095bf39cbb376ca80248417db1c
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Wed Apr 10 14:46:44 2019 -0700
Added blocking queue iterator
---
pulsar/impl/util/blocking_queue.go | 44 +++++++++++++++++++++++++++++++++
pulsar/impl/util/blocking_queue_test.go | 15 +++++++++++
2 files changed, 59 insertions(+)
diff --git a/pulsar/impl/util/blocking_queue.go b/pulsar/impl/util/blocking_queue.go
index 281d6ba..1b6a609 100644
--- a/pulsar/impl/util/blocking_queue.go
+++ b/pulsar/impl/util/blocking_queue.go
@@ -1,6 +1,7 @@
package util
import (
+ log "github.com/sirupsen/logrus"
"sync"
)
@@ -19,6 +20,14 @@ type BlockingQueue interface {
// Return the current size of the queue
Size() int
+
+ // Return an iterator for the queue
+ Iterator() BlockingQueueIterator
+}
+
+type BlockingQueueIterator interface { // BlockingQueueIterator walks the items of a BlockingQueue; obtained from Iterator().
+ HasNext() bool // reports whether Next() still has an item to return
+ Next() interface{} // returns the next item and advances the iterator
}
type blockingQueue struct {
@@ -33,6 +42,12 @@ type blockingQueue struct {
isNotFull *sync.Cond
}
+type blockingQueueIterator struct { // blockingQueueIterator is the iterator value built by blockingQueue.Iterator().
+ bq *blockingQueue // queue whose items slice Next() reads from
+ readIdx int // index into bq.items of the next item to return
+ toRead int // number of items still to be returned
+}
+
func NewBlockingQueue(maxSize int) BlockingQueue {
bq := &blockingQueue{
items: make([]interface{}, maxSize),
@@ -123,3 +138,32 @@ func (bq *blockingQueue) Size() int {
return bq.size
}
+
+func (bq *blockingQueue) Iterator() BlockingQueueIterator { // Iterator returns an iterator that starts at the queue's current head.
+ bq.mutex.Lock() // headIdx and size are read while holding the queue mutex
+ defer bq.mutex.Unlock()
+
+ return &blockingQueueIterator{
+ bq: bq,
+ readIdx: bq.headIdx, // begin at the current head position
+ toRead: bq.size, // number of items queued when the iterator is created
+ }
+}
+
+func (bqi *blockingQueueIterator) HasNext() bool { // HasNext reports whether Next() still has items left to return.
+ return bqi.toRead > 0
+}
+
+func (bqi *blockingQueueIterator) Next() interface{} { // Next returns the item at readIdx and advances the iterator by one slot.
+ if bqi.toRead == 0 { // nothing left: callers are expected to check HasNext() first
+ log.Panic("Trying to read past the end of the iterator")
+ }
+
+ item := bqi.bq.items[bqi.readIdx] // NOTE(review): read without taking bq.mutex; effect of concurrent queue changes is not visible here - confirm
+ bqi.toRead--
+ bqi.readIdx++
+ if bqi.readIdx == bqi.bq.maxSize { // wrap around to the start of the items slice
+ bqi.readIdx = 0
+ }
+ return item
+}
diff --git a/pulsar/impl/util/blocking_queue_test.go b/pulsar/impl/util/blocking_queue_test.go
index 0ecc8d1..dd17db4 100644
--- a/pulsar/impl/util/blocking_queue_test.go
+++ b/pulsar/impl/util/blocking_queue_test.go
@@ -95,3 +95,18 @@ func TestBlockingQueueWaitWhenFull(t *testing.T) {
close(ch)
}
+
+func TestBlockingQueueIterator(t *testing.T) { // TestBlockingQueueIterator checks that the iterator yields queued items in insertion order.
+ q := NewBlockingQueue(10)
+
+ q.Put(1)
+ q.Put(2)
+ q.Put(3)
+ assert.Equal(t, 3, q.Size())
+
+ i := 1 // next value the iterator is expected to return
+ for it := q.Iterator(); it.HasNext(); {
+ assert.Equal(t, i, it.Next())
+ i++
+ }
+}