You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/08/22 11:03:36 UTC

[incubator-eventmesh] 09/10: add until in message queue

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git

commit 6507be75b015ddbcc14cb5cf3387e40c7dd84b65
Author: walleliu <li...@163.com>
AuthorDate: Fri Aug 19 18:10:22 2022 +0800

    add until in message queue
---
 .../pkg/connector/standalone/message_queue.go            | 16 +++++++++++++++-
 1 file changed, 15 insertions(+), 1 deletion(-)

diff --git a/eventmesh-server-go/pkg/connector/standalone/message_queue.go b/eventmesh-server-go/pkg/connector/standalone/message_queue.go
index 3e5173cf..63a7f65a 100644
--- a/eventmesh-server-go/pkg/connector/standalone/message_queue.go
+++ b/eventmesh-server-go/pkg/connector/standalone/message_queue.go
@@ -18,6 +18,9 @@ package standalone
 import (
 	"fmt"
 	"sync"
+	"time"
+
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
 )
 
 var (
@@ -33,6 +36,8 @@ type MessageQueue struct {
 	putIndex  int
 	count     int
 	capacity  int
+	notFull   *util.Until
+	notEmpty  *util.Until
 	lock      *sync.Mutex
 }
 
@@ -45,11 +50,20 @@ func NewDefaultMessageQueue() *MessageQueue {
 // NewMessageQueueWithCapacity crate message queue with
 // given capacity
 func NewMessageQueueWithCapacity(capacity int) *MessageQueue {
-	return &MessageQueue{
+	mq := &MessageQueue{
 		items:    make([]*MessageEntity, capacity),
 		lock:     new(sync.Mutex),
 		capacity: capacity,
 	}
+
+	mq.notFull = util.NewUntil(func() bool {
+		return mq.count == len(mq.items)
+	}, time.Second)
+	mq.notEmpty = util.NewUntil(func() bool {
+		return mq.count == 0
+	}, time.Second)
+
+	return mq
 }
 
 // Put insert the message at the tail of this queue,


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org