You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/08/22 11:03:32 UTC

[incubator-eventmesh] 05/10: add broker

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git

commit 50b063ea39a1adae9e9c468f34abe59dff9a582d
Author: walleliu1016 <li...@163.com>
AuthorDate: Thu Aug 18 21:57:32 2022 +0800

    add broker
---
 eventmesh-server-go/pkg/connector/consumer.go      |   2 +-
 eventmesh-server-go/pkg/connector/listener.go      |   2 +-
 eventmesh-server-go/pkg/connector/publisher.go     |   8 +-
 .../pkg/connector/standalone/broker.go             | 106 ++++++++++++++++++++-
 .../pkg/connector/standalone/message_entity.go     |   8 +-
 .../{listener.go => standalone/subscribe.go}       |   9 --
 6 files changed, 113 insertions(+), 22 deletions(-)

diff --git a/eventmesh-server-go/pkg/connector/consumer.go b/eventmesh-server-go/pkg/connector/consumer.go
index cf9a0bba..dc493388 100644
--- a/eventmesh-server-go/pkg/connector/consumer.go
+++ b/eventmesh-server-go/pkg/connector/consumer.go
@@ -25,7 +25,7 @@ type Consumer interface {
 
 	Initialize(*Properties) error
 
-	UpdateOffset([]cloudevents.Event)
+	UpdateOffset([]*cloudevents.Event)
 
 	Subscribe(string)
 
diff --git a/eventmesh-server-go/pkg/connector/listener.go b/eventmesh-server-go/pkg/connector/listener.go
index e3cb658d..056746d2 100644
--- a/eventmesh-server-go/pkg/connector/listener.go
+++ b/eventmesh-server-go/pkg/connector/listener.go
@@ -21,5 +21,5 @@ import (
 
 // EventListener listener to consume the cloudevents message
 type EventListener interface {
-	Consume(cloudevents.Event)
+	Consume(*cloudevents.Event)
 }
diff --git a/eventmesh-server-go/pkg/connector/publisher.go b/eventmesh-server-go/pkg/connector/publisher.go
index 7026d29d..2f96d963 100644
--- a/eventmesh-server-go/pkg/connector/publisher.go
+++ b/eventmesh-server-go/pkg/connector/publisher.go
@@ -49,13 +49,13 @@ type Producer interface {
 
 	Initialize(*Properties) error
 
-	Publish(cloudevents.Event, SendCallback) error
+	Publish(*cloudevents.Event, SendCallback) error
 
-	SendOneway(cloudevents.Event) error
+	SendOneway(*cloudevents.Event) error
 
-	Request(cloudevents.Event, RequestReplyCallback, time.Duration) error
+	Request(*cloudevents.Event, RequestReplyCallback, time.Duration) error
 
-	Reply(cloudevents.Event, SendCallback) error
+	Reply(*cloudevents.Event, SendCallback) error
 
 	CheckTopicExist(string) bool
 
diff --git a/eventmesh-server-go/pkg/connector/standalone/broker.go b/eventmesh-server-go/pkg/connector/standalone/broker.go
index 048e8126..d6bede40 100644
--- a/eventmesh-server-go/pkg/connector/standalone/broker.go
+++ b/eventmesh-server-go/pkg/connector/standalone/broker.go
@@ -17,6 +17,9 @@ package standalone
 
 import (
 	"context"
+	"fmt"
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"go.uber.org/atomic"
 	"sync"
 	"time"
 )
@@ -26,16 +29,19 @@ var (
 	// If the currentTimeMills - messageCreateTimeMills >= MESSAGE_STORE_WINDOW,
 	// then the message will be clear, default to 1 hour
 	messageStoreWindow = time.Hour
+
+	// ErrTopicNotExist is returned when the requested topic is not registered in the broker
+	ErrTopicNotExist = fmt.Errorf("topic queue not exist")
 )
 
 // Broker used to store event, it just support standalone mode,
 // you shouldn't use this module in production environment
 type Broker struct {
 	// messageContainer store the topic and the queue
-	// key = TopicMetadata value = MessageQueue
+	// key = topicName value = MessageQueue
 	messageContainer *sync.Map
 	// offsetMap store the offset for topic
-	// key = TopicMetadata value = atomic.Long
+	// key = topicName value = *atomic.Int64
 	offsetMap *sync.Map
 }
 
@@ -63,7 +69,7 @@ func (b *Broker) startHistoryMessageCleanTask(ctx context.Context) {
 				if currentMsg == nil {
 					return true
 				}
-				if now.Sub(currentMsg.createTime) > messageStoreWindow {
+				if now.Sub(currentMsg.CreateTime) > messageStoreWindow {
 					v.(*MessageQueue).RemoveHead()
 				}
 				return true
@@ -71,3 +77,97 @@ func (b *Broker) startHistoryMessageCleanTask(ctx context.Context) {
 		}
 	}
 }
+
+// PutMessage wraps event in a MessageEntity carrying the incremented offset of
+// topicName and puts it on that topic's queue, creating the topic on demand.
+func (b *Broker) PutMessage(topicName string, event *cloudevents.Event) (*MessageEntity, error) {
+	topicQueue, counter := b.createTopicIfAbsent(topicName)
+
+	entity := new(MessageEntity)
+	entity.TopicMetadata = &TopicMetadata{TopicName: topicName}
+	entity.Message = event
+	entity.Offset = counter.Inc()
+	entity.CreateTime = time.Now()
+	err := topicQueue.Put(entity)
+	return entity, err
+}
+
+// TakeMessage returns the event of the entity handed out by the queue's Take
+// (presumably blocking while the queue is empty); unknown topics yield ErrTopicNotExist.
+func (b *Broker) TakeMessage(topicName string) (*cloudevents.Event, error) {
+	if stored, found := b.messageContainer.Load(topicName); found {
+		entity := stored.(*MessageQueue).Take()
+		return entity.Message, nil
+	}
+
+	return nil, ErrTopicNotExist
+}
+
+// GetMessage returns the head message of topicName by querying offset zero.
+func (b *Broker) GetMessage(topicName string) (*cloudevents.Event, error) {
+	return b.GetMessageByOffset(topicName, 0)
+}
+
+// GetMessageByOffset returns the event stored at offset in the topicName queue;
+// offset zero selects the head, and (nil, nil) is returned when there is no head.
+func (b *Broker) GetMessageByOffset(topicName string, offset int64) (*cloudevents.Event, error) {
+	val, ok := b.messageContainer.Load(topicName)
+	if !ok {
+		return nil, ErrTopicNotExist
+	}
+	if offset != 0 {
+		msg, err := val.(*MessageQueue).GetByOffset(offset)
+		if err != nil {
+			return nil, err
+		}
+		return msg.Message, nil
+	}
+	if head := val.(*MessageQueue).GetHead(); head != nil { // nil head: queue is empty
+		return head.Message, nil
+	}
+	return nil, nil
+}
+
+// CheckTopicExist reports whether a message queue is registered for topicName.
+func (b *Broker) CheckTopicExist(topicName string) bool {
+	_, registered := b.messageContainer.Load(topicName)
+	return registered
+}
+
+// UpdateOffset overwrites the offset counter of topicName with offset.
+// It returns ErrTopicNotExist when no counter is stored for the topic.
+func (b *Broker) UpdateOffset(topicName string, offset int64) error {
+	stored, found := b.offsetMap.Load(topicName)
+	if found {
+		stored.(*atomic.Int64).Store(offset)
+		return nil
+	}
+
+	return ErrTopicNotExist
+}
+
+// createTopicIfAbsent create the message queue and offset if not exist
+func (b *Broker) createTopicIfAbsent(topicName string) (*MessageQueue, *atomic.Int64) {
+	var (
+		offset *atomic.Int64
+		queue  *MessageQueue
+	)
+
+	val, ok := b.messageContainer.Load(topicName)
+	if !ok {
+		queue = NewDefaultMessageQueue()
+		b.messageContainer.Store(topicName, queue)
+	} else {
+		queue = val.(*MessageQueue)
+	}
+
+	valoffset, ok := b.offsetMap.Load(topicName)
+	if !ok {
+		offset = atomic.NewInt64(0)
+		b.offsetMap.Store(topicName, offset)
+	} else {
+		offset = (valoffset).(*atomic.Int64)
+	}
+
+	return queue, offset
+}
diff --git a/eventmesh-server-go/pkg/connector/standalone/message_entity.go b/eventmesh-server-go/pkg/connector/standalone/message_entity.go
index ea7a0f18..15ddaf8e 100644
--- a/eventmesh-server-go/pkg/connector/standalone/message_entity.go
+++ b/eventmesh-server-go/pkg/connector/standalone/message_entity.go
@@ -25,8 +25,8 @@ type TopicMetadata struct {
 }
 
 type MessageEntity struct {
-	TopicMetadata *TopicMetadata    `json:"topicMetadata"`
-	Message       cloudevents.Event `json:"message"`
-	Offset        int64             `json:"offset"`
-	createTime    time.Time         `json:"createTime"`
+	TopicMetadata *TopicMetadata     `json:"topicMetadata"`
+	Message       *cloudevents.Event `json:"message"`
+	Offset        int64              `json:"offset"`
+	CreateTime    time.Time          `json:"createTime"`
 }
diff --git a/eventmesh-server-go/pkg/connector/listener.go b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
similarity index 81%
copy from eventmesh-server-go/pkg/connector/listener.go
copy to eventmesh-server-go/pkg/connector/standalone/subscribe.go
index e3cb658d..5b7af9a7 100644
--- a/eventmesh-server-go/pkg/connector/listener.go
+++ b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
@@ -14,12 +14,3 @@
 // limitations under the License.
 
 package standalone
-
-import (
-	cloudevents "github.com/cloudevents/sdk-go/v2"
-)
-
-// EventListener listener to consume the cloudevents message
-type EventListener interface {
-	Consume(cloudevents.Event)
-}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org