You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/08/22 11:03:34 UTC

[incubator-eventmesh] 07/10: add standalone consumer

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git

commit 64a87aa639780b7238b1264d0ca3ca11594a1151
Author: walleliu <li...@163.com>
AuthorDate: Fri Aug 19 11:31:22 2022 +0800

    add standalone consumer
---
 eventmesh-server-go/go.mod                         |   3 +-
 eventmesh-server-go/go.sum                         |   2 +
 eventmesh-server-go/pkg/connector/consumer.go      |   6 +-
 eventmesh-server-go/pkg/connector/lifecycle.go     |   2 +-
 eventmesh-server-go/pkg/connector/listener.go      |   7 +-
 eventmesh-server-go/pkg/connector/properties.go    |   2 +-
 eventmesh-server-go/pkg/connector/publisher.go     |   5 +-
 .../pkg/connector/standalone/consumer.go           | 104 +++++++++++++++++++++
 .../pkg/connector/standalone/subscribe.go          |  87 +++++++++++++++++
 9 files changed, 209 insertions(+), 9 deletions(-)

diff --git a/eventmesh-server-go/go.mod b/eventmesh-server-go/go.mod
index 878a9cdc..8562bbb9 100644
--- a/eventmesh-server-go/go.mod
+++ b/eventmesh-server-go/go.mod
@@ -34,8 +34,9 @@ require (
 	github.com/BurntSushi/toml v1.2.0 // indirect
 	github.com/gin-contrib/pprof v1.4.0
 	github.com/gin-gonic/gin v1.8.1
+	github.com/panjf2000/ants/v2 v2.5.0
 	github.com/unrolled/secure v1.12.0
-	go.uber.org/atomic v1.9.0 // indirect
+	go.uber.org/atomic v1.9.0
 	go.uber.org/fx v1.18.1
 	go.uber.org/multierr v1.7.0 // indirect
 	golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7 // indirect
diff --git a/eventmesh-server-go/go.sum b/eventmesh-server-go/go.sum
index 5543a0f2..159feb2a 100644
--- a/eventmesh-server-go/go.sum
+++ b/eventmesh-server-go/go.sum
@@ -171,6 +171,8 @@ github.com/nacos-group/nacos-sdk-go/v2 v2.0.3/go.mod h1:SlhyCAv961LcZ198XpKfPEQq
 github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
 github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
 github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
+github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q=
+github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
 github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU=
 github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
 github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
diff --git a/eventmesh-server-go/pkg/connector/consumer.go b/eventmesh-server-go/pkg/connector/consumer.go
index dc493388..43d87963 100644
--- a/eventmesh-server-go/pkg/connector/consumer.go
+++ b/eventmesh-server-go/pkg/connector/consumer.go
@@ -13,9 +13,11 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package standalone
+package connector
 
 import (
+	"context"
+
 	cloudevents "github.com/cloudevents/sdk-go/v2"
 )
 
@@ -27,7 +29,7 @@ type Consumer interface {
 
 	UpdateOffset([]*cloudevents.Event)
 
-	Subscribe(string)
+	Subscribe(context.Context, string)
 
 	UnSubscribe(string)
 
diff --git a/eventmesh-server-go/pkg/connector/lifecycle.go b/eventmesh-server-go/pkg/connector/lifecycle.go
index cfc2b67e..0dd04c6a 100644
--- a/eventmesh-server-go/pkg/connector/lifecycle.go
+++ b/eventmesh-server-go/pkg/connector/lifecycle.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package standalone
+package connector
 
 // LifeCycle defines a lifecycle interface for a OMS related service endpoint,
 type LifeCycle interface {
diff --git a/eventmesh-server-go/pkg/connector/listener.go b/eventmesh-server-go/pkg/connector/listener.go
index 056746d2..5dc5ae37 100644
--- a/eventmesh-server-go/pkg/connector/listener.go
+++ b/eventmesh-server-go/pkg/connector/listener.go
@@ -13,13 +13,16 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package standalone
+package connector
 
 import (
 	cloudevents "github.com/cloudevents/sdk-go/v2"
 )
 
+// AsyncConsumeHandler is the callback handed to EventListener.Consume; it receives the EventMeshAction chosen for the message.
+type AsyncConsumeHandler func(EventMeshAction)
+
 // EventListener listener to consume the cloudevents message
 type EventListener interface {
-	Consume(*cloudevents.Event)
+	Consume(*cloudevents.Event, AsyncConsumeHandler)
 }
diff --git a/eventmesh-server-go/pkg/connector/properties.go b/eventmesh-server-go/pkg/connector/properties.go
index 2c4e2ace..86523a1e 100644
--- a/eventmesh-server-go/pkg/connector/properties.go
+++ b/eventmesh-server-go/pkg/connector/properties.go
@@ -13,7 +13,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package standalone
+package connector
 
 // Properties represents a persistent set of properties.
 // The Properties can be saved to a stream
diff --git a/eventmesh-server-go/pkg/connector/publisher.go b/eventmesh-server-go/pkg/connector/publisher.go
index 2f96d963..0c18a198 100644
--- a/eventmesh-server-go/pkg/connector/publisher.go
+++ b/eventmesh-server-go/pkg/connector/publisher.go
@@ -13,11 +13,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package standalone
+package connector
 
 import (
-	cloudevents "github.com/cloudevents/sdk-go/v2"
 	"time"
+
+	cloudevents "github.com/cloudevents/sdk-go/v2"
 )
 
 type SendResult struct {
diff --git a/eventmesh-server-go/pkg/connector/standalone/consumer.go b/eventmesh-server-go/pkg/connector/standalone/consumer.go
new file mode 100644
index 00000000..5dedab8b
--- /dev/null
+++ b/eventmesh-server-go/pkg/connector/standalone/consumer.go
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package standalone
+
+import (
+	"context"
+	"sync"
+
+	cloudevents "github.com/cloudevents/sdk-go/v2"
+	"go.uber.org/atomic"
+
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/connector"
+)
+
+// defaultPoolsize is presumably the default size of the pool used to process
+// messages; it is not referenced by the code visible in this file.
+var defaultPoolsize = 10
+
+// Consumer is the standalone connector's message consumer.
+type Consumer struct {
+	// broker is the broker the consumer creates topics on and reports offsets to
+	broker *Broker
+	// listener is handed to every SubscribeProcessor created by Subscribe;
+	// it is nil until RegisterEventListener is called
+	listener connector.EventListener
+	// isStarted is flipped by Start and Shutdown
+	isStarted *atomic.Bool
+
+	// subscribes stores the SubscribeProcessor of every subscribed topic:
+	// the key is the topic name, the value is the *SubscribeProcessor
+	subscribes *sync.Map
+}
+
+// NewConsumer builds a standalone consumer backed by a broker obtained from
+// NewBroker(ctx). The consumer starts in the "not started" state, without a
+// listener and without any subscription. The returned error is always nil.
+func NewConsumer(ctx context.Context) (*Consumer, error) {
+	consumer := &Consumer{
+		broker:     NewBroker(ctx),
+		isStarted:  atomic.NewBool(false),
+		subscribes: &sync.Map{},
+	}
+	return consumer, nil
+}
+
+// Initialize is a no-op for the standalone consumer: the properties are
+// ignored and nil is always returned.
+func (c *Consumer) Initialize(*connector.Properties) error {
+	return nil
+}
+
+func (c *Consumer) UpdateOffset(events []*cloudevents.Event) {
+	for _, event := range events {
+		c.broker.UpdateOffset(event.Subject(), event.Extensions()[ExtensionOffset].(int64))
+	}
+}
+
+// Subscribe starts consuming topicName. It is a no-op when the topic is
+// already subscribed; otherwise the topic is created on the broker if absent
+// and a SubscribeProcessor bound to ctx and to the currently registered
+// listener is started and remembered.
+func (c *Consumer) Subscribe(ctx context.Context, topicName string) {
+	// fast path: avoid spawning a processor when the topic is already known
+	if _, has := c.subscribes.Load(topicName); has {
+		return
+	}
+
+	c.broker.createTopicIfAbsent(topicName)
+	p := NewSubscribeProcessor(ctx, topicName, c.broker, c.listener)
+	// LoadOrStore keeps check-and-insert atomic: if a concurrent Subscribe
+	// registered the topic first, stop our processor so its loop goroutine
+	// does not keep running unreachable from the map.
+	if _, loaded := c.subscribes.LoadOrStore(topicName, p); loaded {
+		p.Shutdown()
+	}
+}
+
+// UnSubscribe stops consuming topicName: the topic's SubscribeProcessor is
+// removed from the subscription map and shut down. It is a no-op when the
+// topic is not subscribed.
+func (c *Consumer) UnSubscribe(topicName string) {
+	// LoadAndDelete removes the entry atomically, so concurrent callers cannot
+	// both observe the same processor between a separate Load and Delete.
+	p, loaded := c.subscribes.LoadAndDelete(topicName)
+	if !loaded {
+		return
+	}
+	if sp, ok := p.(*SubscribeProcessor); ok {
+		sp.Shutdown()
+	}
+}
+
+// RegisterEventListener sets the listener that is passed to processors
+// created by later Subscribe calls. Processors that already exist keep the
+// listener they were created with.
+func (c *Consumer) RegisterEventListener(lis connector.EventListener) {
+	c.listener = lis
+}
+
+// IsStarted reports whether Start has been called without a later Shutdown.
+func (c *Consumer) IsStarted() bool {
+	started := c.isStarted.Load()
+	return started
+}
+
+// IsClosed is the negation of IsStarted: it is true before Start and after
+// Shutdown.
+func (c *Consumer) IsClosed() bool {
+	started := c.isStarted.Load()
+	return !started
+}
+
+// Start marks the consumer as started. It only flips the flag (false -> true)
+// and is a no-op when the consumer is already started.
+func (c *Consumer) Start() {
+	_ = c.isStarted.CAS(false, true)
+}
+
+// Shutdown marks the consumer as not started, shuts down the processor of
+// every subscribed topic and forgets the subscriptions.
+func (c *Consumer) Shutdown() {
+	c.isStarted.CAS(true, false)
+	// Entries are deleted in place rather than swapping in a fresh sync.Map:
+	// reassigning the c.subscribes pointer would be an unsynchronized write
+	// racing with concurrent Subscribe/UnSubscribe calls that read it.
+	c.subscribes.Range(func(k, v interface{}) bool {
+		if sp, ok := v.(*SubscribeProcessor); ok {
+			sp.Shutdown()
+		}
+		c.subscribes.Delete(k)
+		return true
+	})
+}
diff --git a/eventmesh-server-go/pkg/connector/standalone/subscribe.go b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
index 5b7af9a7..7cac1f68 100644
--- a/eventmesh-server-go/pkg/connector/standalone/subscribe.go
+++ b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
@@ -14,3 +14,90 @@
 // limitations under the License.
 
 package standalone
+
+import (
+	"context"
+	"time"
+
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/connector"
+	"go.uber.org/atomic"
+)
+
+// ExtensionOffset is the cloudevent extension key under which the message
+// offset is stored.
+var ExtensionOffset = "offset"
+
+// SubscribeProcessor handles the messages of one subscribed topic.
+type SubscribeProcessor struct {
+	// topicName is the topic this processor reads from
+	topicName string
+	// broker is the broker the messages are fetched from
+	broker *Broker
+	// listner is the listener every fetched message is handed to
+	listner connector.EventListener
+	// offset is the offset of the next message to fetch; it stays nil until
+	// RunLoop has read a first message from the broker
+	offset *atomic.Int64
+	// ctx is cancelled by cancel (see Shutdown) or through its parent context
+	ctx    context.Context
+	cancel context.CancelFunc
+}
+
+// NewSubscribeProcessor creates the processor for topicName and immediately
+// starts its RunLoop in a new goroutine. The loop runs on a context derived
+// from ctx, so it ends when ctx is cancelled or when Shutdown is called.
+func NewSubscribeProcessor(ctx context.Context, topicName string, br *Broker, lis connector.EventListener) *SubscribeProcessor {
+	processor := new(SubscribeProcessor)
+	processor.topicName = topicName
+	processor.broker = br
+	processor.listner = lis
+	processor.ctx, processor.cancel = context.WithCancel(ctx)
+
+	go processor.RunLoop()
+	return processor
+}
+
+// RunLoop run loop to process message
+func (s *SubscribeProcessor) RunLoop() {
+	log.Infof("process subscribe for topic:%s", s.topicName)
+
+	tick := time.NewTicker(time.Second)
+	for {
+		select {
+		case <-s.ctx.Done():
+			return
+		case <-tick.C:
+			if s.offset == nil {
+				msg, err := s.broker.GetMessage(s.topicName)
+				if err != nil {
+					log.Warnf("get message from broker err:%v, topic:%v", err, s.topicName)
+					break
+				}
+				if msg != nil {
+					if val, ok := msg.Extensions()[ExtensionOffset]; ok {
+						s.offset = atomic.NewInt64(val.(int64))
+					} else {
+						s.offset = atomic.NewInt64(0)
+					}
+				}
+			}
+
+			msg, err := s.broker.GetMessageByOffset(s.topicName, s.offset.Load())
+			if err != nil {
+				log.Warnf("get message by offset from broker err:%v, topic:%v", err, s.topicName)
+				break
+			}
+			s.listner.Consume(msg, func(ac connector.EventMeshAction) {
+				switch ac {
+				case connector.CommitMessage:
+					log.Infof("commit message:%v, topic:%s, offset:%v", msg.ID(), s.topicName, s.offset.Load())
+				case connector.ManualAck:
+					log.Infof("manual message:%v, topic:%s, offset:%v", msg.ID(), s.topicName, s.offset.Load())
+				case connector.ReconsumeLater:
+					log.Infof("ack message:%v, topic:%s, offset:%v", msg.ID(), s.topicName, s.offset.Inc())
+				}
+			})
+		}
+	}
+}
+
+// Shutdown logs the shutdown and cancels the processor's context, which makes
+// RunLoop return.
+func (s *SubscribeProcessor) Shutdown() {
+	topic := s.topicName
+	log.Infof("shutdown subscribe processor, topic:%s", topic)
+	s.cancel()
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org