You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/09/20 14:18:12 UTC

[incubator-eventmesh] branch eventmesh-server-go updated: combine connector api

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
     new 7608164a combine connector api
     new 510a6ed2 Merge pull request #1352 from horoc/combine-connector-api
7608164a is described below

commit 7608164a5f494a3eafacfccef2b21d9737b54f95
Author: horoc <ho...@gmail.com>
AuthorDate: Tue Sep 20 20:40:51 2022 +0800

    combine connector api
---
 eventmesh-server-go/plugin/connector/connector.go  | 38 +++++++--------
 .../plugin/connector/rocketmq/factory.go           | 54 ++++------------------
 .../plugin/connector/standalone/consumer.go        | 17 -------
 .../plugin/connector/standalone/factory.go         | 53 ++++-----------------
 .../standalone/standalone_connector_test.go        | 30 ++++++------
 .../runtime/core/wrapper/consumer.go               |  4 +-
 .../runtime/core/wrapper/consumer_test.go          |  8 ++--
 .../runtime/core/wrapper/producer.go               |  4 +-
 .../runtime/core/wrapper/producer_test.go          |  4 +-
 9 files changed, 61 insertions(+), 151 deletions(-)

diff --git a/eventmesh-server-go/plugin/connector/connector.go b/eventmesh-server-go/plugin/connector/connector.go
index 11c40e18..46d3b533 100644
--- a/eventmesh-server-go/plugin/connector/connector.go
+++ b/eventmesh-server-go/plugin/connector/connector.go
@@ -25,11 +25,18 @@ import (
 )
 
 const (
-	ConsumerPluginType = "connector.consumer"
-	ProducerPluginType = "connector.producer"
-	ResourcePluginType = "connector.resource"
+	PluginType = "connector"
 )
 
+// Factory is the connector plugin factory that creates the consumer, producer and resource.
+type Factory interface {
+	plugin.Plugin
+
+	GetConsumer() (Consumer, error)
+	GetProducer() (Producer, error)
+	GetResource() (Resource, error)
+}
+
 // EventMeshAction commit action of message consume
 type EventMeshAction uint
 
@@ -40,7 +47,7 @@ const (
 )
 
 // Consumer interface of consumer
-// all the consumers implement this interface should implement a corresponding consumer factory and do plugin registration first.
+// every consumer that implements this interface must also provide a corresponding factory and register it as a plugin first.
 type Consumer interface {
 	LifeCycle
 
@@ -51,13 +58,8 @@ type Consumer interface {
 	RegisterEventListener(listener *EventListener)
 }
 
-type ConsumerFactory interface {
-	plugin.Plugin
-	Get() (Consumer, error)
-}
-
 // Producer interface of producer
-// all the producers implement this interface should implement a corresponding producer factory and do plugin registration first.
+// every producer that implements this interface must also provide a corresponding factory and register it as a plugin first.
 type Producer interface {
 	LifeCycle
 
@@ -70,23 +72,13 @@ type Producer interface {
 	SetExtFields() error
 }
 
-type ProducerFactory interface {
-	plugin.Plugin
-	Get() (Producer, error)
-}
-
 // Resource interface of resource service
-// all the resources implement this interface should implement a corresponding resource factory and do plugin registration first.
+// every resource that implements this interface must also provide a corresponding factory and register it as a plugin first.
 type Resource interface {
 	Init() error
 	Release() error
 }
 
-type ResourceFactory interface {
-	plugin.Plugin
-	Get() (Resource, error)
-}
-
 // LifeCycle general life cycle interface for all connectors
 type LifeCycle interface {
 	IsStarted() bool
@@ -122,5 +114,9 @@ type ErrorResult struct {
 type EventListener struct {
 	Consume ConsumeFunc
 }
+
+// CommitFunc is the function through which the consuming logic commits a message with the given action.
 type CommitFunc func(action EventMeshAction) error
+
+// ConsumeFunc is the custom message consuming logic.
 type ConsumeFunc func(event *ce.Event, commitFunc CommitFunc) error
diff --git a/eventmesh-server-go/plugin/connector/rocketmq/factory.go b/eventmesh-server-go/plugin/connector/rocketmq/factory.go
index 9208b495..05d6de15 100644
--- a/eventmesh-server-go/plugin/connector/rocketmq/factory.go
+++ b/eventmesh-server-go/plugin/connector/rocketmq/factory.go
@@ -17,26 +17,25 @@ package rocketmq
 
 import (
 	"errors"
+
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
 )
 
 func init() {
-	plugin.Register("rocketmq", &ProducerFactory{})
-	plugin.Register("rocketmq", &ConsumerFactory{})
-	plugin.Register("rocketmq", &ResourceFactory{})
+	plugin.Register("rocketmq", &Factory{})
 }
 
-type ProducerFactory struct {
+type Factory struct {
 	plugin.Plugin
 	properties map[string]string
 }
 
-func (f *ProducerFactory) Type() string {
-	return connector.ProducerPluginType
+func (f *Factory) Type() string {
+	return connector.PluginType
 }
 
-func (f *ProducerFactory) Setup(name string, dec plugin.Decoder) error {
+func (f *Factory) Setup(name string, dec plugin.Decoder) error {
 	if dec == nil {
 		return errors.New(" producer config decoder empty")
 	}
@@ -48,7 +47,7 @@ func (f *ProducerFactory) Setup(name string, dec plugin.Decoder) error {
 	return nil
 }
 
-func (f *ProducerFactory) Get() (connector.Producer, error) {
+func (f *Factory) GetProducer() (connector.Producer, error) {
 	producer := NewProducer()
 	err := producer.InitProducer(f.properties)
 	if err != nil {
@@ -61,30 +60,9 @@ func (f *ProducerFactory) Get() (connector.Producer, error) {
 	return producer, nil
 }
 
-type ConsumerFactory struct {
-	plugin.Plugin
-	properties map[string]string
-}
-
-func (c *ConsumerFactory) Type() string {
-	return connector.ConsumerPluginType
-}
-
-func (c *ConsumerFactory) Setup(name string, dec plugin.Decoder) error {
-	if dec == nil {
-		return errors.New(" producer config decoder empty")
-	}
-	properties := make(map[string]string)
-	if err := dec.Decode(properties); err != nil {
-		return err
-	}
-	c.properties = properties
-	return nil
-}
-
-func (c *ConsumerFactory) Get() (connector.Consumer, error) {
+func (f *Factory) GetConsumer() (connector.Consumer, error) {
 	consumer := NewConsumer()
-	err := consumer.InitConsumer(c.properties)
+	err := consumer.InitConsumer(f.properties)
 	if err != nil {
 		return nil, err
 	}
@@ -95,18 +73,6 @@ func (c *ConsumerFactory) Get() (connector.Consumer, error) {
 	return consumer, nil
 }
 
-type ResourceFactory struct {
-	plugin.Plugin
-}
-
-func (f *ResourceFactory) Type() string {
-	return connector.ResourcePluginType
-}
-
-func (f *ResourceFactory) Setup(name string, dec plugin.Decoder) error {
-	return nil
-}
-
-func (f *ResourceFactory) Get() (connector.Resource, error) {
+func (f *Factory) GetResource() (connector.Resource, error) {
 	return &Resource{}, nil
 }
diff --git a/eventmesh-server-go/plugin/connector/standalone/consumer.go b/eventmesh-server-go/plugin/connector/standalone/consumer.go
index b644c165..b2d8cba9 100644
--- a/eventmesh-server-go/plugin/connector/standalone/consumer.go
+++ b/eventmesh-server-go/plugin/connector/standalone/consumer.go
@@ -25,7 +25,6 @@ import (
 	"go.uber.org/atomic"
 
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
 	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
 )
 
@@ -105,22 +104,6 @@ func (c *Consumer) RegisterEventListener(listener *connector.EventListener) {
 	c.listener = *listener
 }
 
-func (c *Consumer) Type() string {
-	return connector.ConsumerPluginType
-}
-
-func (c *Consumer) Setup(name string, dec plugin.Decoder) error {
-	if dec == nil {
-		return errors.New("selector config decoder empty")
-	}
-	properties := make(map[string]string)
-	if err := dec.Decode(properties); err != nil {
-		return err
-	}
-	c.InitConsumer(properties)
-	return nil
-}
-
 func (c *Consumer) InitConsumer(properties map[string]string) error {
 	// No-Op for standalone connector
 	return nil
diff --git a/eventmesh-server-go/plugin/connector/standalone/factory.go b/eventmesh-server-go/plugin/connector/standalone/factory.go
index 73499a04..6d466516 100644
--- a/eventmesh-server-go/plugin/connector/standalone/factory.go
+++ b/eventmesh-server-go/plugin/connector/standalone/factory.go
@@ -23,23 +23,21 @@ import (
 )
 
 func init() {
-	plugin.Register("standalone", &ConsumerFactory{})
-	plugin.Register("standalone", &ProducerFactory{})
-	plugin.Register("standalone", &ResourceFactory{})
+	plugin.Register("standalone", &Factory{})
 }
 
-type ConsumerFactory struct {
+type Factory struct {
 	plugin.Plugin
 	properties map[string]string
 }
 
-func (f *ConsumerFactory) Type() string {
-	return connector.ConsumerPluginType
+func (f *Factory) Type() string {
+	return connector.PluginType
 }
 
-func (f *ConsumerFactory) Setup(name string, dec plugin.Decoder) error {
+func (f *Factory) Setup(name string, dec plugin.Decoder) error {
 	if dec == nil {
-		return errors.New("consumer config decoder empty")
+		return errors.New("standalone config decoder empty")
 	}
 	properties := make(map[string]string)
 	if err := dec.Decode(properties); err != nil {
@@ -49,53 +47,20 @@ func (f *ConsumerFactory) Setup(name string, dec plugin.Decoder) error {
 	return nil
 }
 
-func (f *ConsumerFactory) Get() (connector.Consumer, error) {
+func (f *Factory) GetConsumer() (connector.Consumer, error) {
 	consumer := NewConsumer()
 	consumer.InitConsumer(f.properties)
 	consumer.Start()
 	return consumer, nil
 }
 
-type ProducerFactory struct {
-	plugin.Plugin
-	properties map[string]string
-}
-
-func (f *ProducerFactory) Type() string {
-	return connector.ProducerPluginType
-}
-
-func (f *ProducerFactory) Setup(name string, dec plugin.Decoder) error {
-	if dec == nil {
-		return errors.New(" producer config decoder empty")
-	}
-	properties := make(map[string]string)
-	if err := dec.Decode(properties); err != nil {
-		return err
-	}
-	f.properties = properties
-	return nil
-}
-
-func (f *ProducerFactory) Get() (connector.Producer, error) {
+func (f *Factory) GetProducer() (connector.Producer, error) {
 	producer := NewProducer()
 	producer.InitProducer(f.properties)
 	producer.Start()
 	return producer, nil
 }
 
-type ResourceFactory struct {
-	plugin.Plugin
-}
-
-func (f *ResourceFactory) Type() string {
-	return connector.ResourcePluginType
-}
-
-func (f *ResourceFactory) Setup(name string, dec plugin.Decoder) error {
-	return nil
-}
-
-func (f *ResourceFactory) Get() (connector.Resource, error) {
+func (f *Factory) GetResource() (connector.Resource, error) {
 	return &Resource{}, nil
 }
diff --git a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
index 4b61981a..9c6d8e9b 100644
--- a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
+++ b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
@@ -17,15 +17,17 @@ package standalone
 
 import (
 	"context"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+	"sync"
+	"testing"
+	"time"
+
 	ce "github.com/cloudevents/sdk-go/v2"
 	"github.com/google/uuid"
 	"github.com/stretchr/testify/assert"
 	"go.uber.org/atomic"
-	"sync"
-	"testing"
-	"time"
+
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
 )
 
 const (
@@ -34,8 +36,8 @@ const (
 )
 
 func TestProducer_Publish(t *testing.T) {
-	factory := plugin.Get(connector.ProducerPluginType, pluginName).(connector.ProducerFactory)
-	producer, _ := factory.Get()
+	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 
@@ -80,15 +82,14 @@ func TestConsumer_Subscribe(t *testing.T) {
 		},
 	}
 
-	consumerFactory := plugin.Get(connector.ConsumerPluginType, pluginName).(connector.ConsumerFactory)
-	consumer, _ := consumerFactory.Get()
+	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	consumer, _ := factory.GetConsumer()
 	consumer.Start()
 	consumer.RegisterEventListener(&listener)
 	consumer.Subscribe(topicName)
 	defer consumer.Shutdown()
 
-	producerFactory := plugin.Get(connector.ProducerPluginType, pluginName).(connector.ProducerFactory)
-	producer, _ := producerFactory.Get()
+	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 	for i := 1; i <= 50; i++ {
@@ -120,8 +121,8 @@ func TestConsumer_UpdateOffset(t *testing.T) {
 		},
 	}
 
-	consumerFactory := plugin.Get(connector.ConsumerPluginType, pluginName).(connector.ConsumerFactory)
-	consumer, _ := consumerFactory.Get()
+	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	consumer, _ := factory.GetConsumer()
 	consumer.Start()
 	defer consumer.Shutdown()
 	consumer.RegisterEventListener(&listener)
@@ -130,8 +131,7 @@ func TestConsumer_UpdateOffset(t *testing.T) {
 	consumer.Subscribe(topicName)
 	consumer.UpdateOffset(context.Background(), []*ce.Event{event})
 
-	producerFactory := plugin.Get(connector.ProducerPluginType, pluginName).(connector.ProducerFactory)
-	producer, _ := producerFactory.Get()
+	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 	for i := 1; i <= 50; i++ {
diff --git a/eventmesh-server-go/runtime/core/wrapper/consumer.go b/eventmesh-server-go/runtime/core/wrapper/consumer.go
index 5c13ec96..4b2ee2a7 100644
--- a/eventmesh-server-go/runtime/core/wrapper/consumer.go
+++ b/eventmesh-server-go/runtime/core/wrapper/consumer.go
@@ -38,8 +38,8 @@ func NewConsumer() (*Consumer, error) {
 	if !ok {
 		return nil, ErrNoConnectorName
 	}
-	factory := plugin.Get(connector.ConsumerPluginType, connectorPluginName.Value).(connector.ConsumerFactory)
-	consu, err := factory.Get()
+	factory := plugin.Get(connector.PluginType, connectorPluginName.Value).(connector.Factory)
+	consu, err := factory.GetConsumer()
 	if err != nil {
 		return nil, err
 	}
diff --git a/eventmesh-server-go/runtime/core/wrapper/consumer_test.go b/eventmesh-server-go/runtime/core/wrapper/consumer_test.go
index eba179ed..4c701972 100644
--- a/eventmesh-server-go/runtime/core/wrapper/consumer_test.go
+++ b/eventmesh-server-go/runtime/core/wrapper/consumer_test.go
@@ -32,8 +32,8 @@ func TestConsumer_Subscribe(t *testing.T) {
 	type args struct {
 		topicName string
 	}
-	factory := plugin.Get(connector.ConsumerPluginType, "standalone").(connector.ConsumerFactory)
-	consu, err := factory.Get()
+	factory := plugin.Get(connector.PluginType, "standalone").(connector.Factory)
+	consu, err := factory.GetConsumer()
 	assert.NoError(t, err)
 	tests := []struct {
 		name    string
@@ -73,8 +73,8 @@ func TestConsumer_UnSubscribe(t *testing.T) {
 	type args struct {
 		topicName string
 	}
-	factory := plugin.Get(connector.ConsumerPluginType, "standalone").(connector.ConsumerFactory)
-	consu, err := factory.Get()
+	factory := plugin.Get(connector.PluginType, "standalone").(connector.Factory)
+	consu, err := factory.GetConsumer()
 	assert.NoError(t, err)
 	tests := []struct {
 		name    string
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer.go b/eventmesh-server-go/runtime/core/wrapper/producer.go
index 1dfbe51d..a1fd5831 100644
--- a/eventmesh-server-go/runtime/core/wrapper/producer.go
+++ b/eventmesh-server-go/runtime/core/wrapper/producer.go
@@ -47,8 +47,8 @@ func NewProducer() (*Producer, error) {
 		return nil, ErrNoConnectorName
 	}
 	log.Infof("init producer with connector name:%s", connectorPluginName)
-	factory := plugin.Get(connector.ProducerPluginType, connectorPluginName.Value).(connector.ProducerFactory)
-	consu, err := factory.Get()
+	factory := plugin.Get(connector.PluginType, connectorPluginName.Value).(connector.Factory)
+	consu, err := factory.GetProducer()
 	if err != nil {
 		return nil, err
 	}
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer_test.go b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
index 75146b57..4af64143 100644
--- a/eventmesh-server-go/runtime/core/wrapper/producer_test.go
+++ b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
@@ -35,8 +35,8 @@ func TestProducer_Send(t *testing.T) {
 		event    *eventv2.Event
 		callback *connector.SendCallback
 	}
-	factory := plugin.Get(connector.ProducerPluginType, "standalone").(connector.ProducerFactory)
-	produ, err := factory.Get()
+	factory := plugin.Get(connector.PluginType, "standalone").(connector.Factory)
+	produ, err := factory.GetProducer()
 	assert.NoError(t, err)
 	tests := []struct {
 		name    string


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org