You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/20 02:58:09 UTC
[incubator-eventmesh] branch eventmesh-server-go updated: go server standalond connector optimization
This is an automated email from the ASF dual-hosted git repository.
mikexue pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
new 6c480b08 go server standalond connector optimization
new f94c6e74 Merge pull request #1652 from horoc/eventmesh-server-go
6c480b08 is described below
commit 6c480b08f7b9065566ffedac9b4033712e69147e
Author: horoc <ho...@gmail.com>
AuthorDate: Tue Oct 18 21:57:08 2022 +0800
go server standalond connector optimization
---
.../plugin/connector/standalone/consumer.go | 31 ++++++++-------
.../standalone/standalone_connector_test.go | 44 +++++++++++++++++++++-
2 files changed, 60 insertions(+), 15 deletions(-)
diff --git a/eventmesh-server-go/plugin/connector/standalone/consumer.go b/eventmesh-server-go/plugin/connector/standalone/consumer.go
index b2d8cba9..da26a747 100644
--- a/eventmesh-server-go/plugin/connector/standalone/consumer.go
+++ b/eventmesh-server-go/plugin/connector/standalone/consumer.go
@@ -29,19 +29,19 @@ import (
)
type Consumer struct {
- broker *Broker
- subscribes map[string]*SubscribeWorker
- consumeOffset map[string]*atomic.Int64
- mutex sync.Mutex
- listener connector.EventListener
- started atomic.Bool
+ broker *Broker
+ subscribes map[string]*SubscribeWorker
+ committedOffset map[string]*atomic.Int64
+ mutex sync.Mutex
+ listener connector.EventListener
+ started atomic.Bool
}
func NewConsumer() *Consumer {
return &Consumer{
- broker: GetBroker(),
- subscribes: make(map[string]*SubscribeWorker),
- consumeOffset: make(map[string]*atomic.Int64),
+ broker: GetBroker(),
+ subscribes: make(map[string]*SubscribeWorker),
+ committedOffset: make(map[string]*atomic.Int64),
}
}
@@ -67,7 +67,7 @@ func (c *Consumer) Subscribe(topicName string) error {
quit: make(chan struct{}, 1),
}
- c.consumeOffset[topicName] = offset
+ c.committedOffset[topicName] = offset
c.subscribes[topicName] = worker
go worker.run()
}
@@ -90,10 +90,13 @@ func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error {
for _, event := range events {
topicName := event.Subject()
offset := GetOffsetFromEvent(event)
- if curOffset, ok := c.consumeOffset[topicName]; ok {
+ if curOffset, ok := c.committedOffset[topicName]; ok {
if offset <= 0 {
return fmt.Errorf("fail to update offset, invalid param, topic %s, offset : %d", topicName, offset)
}
+ if offset < curOffset.Load() {
+ return nil
+ }
curOffset.Store(offset)
}
}
@@ -170,7 +173,7 @@ func (w *SubscribeWorker) pollMessage() error {
return nil
}
} else {
- message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load())
+ message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load()+1)
}
if err != nil {
return errors.Wrap(err, "fail to take message from standalone broker")
@@ -180,12 +183,12 @@ func (w *SubscribeWorker) pollMessage() error {
switch action {
case connector.CommitMessage:
// update offset
- w.offset.Add(1)
+ w.offset.Store(message.GetOffset())
case connector.ReconsumeLater:
// No-Op
case connector.ManualAck:
// update offset
- w.offset.Add(1)
+ w.offset.Store(message.GetOffset())
default:
}
return nil
diff --git a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
index 9c6d8e9b..9a099e73 100644
--- a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
+++ b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
@@ -107,6 +107,48 @@ func TestConsumer_Subscribe(t *testing.T) {
assert.Equal(t, int64(1275), sum.Load())
}
+func TestConsumer_ManualAck(t *testing.T) {
+ sum := atomic.NewInt64(0)
+ var wg sync.WaitGroup
+ wg.Add(50)
+
+ listener := connector.EventListener{
+ Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
+ defer wg.Done()
+
+ var data map[string]interface{}
+ event.DataAs(&data)
+ index := int64(data["val"].(float64))
+ sum.Add(index)
+ commitFunc(connector.ManualAck)
+ return nil
+ },
+ }
+
+ factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ consumer, _ := factory.GetConsumer()
+ consumer.Start()
+ consumer.RegisterEventListener(&listener)
+ consumer.Subscribe(topicName)
+ defer consumer.Shutdown()
+
+ producer, _ := factory.GetProducer()
+ producer.Start()
+ defer producer.Shutdown()
+ for i := 1; i <= 50; i++ {
+ err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+ "val": i,
+ }), getEmptyPublishCallback())
+
+ if err != nil {
+ t.Fail()
+ return
+ }
+ }
+ wg.Wait()
+ assert.Equal(t, int64(1275), sum.Load())
+}
+
func TestConsumer_UpdateOffset(t *testing.T) {
sum := atomic.NewInt64(0)
ch := make(chan struct{})
@@ -127,7 +169,7 @@ func TestConsumer_UpdateOffset(t *testing.T) {
defer consumer.Shutdown()
consumer.RegisterEventListener(&listener)
event := getTestEvent()
- event.SetExtension("offset", "50")
+ event.SetExtension("offset", "49")
consumer.Subscribe(topicName)
consumer.UpdateOffset(context.Background(), []*ce.Event{event})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org