You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:48 UTC
[pulsar-client-go] 11/38: Added blocking queue implementation
This is an automated email from the ASF dual-hosted git repository.
mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git
commit b95a97191e2a5c63d67ab41940dcd57c1032c7ee
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Apr 9 21:28:31 2019 -0700
Added blocking queue implementation
---
pulsar/impl/util/blocking_queue.go | 125 ++++++++++++++++++++++++++++++++
pulsar/impl/util/blocking_queue_test.go | 97 +++++++++++++++++++++++++
2 files changed, 222 insertions(+)
diff --git a/pulsar/impl/util/blocking_queue.go b/pulsar/impl/util/blocking_queue.go
new file mode 100644
index 0000000..281d6ba
--- /dev/null
+++ b/pulsar/impl/util/blocking_queue.go
@@ -0,0 +1,125 @@
+package util
+
+import (
+ "sync"
+)
+
+type BlockingQueue interface {
+ // Enqueue one item, block if the queue is full
+ Put(item interface{})
+
+ // Dequeue one item, block until it's available
+ Take() interface{}
+
+ // Dequeue one item, return nil if queue is empty
+ Poll() interface{}
+
+ // Return one item without dequeing, return nil if queue is empty
+ Peek() interface{}
+
+ // Return the current size of the queue
+ Size() int
+}
+
+type blockingQueue struct {
+ items []interface{}
+ headIdx int
+ tailIdx int
+ size int
+ maxSize int
+
+ mutex sync.Mutex
+ isNotEmpty *sync.Cond
+ isNotFull *sync.Cond
+}
+
+func NewBlockingQueue(maxSize int) BlockingQueue {
+ bq := &blockingQueue{
+ items: make([]interface{}, maxSize),
+ headIdx: 0,
+ tailIdx: 0,
+ size: 0,
+ maxSize: maxSize,
+ }
+
+ bq.isNotEmpty = sync.NewCond(&bq.mutex)
+ bq.isNotFull = sync.NewCond(&bq.mutex)
+ return bq
+}
+
+func (bq *blockingQueue) Put(item interface{}) {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ for ; bq.size == bq.maxSize; {
+ bq.isNotFull.Wait()
+ }
+
+ wasEmpty := bq.size == 0
+
+ bq.items[bq.tailIdx] = item
+ bq.size += 1
+ bq.tailIdx += 1
+ if bq.tailIdx >= bq.maxSize {
+ bq.tailIdx = 0
+ }
+
+ if wasEmpty {
+ // Wake up eventual reader waiting for next item
+ bq.isNotEmpty.Signal()
+ }
+}
+
+func (bq *blockingQueue) Take() interface{} {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ for ; bq.size == 0; {
+ bq.isNotEmpty.Wait()
+ }
+
+ return bq.dequeue()
+}
+
+func (bq *blockingQueue) Poll() interface{} {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ if bq.size == 0 {
+ return nil
+ }
+
+ return bq.dequeue()
+}
+
+func (bq *blockingQueue) Peek() interface{} {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ if bq.size == 0 {
+ return nil
+ } else {
+ return bq.items[bq.headIdx]
+ }
+}
+
+func (bq *blockingQueue) dequeue() interface{} {
+ item := bq.items[bq.headIdx]
+ bq.items[bq.headIdx] = nil
+
+ bq.headIdx += 1
+ if bq.headIdx == len(bq.items) {
+ bq.headIdx = 0
+ }
+
+ bq.size -= 1
+ bq.isNotFull.Signal()
+ return item
+}
+
+func (bq *blockingQueue) Size() int {
+ bq.mutex.Lock()
+ defer bq.mutex.Unlock()
+
+ return bq.size
+}
diff --git a/pulsar/impl/util/blocking_queue_test.go b/pulsar/impl/util/blocking_queue_test.go
new file mode 100644
index 0000000..0ecc8d1
--- /dev/null
+++ b/pulsar/impl/util/blocking_queue_test.go
@@ -0,0 +1,97 @@
+package util
+
+import (
+ "fmt"
+ "testing"
+ "time"
+
+ "github.com/stretchr/testify/assert"
+)
+
+func TestBlockingQueue(t *testing.T) {
+ q := NewBlockingQueue(10)
+
+ assert.Equal(t, 0, q.Size())
+ assert.Equal(t, nil, q.Poll())
+ assert.Equal(t, nil, q.Peek())
+
+ q.Put("test")
+ assert.Equal(t, 1, q.Size())
+
+ assert.Equal(t, "test", q.Peek())
+ assert.Equal(t, 1, q.Size())
+
+ assert.Equal(t, "test", q.Take())
+ assert.Equal(t, 0, q.Size())
+
+ ch := make(chan string)
+
+ go func() {
+ // Stays blocked until item is available
+ ch <- q.Take().(string)
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case _ = <-ch:
+ assert.Fail(t, "Should not have had a value at this point")
+ default:
+ // Good, no value yet
+ }
+
+ q.Put("test-2")
+
+ x := <-ch
+ assert.Equal(t, "test-2", x)
+
+ // Fill the queue
+ for i := 0; i < 10; i++ {
+ q.Put(fmt.Sprintf("i-%d", i))
+ assert.Equal(t, i+1, q.Size())
+ }
+
+ for i := 0; i < 10; i++ {
+ assert.Equal(t, fmt.Sprintf("i-%d", i), q.Take())
+ }
+
+ close(ch)
+}
+
+func TestBlockingQueueWaitWhenFull(t *testing.T) {
+ q := NewBlockingQueue(3)
+
+ q.Put("test-1")
+ q.Put("test-2")
+ q.Put("test-3")
+ assert.Equal(t, 3, q.Size())
+
+ ch := make(chan bool)
+
+ go func() {
+ // Stays blocked until space is available
+ q.Put("test-4")
+ ch <- true
+ }()
+
+ time.Sleep(100 * time.Millisecond)
+
+ select {
+ case _ = <-ch:
+ assert.Fail(t, "Should not have had a value at this point")
+ default:
+ // Good, no value yet
+ }
+
+ assert.Equal(t, "test-1", q.Poll())
+
+ // Now the go-routine should have completed
+ _ = <-ch
+ assert.Equal(t, 3, q.Size())
+
+ assert.Equal(t, "test-2", q.Take())
+ assert.Equal(t, "test-3", q.Take())
+ assert.Equal(t, "test-4", q.Take())
+
+ close(ch)
+}