You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/08/22 11:03:34 UTC
[incubator-eventmesh] 07/10: add standalone consumer
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
commit 64a87aa639780b7238b1264d0ca3ca11594a1151
Author: walleliu <li...@163.com>
AuthorDate: Fri Aug 19 11:31:22 2022 +0800
add standalone consumer
---
eventmesh-server-go/go.mod | 3 +-
eventmesh-server-go/go.sum | 2 +
eventmesh-server-go/pkg/connector/consumer.go | 6 +-
eventmesh-server-go/pkg/connector/lifecycle.go | 2 +-
eventmesh-server-go/pkg/connector/listener.go | 7 +-
eventmesh-server-go/pkg/connector/properties.go | 2 +-
eventmesh-server-go/pkg/connector/publisher.go | 5 +-
.../pkg/connector/standalone/consumer.go | 104 +++++++++++++++++++++
.../pkg/connector/standalone/subscribe.go | 87 +++++++++++++++++
9 files changed, 209 insertions(+), 9 deletions(-)
diff --git a/eventmesh-server-go/go.mod b/eventmesh-server-go/go.mod
index 878a9cdc..8562bbb9 100644
--- a/eventmesh-server-go/go.mod
+++ b/eventmesh-server-go/go.mod
@@ -34,8 +34,9 @@ require (
github.com/BurntSushi/toml v1.2.0 // indirect
github.com/gin-contrib/pprof v1.4.0
github.com/gin-gonic/gin v1.8.1
+ github.com/panjf2000/ants/v2 v2.5.0
github.com/unrolled/secure v1.12.0
- go.uber.org/atomic v1.9.0 // indirect
+ go.uber.org/atomic v1.9.0
go.uber.org/fx v1.18.1
go.uber.org/multierr v1.7.0 // indirect
golang.org/x/sys v0.0.0-20220224120231-95c6836cb0e7 // indirect
diff --git a/eventmesh-server-go/go.sum b/eventmesh-server-go/go.sum
index 5543a0f2..159feb2a 100644
--- a/eventmesh-server-go/go.sum
+++ b/eventmesh-server-go/go.sum
@@ -171,6 +171,8 @@ github.com/nacos-group/nacos-sdk-go/v2 v2.0.3/go.mod h1:SlhyCAv961LcZ198XpKfPEQq
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e/go.mod h1:zD1mROLANZcx1PVRCS0qkT7pwLkGfwJo4zjcN/Tysno=
github.com/olekukonko/tablewriter v0.0.5 h1:P2Ga83D34wi1o9J6Wh1mRuqd4mF/x/lgBS7N7AbDhec=
github.com/olekukonko/tablewriter v0.0.5/go.mod h1:hPp6KlRPjbx+hW8ykQs1w3UBbZlj6HuIJcUGPhkA7kY=
+github.com/panjf2000/ants/v2 v2.5.0 h1:1rWGWSnxCsQBga+nQbA4/iY6VMeNoOIAM0ZWh9u3q2Q=
+github.com/panjf2000/ants/v2 v2.5.0/go.mod h1:cU93usDlihJZ5CfRGNDYsiBYvoilLvBF5Qp/BT2GNRE=
github.com/pelletier/go-toml/v2 v2.0.1 h1:8e3L2cCQzLFi2CR4g7vGFuFxX7Jl1kKX8gW+iV0GUKU=
github.com/pelletier/go-toml/v2 v2.0.1/go.mod h1:r9LEWfGN8R5k0VXJ+0BkIe7MYkRdwZOjgMj2KwnJFUo=
github.com/pkg/diff v0.0.0-20210226163009-20ebb0f2a09e/go.mod h1:pJLUxLENpZxwdsKMEsNbx1VGcRFpLqf3715MtcvvzbA=
diff --git a/eventmesh-server-go/pkg/connector/consumer.go b/eventmesh-server-go/pkg/connector/consumer.go
index dc493388..43d87963 100644
--- a/eventmesh-server-go/pkg/connector/consumer.go
+++ b/eventmesh-server-go/pkg/connector/consumer.go
@@ -13,9 +13,11 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package standalone
+package connector
import (
+ "context"
+
cloudevents "github.com/cloudevents/sdk-go/v2"
)
@@ -27,7 +29,7 @@ type Consumer interface {
UpdateOffset([]*cloudevents.Event)
- Subscribe(string)
+ Subscribe(context.Context, string)
UnSubscribe(string)
diff --git a/eventmesh-server-go/pkg/connector/lifecycle.go b/eventmesh-server-go/pkg/connector/lifecycle.go
index cfc2b67e..0dd04c6a 100644
--- a/eventmesh-server-go/pkg/connector/lifecycle.go
+++ b/eventmesh-server-go/pkg/connector/lifecycle.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package standalone
+package connector
// LifeCycle defines a lifecycle interface for a OMS related service endpoint,
type LifeCycle interface {
diff --git a/eventmesh-server-go/pkg/connector/listener.go b/eventmesh-server-go/pkg/connector/listener.go
index 056746d2..5dc5ae37 100644
--- a/eventmesh-server-go/pkg/connector/listener.go
+++ b/eventmesh-server-go/pkg/connector/listener.go
@@ -13,13 +13,16 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package standalone
+package connector
import (
cloudevents "github.com/cloudevents/sdk-go/v2"
)
+// AsyncConsumeHandler async handler to consume message
+type AsyncConsumeHandler func(EventMeshAction)
+
// EventListener listener to consume the cloudevents message
type EventListener interface {
- Consume(*cloudevents.Event)
+ Consume(*cloudevents.Event, AsyncConsumeHandler)
}
diff --git a/eventmesh-server-go/pkg/connector/properties.go b/eventmesh-server-go/pkg/connector/properties.go
index 2c4e2ace..86523a1e 100644
--- a/eventmesh-server-go/pkg/connector/properties.go
+++ b/eventmesh-server-go/pkg/connector/properties.go
@@ -13,7 +13,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package standalone
+package connector
// Properties represents a persistent set of properties.
// The Properties can be saved to a stream
diff --git a/eventmesh-server-go/pkg/connector/publisher.go b/eventmesh-server-go/pkg/connector/publisher.go
index 2f96d963..0c18a198 100644
--- a/eventmesh-server-go/pkg/connector/publisher.go
+++ b/eventmesh-server-go/pkg/connector/publisher.go
@@ -13,11 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package standalone
+package connector
import (
- cloudevents "github.com/cloudevents/sdk-go/v2"
"time"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
)
type SendResult struct {
diff --git a/eventmesh-server-go/pkg/connector/standalone/consumer.go b/eventmesh-server-go/pkg/connector/standalone/consumer.go
new file mode 100644
index 00000000..5dedab8b
--- /dev/null
+++ b/eventmesh-server-go/pkg/connector/standalone/consumer.go
@@ -0,0 +1,104 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package standalone
+
+import (
+ "context"
+ "sync"
+
+ cloudevents "github.com/cloudevents/sdk-go/v2"
+ "go.uber.org/atomic"
+
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/connector"
+)
+
+// defaultPoolsize is the default size of the pool used to process messages
+var defaultPoolsize = 10
+
+// Consumer is the standalone connector's message consumer
+type Consumer struct {
+ broker *Broker // created by NewBroker in NewConsumer
+ listener connector.EventListener // nil until RegisterEventListener is called
+ isStarted *atomic.Bool // flipped by Start and Shutdown
+
+ // subscribes stores one SubscribeProcessor per subscribed topic:
+ // key is the topic name (string), value is a *SubscribeProcessor
+ subscribes *sync.Map
+}
+
+// NewConsumer builds a standalone Consumer backed by a new Broker created
+// from ctx. The consumer starts out stopped, with no listener registered and
+// no topic subscriptions. The returned error is always nil.
+func NewConsumer(ctx context.Context) (*Consumer, error) {
+	consumer := &Consumer{
+		isStarted:  atomic.NewBool(false),
+		subscribes: &sync.Map{},
+	}
+	consumer.broker = NewBroker(ctx)
+	return consumer, nil
+}
+
+// Initialize is a no-op for the standalone consumer: the supplied properties
+// are ignored and nil is always returned.
+func (c *Consumer) Initialize(_ *connector.Properties) error {
+	return nil
+}
+
+func (c *Consumer) UpdateOffset(events []*cloudevents.Event) {
+ for _, event := range events {
+ c.broker.UpdateOffset(event.Subject(), event.Extensions()[ExtensionOffset].(int64))
+ }
+}
+
+// Subscribe starts consuming topicName. When the topic is already subscribed
+// the call does nothing; otherwise the topic is created on the broker if
+// absent and a new SubscribeProcessor, bound to ctx and the currently
+// registered listener, is stored under the topic name.
+func (c *Consumer) Subscribe(ctx context.Context, topicName string) {
+	_, subscribed := c.subscribes.Load(topicName)
+	if !subscribed {
+		c.broker.createTopicIfAbsent(topicName)
+		processor := NewSubscribeProcessor(ctx, topicName, c.broker, c.listener)
+		c.subscribes.Store(topicName, processor)
+	}
+}
+
+// UnSubscribe shuts down the processor registered for topicName, if any, and
+// then removes it from the subscription map. Unknown topics are ignored.
+func (c *Consumer) UnSubscribe(topicName string) {
+	if entry, found := c.subscribes.Load(topicName); found {
+		processor := entry.(*SubscribeProcessor)
+		processor.Shutdown()
+		c.subscribes.Delete(topicName)
+	}
+}
+
+// RegisterEventListener stores lis as the consumer's listener; Subscribe
+// passes the stored listener to each SubscribeProcessor it creates.
+func (c *Consumer) RegisterEventListener(lis connector.EventListener) {
+	c.listener = lis
+}
+
+// IsStarted reports whether the consumer is currently marked as started.
+func (c *Consumer) IsStarted() bool {
+	started := c.isStarted.Load()
+	return started
+}
+
+// IsClosed reports whether the consumer is not marked as started; it is the
+// negation of the started flag.
+func (c *Consumer) IsClosed() bool {
+	started := c.isStarted.Load()
+	return !started
+}
+
+// Start marks the consumer as started. Calling it on an already started
+// consumer leaves the flag unchanged.
+func (c *Consumer) Start() {
+	// The swap result is irrelevant: either way the flag ends up true.
+	_ = c.isStarted.CAS(false, true)
+}
+
+// Shutdown marks the consumer as stopped, shuts down every subscription
+// processor and removes it from the subscription map.
+//
+// Entries are deleted from the existing map instead of swapping in a new
+// sync.Map: reassigning the c.subscribes field is an unsynchronized write
+// that races with concurrent Subscribe/UnSubscribe calls reading the field.
+func (c *Consumer) Shutdown() {
+	c.isStarted.CAS(true, false)
+	c.subscribes.Range(func(k, v interface{}) bool {
+		v.(*SubscribeProcessor).Shutdown()
+		// sync.Map permits calling Delete from inside the Range callback.
+		c.subscribes.Delete(k)
+		return true
+	})
+}
diff --git a/eventmesh-server-go/pkg/connector/standalone/subscribe.go b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
index 5b7af9a7..7cac1f68 100644
--- a/eventmesh-server-go/pkg/connector/standalone/subscribe.go
+++ b/eventmesh-server-go/pkg/connector/standalone/subscribe.go
@@ -14,3 +14,90 @@
// limitations under the License.
package standalone
+
+import (
+ "context"
+ "time"
+
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/connector"
+ "go.uber.org/atomic"
+)
+
+// ExtensionOffset is the CloudEvents extension key that carries the message offset
+var ExtensionOffset = "offset"
+
+// SubscribeProcessor handles the messages of one subscribed topic
+type SubscribeProcessor struct {
+ topicName string // topic this processor reads from
+ broker *Broker // queried by RunLoop for messages
+ listner connector.EventListener // receives each message through Consume
+ offset *atomic.Int64 // not set by NewSubscribeProcessor; RunLoop initializes it
+ ctx context.Context // derived from the caller's context in NewSubscribeProcessor
+ cancel context.CancelFunc // cancels ctx; called by Shutdown
+}
+
+// NewSubscribeProcessor creates the processor for topicName and immediately
+// starts its RunLoop in a new goroutine. The processor's context is derived
+// from ctx, so the loop ends when ctx is cancelled or Shutdown is called.
+func NewSubscribeProcessor(ctx context.Context, topicName string, br *Broker, lis connector.EventListener) *SubscribeProcessor {
+	processor := &SubscribeProcessor{
+		topicName: topicName,
+		broker:    br,
+		listner:   lis,
+	}
+	processor.ctx, processor.cancel = context.WithCancel(ctx)
+	go processor.RunLoop()
+	return processor
+}
+
+// RunLoop run loop to process message
+func (s *SubscribeProcessor) RunLoop() {
+ log.Infof("process subscribe for topic:%s", s.topicName)
+
+ tick := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-s.ctx.Done():
+ return
+ case <-tick.C:
+ if s.offset == nil {
+ msg, err := s.broker.GetMessage(s.topicName)
+ if err != nil {
+ log.Warnf("get message from broker err:%v, topic:%v", err, s.topicName)
+ break
+ }
+ if msg != nil {
+ if val, ok := msg.Extensions()[ExtensionOffset]; ok {
+ s.offset = atomic.NewInt64(val.(int64))
+ } else {
+ s.offset = atomic.NewInt64(0)
+ }
+ }
+ }
+
+ msg, err := s.broker.GetMessageByOffset(s.topicName, s.offset.Load())
+ if err != nil {
+ log.Warnf("get message by offset from broker err:%v, topic:%v", err, s.topicName)
+ break
+ }
+ s.listner.Consume(msg, func(ac connector.EventMeshAction) {
+ switch ac {
+ case connector.CommitMessage:
+ log.Infof("commit message:%v, topic:%s, offset:%v", msg.ID(), s.topicName, s.offset.Load())
+ case connector.ManualAck:
+ log.Infof("manual message:%v, topic:%s, offset:%v", msg.ID(), s.topicName, s.offset.Load())
+ case connector.ReconsumeLater:
+ log.Infof("ack message:%v, topic:%s, offset:%v", msg.ID(), s.topicName, s.offset.Inc())
+ }
+ })
+ }
+ }
+}
+
+// Shutdown logs the shutdown and then cancels the processor's context, which
+// makes RunLoop return.
+func (s *SubscribeProcessor) Shutdown() {
+	log.Infof("shutdown subscribe processor, topic:%s", s.topicName)
+	s.cancel()
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org