You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/08/22 11:03:36 UTC
[incubator-eventmesh] 09/10: add until in message queue
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
commit 6507be75b015ddbcc14cb5cf3387e40c7dd84b65
Author: walleliu <li...@163.com>
AuthorDate: Fri Aug 19 18:10:22 2022 +0800
add until in message queue
---
.../pkg/connector/standalone/message_queue.go | 16 +++++++++++++++-
1 file changed, 15 insertions(+), 1 deletion(-)
diff --git a/eventmesh-server-go/pkg/connector/standalone/message_queue.go b/eventmesh-server-go/pkg/connector/standalone/message_queue.go
index 3e5173cf..63a7f65a 100644
--- a/eventmesh-server-go/pkg/connector/standalone/message_queue.go
+++ b/eventmesh-server-go/pkg/connector/standalone/message_queue.go
@@ -18,6 +18,9 @@ package standalone
import (
"fmt"
"sync"
+ "time"
+
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
)
var (
@@ -33,6 +36,8 @@ type MessageQueue struct {
putIndex int
count int
capacity int
+ notFull *util.Until
+ notEmpty *util.Until
lock *sync.Mutex
}
@@ -45,11 +50,20 @@ func NewDefaultMessageQueue() *MessageQueue {
// NewMessageQueueWithCapacity creates a message queue with
// the given capacity
func NewMessageQueueWithCapacity(capacity int) *MessageQueue {
- return &MessageQueue{
+ mq := &MessageQueue{
items: make([]*MessageEntity, capacity),
lock: new(sync.Mutex),
capacity: capacity,
}
+
+ mq.notFull = util.NewUntil(func() bool {
+ return mq.count == len(mq.items)
+ }, time.Second)
+ mq.notEmpty = util.NewUntil(func() bool {
+ return mq.count == 0
+ }, time.Second)
+
+ return mq
}
// Put insert the message at the tail of this queue,
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org