You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by mi...@apache.org on 2022/10/20 02:58:09 UTC

[incubator-eventmesh] branch eventmesh-server-go updated: go server standalone connector optimization

This is an automated email from the ASF dual-hosted git repository.

mikexue pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
     new 6c480b08 go server standalone connector optimization
     new f94c6e74 Merge pull request #1652 from horoc/eventmesh-server-go
6c480b08 is described below

commit 6c480b08f7b9065566ffedac9b4033712e69147e
Author: horoc <ho...@gmail.com>
AuthorDate: Tue Oct 18 21:57:08 2022 +0800

    go server standalone connector optimization
---
 .../plugin/connector/standalone/consumer.go        | 31 ++++++++-------
 .../standalone/standalone_connector_test.go        | 44 +++++++++++++++++++++-
 2 files changed, 60 insertions(+), 15 deletions(-)

diff --git a/eventmesh-server-go/plugin/connector/standalone/consumer.go b/eventmesh-server-go/plugin/connector/standalone/consumer.go
index b2d8cba9..da26a747 100644
--- a/eventmesh-server-go/plugin/connector/standalone/consumer.go
+++ b/eventmesh-server-go/plugin/connector/standalone/consumer.go
@@ -29,19 +29,19 @@ import (
 )
 
 type Consumer struct {
-	broker        *Broker
-	subscribes    map[string]*SubscribeWorker
-	consumeOffset map[string]*atomic.Int64
-	mutex         sync.Mutex
-	listener      connector.EventListener
-	started       atomic.Bool
+	broker          *Broker
+	subscribes      map[string]*SubscribeWorker
+	committedOffset map[string]*atomic.Int64
+	mutex           sync.Mutex
+	listener        connector.EventListener
+	started         atomic.Bool
 }
 
 func NewConsumer() *Consumer {
 	return &Consumer{
-		broker:        GetBroker(),
-		subscribes:    make(map[string]*SubscribeWorker),
-		consumeOffset: make(map[string]*atomic.Int64),
+		broker:          GetBroker(),
+		subscribes:      make(map[string]*SubscribeWorker),
+		committedOffset: make(map[string]*atomic.Int64),
 	}
 }
 
@@ -67,7 +67,7 @@ func (c *Consumer) Subscribe(topicName string) error {
 			quit:      make(chan struct{}, 1),
 		}
 
-		c.consumeOffset[topicName] = offset
+		c.committedOffset[topicName] = offset
 		c.subscribes[topicName] = worker
 		go worker.run()
 	}
@@ -90,10 +90,13 @@ func (c *Consumer) UpdateOffset(ctx context.Context, events []*ce.Event) error {
 	for _, event := range events {
 		topicName := event.Subject()
 		offset := GetOffsetFromEvent(event)
-		if curOffset, ok := c.consumeOffset[topicName]; ok {
+		if curOffset, ok := c.committedOffset[topicName]; ok {
 			if offset <= 0 {
 				return fmt.Errorf("fail to update offset, invalid param, topic %s, offset : %d", topicName, offset)
 			}
+			if offset < curOffset.Load() {
+				return nil
+			}
 			curOffset.Store(offset)
 		}
 	}
@@ -170,7 +173,7 @@ func (w *SubscribeWorker) pollMessage() error {
 			return nil
 		}
 	} else {
-		message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load())
+		message, err = w.broker.TakeMessageByOffset(w.topicName, w.offset.Load()+1)
 	}
 	if err != nil {
 		return errors.Wrap(err, "fail to take message from standalone broker")
@@ -180,12 +183,12 @@ func (w *SubscribeWorker) pollMessage() error {
 		switch action {
 		case connector.CommitMessage:
 			// update offset
-			w.offset.Add(1)
+			w.offset.Store(message.GetOffset())
 		case connector.ReconsumeLater:
 			// No-Op
 		case connector.ManualAck:
 			// update offset
-			w.offset.Add(1)
+			w.offset.Store(message.GetOffset())
 		default:
 		}
 		return nil
diff --git a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
index 9c6d8e9b..9a099e73 100644
--- a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
+++ b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
@@ -107,6 +107,48 @@ func TestConsumer_Subscribe(t *testing.T) {
 	assert.Equal(t, int64(1275), sum.Load())
 }
 
+func TestConsumer_ManualAck(t *testing.T) {
+	sum := atomic.NewInt64(0)
+	var wg sync.WaitGroup
+	wg.Add(50)
+
+	listener := connector.EventListener{
+		Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
+			defer wg.Done()
+
+			var data map[string]interface{}
+			event.DataAs(&data)
+			index := int64(data["val"].(float64))
+			sum.Add(index)
+			commitFunc(connector.ManualAck)
+			return nil
+		},
+	}
+
+	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	consumer, _ := factory.GetConsumer()
+	consumer.Start()
+	consumer.RegisterEventListener(&listener)
+	consumer.Subscribe(topicName)
+	defer consumer.Shutdown()
+
+	producer, _ := factory.GetProducer()
+	producer.Start()
+	defer producer.Shutdown()
+	for i := 1; i <= 50; i++ {
+		err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+			"val": i,
+		}), getEmptyPublishCallback())
+
+		if err != nil {
+			t.Fail()
+			return
+		}
+	}
+	wg.Wait()
+	assert.Equal(t, int64(1275), sum.Load())
+}
+
 func TestConsumer_UpdateOffset(t *testing.T) {
 	sum := atomic.NewInt64(0)
 	ch := make(chan struct{})
@@ -127,7 +169,7 @@ func TestConsumer_UpdateOffset(t *testing.T) {
 	defer consumer.Shutdown()
 	consumer.RegisterEventListener(&listener)
 	event := getTestEvent()
-	event.SetExtension("offset", "50")
+	event.SetExtension("offset", "49")
 	consumer.Subscribe(topicName)
 	consumer.UpdateOffset(context.Background(), []*ce.Event{event})
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org