You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by mm...@apache.org on 2019/05/14 16:33:48 UTC

[pulsar-client-go] 11/38: Added blocking queue implementation

This is an automated email from the ASF dual-hosted git repository.

mmerli pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar-client-go.git

commit b95a97191e2a5c63d67ab41940dcd57c1032c7ee
Author: Matteo Merli <mm...@apache.org>
AuthorDate: Tue Apr 9 21:28:31 2019 -0700

    Added blocking queue implementation
---
 pulsar/impl/util/blocking_queue.go      | 125 ++++++++++++++++++++++++++++++++
 pulsar/impl/util/blocking_queue_test.go |  97 +++++++++++++++++++++++++
 2 files changed, 222 insertions(+)

diff --git a/pulsar/impl/util/blocking_queue.go b/pulsar/impl/util/blocking_queue.go
new file mode 100644
index 0000000..281d6ba
--- /dev/null
+++ b/pulsar/impl/util/blocking_queue.go
@@ -0,0 +1,125 @@
+package util
+
+import (
+	"sync"
+)
+
+type BlockingQueue interface {
+	// Enqueue one item, block if the queue is full
+	Put(item interface{})
+
+	// Dequeue one item, block until it's available
+	Take() interface{}
+
+	// Dequeue one item, return nil if queue is empty
+	Poll() interface{}
+
+	// Return one item without dequeing, return nil if queue is empty
+	Peek() interface{}
+
+	// Return the current size of the queue
+	Size() int
+}
+
+type blockingQueue struct {
+	items   []interface{}
+	headIdx int
+	tailIdx int
+	size    int
+	maxSize int
+
+	mutex      sync.Mutex
+	isNotEmpty *sync.Cond
+	isNotFull  *sync.Cond
+}
+
+func NewBlockingQueue(maxSize int) BlockingQueue {
+	bq := &blockingQueue{
+		items:   make([]interface{}, maxSize),
+		headIdx: 0,
+		tailIdx: 0,
+		size:    0,
+		maxSize: maxSize,
+	}
+
+	bq.isNotEmpty = sync.NewCond(&bq.mutex)
+	bq.isNotFull = sync.NewCond(&bq.mutex)
+	return bq
+}
+
+func (bq *blockingQueue) Put(item interface{}) {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	for ; bq.size == bq.maxSize; {
+		bq.isNotFull.Wait()
+	}
+
+	wasEmpty := bq.size == 0
+
+	bq.items[bq.tailIdx] = item
+	bq.size += 1
+	bq.tailIdx += 1
+	if bq.tailIdx >= bq.maxSize {
+		bq.tailIdx = 0
+	}
+
+	if wasEmpty {
+		// Wake up eventual reader waiting for next item
+		bq.isNotEmpty.Signal()
+	}
+}
+
+func (bq *blockingQueue) Take() interface{} {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	for ; bq.size == 0; {
+		bq.isNotEmpty.Wait()
+	}
+
+	return bq.dequeue()
+}
+
+func (bq *blockingQueue) Poll() interface{} {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	if bq.size == 0 {
+		return nil
+	}
+
+	return bq.dequeue()
+}
+
+func (bq *blockingQueue) Peek() interface{} {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	if bq.size == 0 {
+		return nil
+	} else {
+		return bq.items[bq.headIdx]
+	}
+}
+
+func (bq *blockingQueue) dequeue() interface{} {
+	item := bq.items[bq.headIdx]
+	bq.items[bq.headIdx] = nil
+
+	bq.headIdx += 1
+	if bq.headIdx == len(bq.items) {
+		bq.headIdx = 0
+	}
+
+	bq.size -= 1
+	bq.isNotFull.Signal()
+	return item
+}
+
+func (bq *blockingQueue) Size() int {
+	bq.mutex.Lock()
+	defer bq.mutex.Unlock()
+
+	return bq.size
+}
diff --git a/pulsar/impl/util/blocking_queue_test.go b/pulsar/impl/util/blocking_queue_test.go
new file mode 100644
index 0000000..0ecc8d1
--- /dev/null
+++ b/pulsar/impl/util/blocking_queue_test.go
@@ -0,0 +1,97 @@
+package util
+
+import (
+	"fmt"
+	"testing"
+	"time"
+
+	"github.com/stretchr/testify/assert"
+)
+
+func TestBlockingQueue(t *testing.T) {
+	q := NewBlockingQueue(10)
+
+	assert.Equal(t, 0, q.Size())
+	assert.Equal(t, nil, q.Poll())
+	assert.Equal(t, nil, q.Peek())
+
+	q.Put("test")
+	assert.Equal(t, 1, q.Size())
+
+	assert.Equal(t, "test", q.Peek())
+	assert.Equal(t, 1, q.Size())
+
+	assert.Equal(t, "test", q.Take())
+	assert.Equal(t, 0, q.Size())
+
+	ch := make(chan string)
+
+	go func() {
+		// Stays blocked until item is available
+		ch <- q.Take().(string)
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+
+	select {
+	case _ = <-ch:
+		assert.Fail(t, "Should not have had a value at this point")
+	default:
+		// Good, no value yet
+	}
+
+	q.Put("test-2")
+
+	x := <-ch
+	assert.Equal(t, "test-2", x)
+
+	// Fill the queue
+	for i := 0; i < 10; i++ {
+		q.Put(fmt.Sprintf("i-%d", i))
+		assert.Equal(t, i+1, q.Size())
+	}
+
+	for i := 0; i < 10; i++ {
+		assert.Equal(t, fmt.Sprintf("i-%d", i), q.Take())
+	}
+
+	close(ch)
+}
+
+func TestBlockingQueueWaitWhenFull(t *testing.T) {
+	q := NewBlockingQueue(3)
+
+	q.Put("test-1")
+	q.Put("test-2")
+	q.Put("test-3")
+	assert.Equal(t, 3, q.Size())
+
+	ch := make(chan bool)
+
+	go func() {
+		// Stays blocked until space is available
+		q.Put("test-4")
+		ch <- true
+	}()
+
+	time.Sleep(100 * time.Millisecond)
+
+	select {
+	case _ = <-ch:
+		assert.Fail(t, "Should not have had a value at this point")
+	default:
+		// Good, no value yet
+	}
+
+	assert.Equal(t, "test-1", q.Poll())
+
+	// Now the go-routine should have completed
+	_ = <-ch
+	assert.Equal(t, 3, q.Size())
+
+	assert.Equal(t, "test-2", q.Take())
+	assert.Equal(t, "test-3", q.Take())
+	assert.Equal(t, "test-4", q.Take())
+
+	close(ch)
+}