You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/08/22 11:03:32 UTC
[incubator-eventmesh] 05/10: add broker
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
commit 50b063ea39a1adae9e9c468f34abe59dff9a582d
Author: walleliu1016 <li...@163.com>
AuthorDate: Thu Aug 18 21:57:32 2022 +0800
add broker
---
eventmesh-server-go/pkg/connector/consumer.go | 2 +-
eventmesh-server-go/pkg/connector/listener.go | 2 +-
eventmesh-server-go/pkg/connector/publisher.go | 8 +-
.../pkg/connector/standalone/broker.go | 106 ++++++++++++++++++++-
.../pkg/connector/standalone/message_entity.go | 8 +-
.../{listener.go => standalone/subscribe.go} | 9 --
6 files changed, 113 insertions(+), 22 deletions(-)
diff --git a/eventmesh-server-go/pkg/connector/consumer.go b/eventmesh-server-go/pkg/connector/consumer.go
index cf9a0bba..dc493388 100644
--- a/eventmesh-server-go/pkg/connector/consumer.go
+++ b/eventmesh-server-go/pkg/connector/consumer.go
@@ -25,7 +25,7 @@ type Consumer interface {
Initialize(*Properties) error
- UpdateOffset([]cloudevents.Event)
+ UpdateOffset([]*cloudevents.Event)
Subscribe(string)
diff --git a/eventmesh-server-go/pkg/connector/listener.go b/eventmesh-server-go/pkg/connector/listener.go
index e3cb658d..056746d2 100644
--- a/eventmesh-server-go/pkg/connector/listener.go
+++ b/eventmesh-server-go/pkg/connector/listener.go
@@ -21,5 +21,5 @@ import (
// EventListener listener to consume the cloudevents message
type EventListener interface {
- Consume(cloudevents.Event)
+ Consume(*cloudevents.Event)
}
diff --git a/eventmesh-server-go/pkg/connector/publisher.go b/eventmesh-server-go/pkg/connector/publisher.go
index 7026d29d..2f96d963 100644
--- a/eventmesh-server-go/pkg/connector/publisher.go
+++ b/eventmesh-server-go/pkg/connector/publisher.go
@@ -49,13 +49,13 @@ type Producer interface {
Initialize(*Properties) error
- Publish(cloudevents.Event, SendCallback) error
+ Publish(*cloudevents.Event, SendCallback) error
- SendOneway(cloudevents.Event) error
+ SendOneway(*cloudevents.Event) error
- Request(cloudevents.Event, RequestReplyCallback, time.Duration) error
+ Request(*cloudevents.Event, RequestReplyCallback, time.Duration) error
- Reply(cloudevents.Event, SendCallback) error
+ Reply(*cloudevents.Event, SendCallback) error
CheckTopicExist(string) bool
diff --git a/eventmesh-server-go/pkg/connector/standalone/broker.go b/eventmesh-server-go/pkg/connector/standalone/broker.go
index 048e8126..d6bede40 100644
--- a/eventmesh-server-go/pkg/connector/standalone/broker.go
+++ b/eventmesh-server-go/pkg/connector/standalone/broker.go
@@ -17,6 +17,9 @@ package standalone
import (
"context"
+ "fmt"
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "go.uber.org/atomic"
"sync"
"time"
)
@@ -26,16 +29,19 @@ var (
// If the currentTimeMills - messageCreateTimeMills >= MESSAGE_STORE_WINDOW,
// then the message will be clear, default to 1 hour
messageStoreWindow = time.Hour
+
+ // ErrTopicNotExist topic queue not exist in the broker
+ ErrTopicNotExist = fmt.Errorf("topic queue not exist")
)
// Broker used to store event, it just support standalone mode,
// you shouldn't use this module in production environment
type Broker struct {
// messageContainer store the topic and the queue
- // key = TopicMetadata value = MessageQueue
+ // key = topicName value = MessageQueue
messageContainer *sync.Map
// offsetMap store the offset for topic
- // key = TopicMetadata value = atomic.Long
+ // key = topicName value = atomic.Long
offsetMap *sync.Map
}
@@ -63,7 +69,7 @@ func (b *Broker) startHistoryMessageCleanTask(ctx context.Context) {
if currentMsg == nil {
return true
}
- if now.Sub(currentMsg.createTime) > messageStoreWindow {
+ if now.Sub(currentMsg.CreateTime) > messageStoreWindow {
v.(*MessageQueue).RemoveHead()
}
return true
@@ -71,3 +77,97 @@ func (b *Broker) startHistoryMessageCleanTask(ctx context.Context) {
}
}
}
+
+// PutMessage put message into broker
+func (b *Broker) PutMessage(topicName string, event *cloudevents.Event) (*MessageEntity, error) {
+ queue, offset := b.createTopicIfAbsent(topicName)
+ msg := &MessageEntity{
+ TopicMetadata: &TopicMetadata{
+ TopicName: topicName,
+ },
+ Message: event,
+ Offset: offset.Inc(),
+ CreateTime: time.Now(),
+ }
+ return msg, queue.Put(msg)
+}
+
+// TakeMessage Get the message, if the queue is empty then await
+func (b *Broker) TakeMessage(topicName string) (*cloudevents.Event, error) {
+ val, ok := b.messageContainer.Load(topicName)
+ if !ok {
+ return nil, ErrTopicNotExist
+ }
+ queue := val.(*MessageQueue)
+
+ return queue.Take().Message, nil
+}
+
+// GetMessage return the message in the head
+func (b *Broker) GetMessage(topicName string) (*cloudevents.Event, error) {
+ return b.GetMessageByOffset(topicName, 0)
+}
+
+// GetMessageByOffset get the message according to the offset
+// if offset is zero, head message will return
+func (b *Broker) GetMessageByOffset(topicName string, offset int64) (*cloudevents.Event, error) {
+ val, ok := b.messageContainer.Load(topicName)
+ if !ok {
+ return nil, ErrTopicNotExist
+ }
+ queue := val.(*MessageQueue)
+
+ if offset == 0 {
+ return queue.GetHead().Message, nil
+ }
+
+ msg, err := queue.GetByOffset(offset)
+ if err != nil {
+ return nil, err
+ }
+ return msg.Message, nil
+}
+
+// CheckTopicExist check the topic is exist in the broker
+func (b *Broker) CheckTopicExist(topicName string) bool {
+ _, ok := b.messageContainer.Load(topicName)
+ return ok
+}
+
+// UpdateOffset update the topic offset
+func (b *Broker) UpdateOffset(topicName string, offset int64) error {
+ val, ok := b.offsetMap.Load(topicName)
+ if !ok {
+ return ErrTopicNotExist
+ }
+
+ af := val.(*atomic.Int64)
+ af.Store(offset)
+ return nil
+}
+
+// createTopicIfAbsent create the message queue and offset if not exist
+func (b *Broker) createTopicIfAbsent(topicName string) (*MessageQueue, *atomic.Int64) {
+ var (
+ offset *atomic.Int64
+ queue *MessageQueue
+ )
+
+ val, ok := b.messageContainer.Load(topicName)
+ if !ok {
+ queue = NewDefaultMessageQueue()
+ b.messageContainer.Store(topicName, queue)
+ } else {
+ queue = val.(*MessageQueue)
+ }
+
+ valoffset, ok := b.offsetMap.Load(topicName)
+ if !ok {
+ offset = atomic.NewInt64(0)
+ b.offsetMap.Store(topicName, offset)
+ } else {
+ offset = (valoffset).(*atomic.Int64)
+ }
+
+ return queue, offset
+}
diff --git a/eventmesh-server-go/pkg/connector/standalone/message_entity.go b/eventmesh-server-go/pkg/connector/standalone/message_entity.go
index ea7a0f18..15ddaf8e 100644
--- a/eventmesh-server-go/pkg/connector/standalone/message_entity.go
+++ b/eventmesh-server-go/pkg/connector/standalone/message_entity.go
@@ -25,8 +25,8 @@ type TopicMetadata struct {
}
type MessageEntity struct {
- TopicMetadata *TopicMetadata `json:"topicMetadata"`
- Message cloudevents.Event `json:"message"`
- Offset int64 `json:"offset"`
- createTime time.Time `json:"createTime"`
+ TopicMetadata *TopicMetadata `json:"topicMetadata"`
+ Message *cloudevents.Event `json:"message"`
+ Offset int64 `json:"offset"`
+ CreateTime time.Time `json:"createTime"`
}
diff --git a/eventmesh-server-go/pkg/connector/listener.go b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
similarity index 81%
copy from eventmesh-server-go/pkg/connector/listener.go
copy to eventmesh-server-go/pkg/connector/standalone/subscribe.go
index e3cb658d..5b7af9a7 100644
--- a/eventmesh-server-go/pkg/connector/listener.go
+++ b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
@@ -14,12 +14,3 @@
// limitations under the License.
package standalone
-
-import (
- cloudevents "github.com/cloudevents/sdk-go/v2"
-)
-
-// EventListener listener to consume the cloudevents message
-type EventListener interface {
- Consume(cloudevents.Event)
-}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org