You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/09/20 14:18:12 UTC
[incubator-eventmesh] branch eventmesh-server-go updated: combine connector api
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
new 7608164a combine connector api
new 510a6ed2 Merge pull request #1352 from horoc/combine-connector-api
7608164a is described below
commit 7608164a5f494a3eafacfccef2b21d9737b54f95
Author: horoc <ho...@gmail.com>
AuthorDate: Tue Sep 20 20:40:51 2022 +0800
combine connector api
---
eventmesh-server-go/plugin/connector/connector.go | 38 +++++++--------
.../plugin/connector/rocketmq/factory.go | 54 ++++------------------
.../plugin/connector/standalone/consumer.go | 17 -------
.../plugin/connector/standalone/factory.go | 53 ++++-----------------
.../standalone/standalone_connector_test.go | 30 ++++++------
.../runtime/core/wrapper/consumer.go | 4 +-
.../runtime/core/wrapper/consumer_test.go | 8 ++--
.../runtime/core/wrapper/producer.go | 4 +-
.../runtime/core/wrapper/producer_test.go | 4 +-
9 files changed, 61 insertions(+), 151 deletions(-)
diff --git a/eventmesh-server-go/plugin/connector/connector.go b/eventmesh-server-go/plugin/connector/connector.go
index 11c40e18..46d3b533 100644
--- a/eventmesh-server-go/plugin/connector/connector.go
+++ b/eventmesh-server-go/plugin/connector/connector.go
@@ -25,11 +25,18 @@ import (
)
const (
- ConsumerPluginType = "connector.consumer"
- ProducerPluginType = "connector.producer"
- ResourcePluginType = "connector.resource"
+ PluginType = "connector"
)
+// Factory is the connector plugin factory; it provides the consumer, producer and resource of a connector.
+type Factory interface {
+ plugin.Plugin
+
+ GetConsumer() (Consumer, error)
+ GetProducer() (Producer, error)
+ GetResource() (Resource, error)
+}
+
// EventMeshAction commit action of message consume
type EventMeshAction uint
@@ -40,7 +47,7 @@ const (
)
// Consumer interface of consumer
-// all the consumers implement this interface should implement a corresponding consumer factory and do plugin registration first.
+// all consumers implementing this interface should provide a corresponding factory and register it as a plugin first.
type Consumer interface {
LifeCycle
@@ -51,13 +58,8 @@ type Consumer interface {
RegisterEventListener(listener *EventListener)
}
-type ConsumerFactory interface {
- plugin.Plugin
- Get() (Consumer, error)
-}
-
// Producer interface of producer
-// all the producers implement this interface should implement a corresponding producer factory and do plugin registration first.
+// all producers implementing this interface should provide a corresponding factory and register it as a plugin first.
type Producer interface {
LifeCycle
@@ -70,23 +72,13 @@ type Producer interface {
SetExtFields() error
}
-type ProducerFactory interface {
- plugin.Plugin
- Get() (Producer, error)
-}
-
// Resource interface of resource service
-// all the resources implement this interface should implement a corresponding resource factory and do plugin registration first.
+// all resources implementing this interface should provide a corresponding factory and register it as a plugin first.
type Resource interface {
Init() error
Release() error
}
-type ResourceFactory interface {
- plugin.Plugin
- Get() (Resource, error)
-}
-
// LifeCycle general life cycle interface for all connectors
type LifeCycle interface {
IsStarted() bool
@@ -122,5 +114,9 @@ type ErrorResult struct {
type EventListener struct {
Consume ConsumeFunc
}
+
+// CommitFunc is the function through which the consuming logic commits a message with the given action.
type CommitFunc func(action EventMeshAction) error
+
+// ConsumeFunc is the custom message consuming logic; it receives the event and a CommitFunc to commit it.
type ConsumeFunc func(event *ce.Event, commitFunc CommitFunc) error
diff --git a/eventmesh-server-go/plugin/connector/rocketmq/factory.go b/eventmesh-server-go/plugin/connector/rocketmq/factory.go
index 9208b495..05d6de15 100644
--- a/eventmesh-server-go/plugin/connector/rocketmq/factory.go
+++ b/eventmesh-server-go/plugin/connector/rocketmq/factory.go
@@ -17,26 +17,25 @@ package rocketmq
import (
"errors"
+
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
)
func init() {
- plugin.Register("rocketmq", &ProducerFactory{})
- plugin.Register("rocketmq", &ConsumerFactory{})
- plugin.Register("rocketmq", &ResourceFactory{})
+ plugin.Register("rocketmq", &Factory{})
}
-type ProducerFactory struct {
+type Factory struct {
plugin.Plugin
properties map[string]string
}
-func (f *ProducerFactory) Type() string {
- return connector.ProducerPluginType
+func (f *Factory) Type() string {
+ return connector.PluginType
}
-func (f *ProducerFactory) Setup(name string, dec plugin.Decoder) error {
+func (f *Factory) Setup(name string, dec plugin.Decoder) error {
if dec == nil {
return errors.New(" producer config decoder empty")
}
@@ -48,7 +47,7 @@ func (f *ProducerFactory) Setup(name string, dec plugin.Decoder) error {
return nil
}
-func (f *ProducerFactory) Get() (connector.Producer, error) {
+func (f *Factory) GetProducer() (connector.Producer, error) {
producer := NewProducer()
err := producer.InitProducer(f.properties)
if err != nil {
@@ -61,30 +60,9 @@ func (f *ProducerFactory) Get() (connector.Producer, error) {
return producer, nil
}
-type ConsumerFactory struct {
- plugin.Plugin
- properties map[string]string
-}
-
-func (c *ConsumerFactory) Type() string {
- return connector.ConsumerPluginType
-}
-
-func (c *ConsumerFactory) Setup(name string, dec plugin.Decoder) error {
- if dec == nil {
- return errors.New(" producer config decoder empty")
- }
- properties := make(map[string]string)
- if err := dec.Decode(properties); err != nil {
- return err
- }
- c.properties = properties
- return nil
-}
-
-func (c *ConsumerFactory) Get() (connector.Consumer, error) {
+func (f *Factory) GetConsumer() (connector.Consumer, error) {
consumer := NewConsumer()
- err := consumer.InitConsumer(c.properties)
+ err := consumer.InitConsumer(f.properties)
if err != nil {
return nil, err
}
@@ -95,18 +73,6 @@ func (c *ConsumerFactory) Get() (connector.Consumer, error) {
return consumer, nil
}
-type ResourceFactory struct {
- plugin.Plugin
-}
-
-func (f *ResourceFactory) Type() string {
- return connector.ResourcePluginType
-}
-
-func (f *ResourceFactory) Setup(name string, dec plugin.Decoder) error {
- return nil
-}
-
-func (f *ResourceFactory) Get() (connector.Resource, error) {
+func (f *Factory) GetResource() (connector.Resource, error) {
return &Resource{}, nil
}
diff --git a/eventmesh-server-go/plugin/connector/standalone/consumer.go b/eventmesh-server-go/plugin/connector/standalone/consumer.go
index b644c165..b2d8cba9 100644
--- a/eventmesh-server-go/plugin/connector/standalone/consumer.go
+++ b/eventmesh-server-go/plugin/connector/standalone/consumer.go
@@ -25,7 +25,6 @@ import (
"go.uber.org/atomic"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
)
@@ -105,22 +104,6 @@ func (c *Consumer) RegisterEventListener(listener *connector.EventListener) {
c.listener = *listener
}
-func (c *Consumer) Type() string {
- return connector.ConsumerPluginType
-}
-
-func (c *Consumer) Setup(name string, dec plugin.Decoder) error {
- if dec == nil {
- return errors.New("selector config decoder empty")
- }
- properties := make(map[string]string)
- if err := dec.Decode(properties); err != nil {
- return err
- }
- c.InitConsumer(properties)
- return nil
-}
-
func (c *Consumer) InitConsumer(properties map[string]string) error {
// No-Op for standalone connector
return nil
diff --git a/eventmesh-server-go/plugin/connector/standalone/factory.go b/eventmesh-server-go/plugin/connector/standalone/factory.go
index 73499a04..6d466516 100644
--- a/eventmesh-server-go/plugin/connector/standalone/factory.go
+++ b/eventmesh-server-go/plugin/connector/standalone/factory.go
@@ -23,23 +23,21 @@ import (
)
func init() {
- plugin.Register("standalone", &ConsumerFactory{})
- plugin.Register("standalone", &ProducerFactory{})
- plugin.Register("standalone", &ResourceFactory{})
+ plugin.Register("standalone", &Factory{})
}
-type ConsumerFactory struct {
+type Factory struct {
plugin.Plugin
properties map[string]string
}
-func (f *ConsumerFactory) Type() string {
- return connector.ConsumerPluginType
+func (f *Factory) Type() string {
+ return connector.PluginType
}
-func (f *ConsumerFactory) Setup(name string, dec plugin.Decoder) error {
+func (f *Factory) Setup(name string, dec plugin.Decoder) error {
if dec == nil {
- return errors.New("consumer config decoder empty")
+ return errors.New("standalone config decoder empty")
}
properties := make(map[string]string)
if err := dec.Decode(properties); err != nil {
@@ -49,53 +47,20 @@ func (f *ConsumerFactory) Setup(name string, dec plugin.Decoder) error {
return nil
}
-func (f *ConsumerFactory) Get() (connector.Consumer, error) {
+func (f *Factory) GetConsumer() (connector.Consumer, error) {
consumer := NewConsumer()
consumer.InitConsumer(f.properties)
consumer.Start()
return consumer, nil
}
-type ProducerFactory struct {
- plugin.Plugin
- properties map[string]string
-}
-
-func (f *ProducerFactory) Type() string {
- return connector.ProducerPluginType
-}
-
-func (f *ProducerFactory) Setup(name string, dec plugin.Decoder) error {
- if dec == nil {
- return errors.New(" producer config decoder empty")
- }
- properties := make(map[string]string)
- if err := dec.Decode(properties); err != nil {
- return err
- }
- f.properties = properties
- return nil
-}
-
-func (f *ProducerFactory) Get() (connector.Producer, error) {
+func (f *Factory) GetProducer() (connector.Producer, error) {
producer := NewProducer()
producer.InitProducer(f.properties)
producer.Start()
return producer, nil
}
-type ResourceFactory struct {
- plugin.Plugin
-}
-
-func (f *ResourceFactory) Type() string {
- return connector.ResourcePluginType
-}
-
-func (f *ResourceFactory) Setup(name string, dec plugin.Decoder) error {
- return nil
-}
-
-func (f *ResourceFactory) Get() (connector.Resource, error) {
+func (f *Factory) GetResource() (connector.Resource, error) {
return &Resource{}, nil
}
diff --git a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
index 4b61981a..9c6d8e9b 100644
--- a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
+++ b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
@@ -17,15 +17,17 @@ package standalone
import (
"context"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+ "sync"
+ "testing"
+ "time"
+
ce "github.com/cloudevents/sdk-go/v2"
"github.com/google/uuid"
"github.com/stretchr/testify/assert"
"go.uber.org/atomic"
- "sync"
- "testing"
- "time"
+
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
)
const (
@@ -34,8 +36,8 @@ const (
)
func TestProducer_Publish(t *testing.T) {
- factory := plugin.Get(connector.ProducerPluginType, pluginName).(connector.ProducerFactory)
- producer, _ := factory.Get()
+ factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
@@ -80,15 +82,14 @@ func TestConsumer_Subscribe(t *testing.T) {
},
}
- consumerFactory := plugin.Get(connector.ConsumerPluginType, pluginName).(connector.ConsumerFactory)
- consumer, _ := consumerFactory.Get()
+ factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ consumer, _ := factory.GetConsumer()
consumer.Start()
consumer.RegisterEventListener(&listener)
consumer.Subscribe(topicName)
defer consumer.Shutdown()
- producerFactory := plugin.Get(connector.ProducerPluginType, pluginName).(connector.ProducerFactory)
- producer, _ := producerFactory.Get()
+ producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
@@ -120,8 +121,8 @@ func TestConsumer_UpdateOffset(t *testing.T) {
},
}
- consumerFactory := plugin.Get(connector.ConsumerPluginType, pluginName).(connector.ConsumerFactory)
- consumer, _ := consumerFactory.Get()
+ factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ consumer, _ := factory.GetConsumer()
consumer.Start()
defer consumer.Shutdown()
consumer.RegisterEventListener(&listener)
@@ -130,8 +131,7 @@ func TestConsumer_UpdateOffset(t *testing.T) {
consumer.Subscribe(topicName)
consumer.UpdateOffset(context.Background(), []*ce.Event{event})
- producerFactory := plugin.Get(connector.ProducerPluginType, pluginName).(connector.ProducerFactory)
- producer, _ := producerFactory.Get()
+ producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
diff --git a/eventmesh-server-go/runtime/core/wrapper/consumer.go b/eventmesh-server-go/runtime/core/wrapper/consumer.go
index 5c13ec96..4b2ee2a7 100644
--- a/eventmesh-server-go/runtime/core/wrapper/consumer.go
+++ b/eventmesh-server-go/runtime/core/wrapper/consumer.go
@@ -38,8 +38,8 @@ func NewConsumer() (*Consumer, error) {
if !ok {
return nil, ErrNoConnectorName
}
- factory := plugin.Get(connector.ConsumerPluginType, connectorPluginName.Value).(connector.ConsumerFactory)
- consu, err := factory.Get()
+ factory := plugin.Get(connector.PluginType, connectorPluginName.Value).(connector.Factory)
+ consu, err := factory.GetConsumer()
if err != nil {
return nil, err
}
diff --git a/eventmesh-server-go/runtime/core/wrapper/consumer_test.go b/eventmesh-server-go/runtime/core/wrapper/consumer_test.go
index eba179ed..4c701972 100644
--- a/eventmesh-server-go/runtime/core/wrapper/consumer_test.go
+++ b/eventmesh-server-go/runtime/core/wrapper/consumer_test.go
@@ -32,8 +32,8 @@ func TestConsumer_Subscribe(t *testing.T) {
type args struct {
topicName string
}
- factory := plugin.Get(connector.ConsumerPluginType, "standalone").(connector.ConsumerFactory)
- consu, err := factory.Get()
+ factory := plugin.Get(connector.PluginType, "standalone").(connector.Factory)
+ consu, err := factory.GetConsumer()
assert.NoError(t, err)
tests := []struct {
name string
@@ -73,8 +73,8 @@ func TestConsumer_UnSubscribe(t *testing.T) {
type args struct {
topicName string
}
- factory := plugin.Get(connector.ConsumerPluginType, "standalone").(connector.ConsumerFactory)
- consu, err := factory.Get()
+ factory := plugin.Get(connector.PluginType, "standalone").(connector.Factory)
+ consu, err := factory.GetConsumer()
assert.NoError(t, err)
tests := []struct {
name string
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer.go b/eventmesh-server-go/runtime/core/wrapper/producer.go
index 1dfbe51d..a1fd5831 100644
--- a/eventmesh-server-go/runtime/core/wrapper/producer.go
+++ b/eventmesh-server-go/runtime/core/wrapper/producer.go
@@ -47,8 +47,8 @@ func NewProducer() (*Producer, error) {
return nil, ErrNoConnectorName
}
log.Infof("init producer with connector name:%s", connectorPluginName)
- factory := plugin.Get(connector.ProducerPluginType, connectorPluginName.Value).(connector.ProducerFactory)
- consu, err := factory.Get()
+ factory := plugin.Get(connector.PluginType, connectorPluginName.Value).(connector.Factory)
+ consu, err := factory.GetProducer()
if err != nil {
return nil, err
}
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer_test.go b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
index 75146b57..4af64143 100644
--- a/eventmesh-server-go/runtime/core/wrapper/producer_test.go
+++ b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
@@ -35,8 +35,8 @@ func TestProducer_Send(t *testing.T) {
event *eventv2.Event
callback *connector.SendCallback
}
- factory := plugin.Get(connector.ProducerPluginType, "standalone").(connector.ProducerFactory)
- produ, err := factory.Get()
+ factory := plugin.Get(connector.PluginType, "standalone").(connector.Factory)
+ produ, err := factory.GetProducer()
assert.NoError(t, err)
tests := []struct {
name string
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org