You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2022/12/12 14:16:12 UTC

[incubator-eventmesh] 01/01: fix go server standalone connector unit test

This is an automated email from the ASF dual-hosted git repository.

chenzhou pushed a commit to branch fix-go-server-standalone-unit-test
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git

commit 206a5b00aa374e0909df745d2e97b3569b4f307e
Author: horoc <ho...@gmail.com>
AuthorDate: Mon Dec 12 22:12:46 2022 +0800

    fix go server standalone connector unit test
---
 eventmesh-server-go/go.mod                         | 34 +++++++++++------
 eventmesh-server-go/go.sum                         | 13 ++-----
 .../standalone/standalone_connector_test.go        | 44 ++++++++++++++++------
 3 files changed, 58 insertions(+), 33 deletions(-)

diff --git a/eventmesh-server-go/go.mod b/eventmesh-server-go/go.mod
index b4d89ef7c..436f3316f 100644
--- a/eventmesh-server-go/go.mod
+++ b/eventmesh-server-go/go.mod
@@ -18,8 +18,14 @@ module github.com/apache/incubator-eventmesh/eventmesh-server-go
 go 1.18
 
 require (
+	github.com/apache/rocketmq-client-go/v2 v2.1.1
 	github.com/cloudevents/sdk-go/v2 v2.11.0
+	github.com/deckarep/golang-set/v2 v2.1.0
+	github.com/gin-contrib/pprof v1.4.0
+	github.com/gin-gonic/gin v1.8.1
 	github.com/gogf/gf v1.16.9
+	github.com/golang/mock v1.6.0
+	github.com/google/uuid v1.1.2
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/json-iterator/go v1.1.12
 	github.com/lestrrat-go/strftime v1.0.6
@@ -28,25 +34,26 @@ require (
 	github.com/panjf2000/ants/v2 v2.6.0
 	github.com/pkg/errors v0.9.1
 	github.com/prometheus/client_golang v1.13.0
-	go.uber.org/zap v1.22.0
-	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
-	google.golang.org/grpc v1.36.1
-	gopkg.in/yaml.v3 v3.0.1
-	github.com/apache/rocketmq-client-go/v2 v2.1.1
-	github.com/deckarep/golang-set/v2 v2.1.0
-	github.com/gin-contrib/pprof v1.4.0
-	github.com/gin-gonic/gin v1.8.1
-	github.com/golang/mock v1.6.0
-	github.com/google/uuid v1.1.2
 	github.com/stretchr/testify v1.8.0
 	github.com/unrolled/secure v1.12.0
 	go.uber.org/atomic v1.7.0
 	go.uber.org/fx v1.18.1
+	go.uber.org/zap v1.22.0
+	golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
+	google.golang.org/grpc v1.36.1
 	google.golang.org/protobuf v1.28.1
+	gopkg.in/yaml.v3 v3.0.1
 )
 
 require (
+	github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 // indirect
+	github.com/beorn7/perks v1.0.1 // indirect
+	github.com/buger/jsonparser v1.1.1 // indirect
+	github.com/cespare/xxhash/v2 v2.1.2 // indirect
+	github.com/davecgh/go-spew v1.1.1 // indirect
+	github.com/emirpasic/gods v1.12.0 // indirect
 	github.com/gin-contrib/sse v0.1.0 // indirect
+	github.com/go-errors/errors v1.0.1 // indirect
 	github.com/go-playground/locales v0.14.0 // indirect
 	github.com/go-playground/universal-translator v0.18.0 // indirect
 	github.com/go-playground/validator/v10 v10.10.0 // indirect
@@ -57,8 +64,10 @@ require (
 	github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
 	github.com/leodido/go-urn v1.2.1 // indirect
 	github.com/mattn/go-isatty v0.0.14 // indirect
+	github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
 	github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
 	github.com/modern-go/reflect2 v1.0.2 // indirect
+	github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
 	github.com/pelletier/go-toml/v2 v2.0.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
 	github.com/prometheus/client_model v0.2.0 // indirect
@@ -75,8 +84,11 @@ require (
 	go.uber.org/multierr v1.6.0 // indirect
 	golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
 	golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
+	golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
 	golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
 	golang.org/x/text v0.3.7 // indirect
-	google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
+	google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
+	gopkg.in/ini.v1 v1.42.0 // indirect
+	gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
 	gopkg.in/yaml.v2 v2.4.0 // indirect
 )
diff --git a/eventmesh-server-go/go.sum b/eventmesh-server-go/go.sum
index 36637c5d5..5f6733049 100644
--- a/eventmesh-server-go/go.sum
+++ b/eventmesh-server-go/go.sum
@@ -32,9 +32,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
 cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
 dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
 github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
-github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
 github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
 github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
 github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -324,7 +323,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
 github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
 github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
 github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
-github.com/ugorji/go v1.2.7 h1:qYhyWUUd6WbiM+C6JZAUkIJt/1WrjzNHY9+KCIjVqTo=
 github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=
 github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
 github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
@@ -434,8 +432,6 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
 golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
 golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
 golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
 golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
 golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -453,8 +449,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
 golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
+golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
 golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
 golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -501,8 +497,6 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
 golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
 golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY=
-golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
 golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
 golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -612,7 +606,6 @@ google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfG
 google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
 google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
-google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
 google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
 google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
 google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
diff --git a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
index 9a099e73d..5e4530a48 100644
--- a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
+++ b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
@@ -17,6 +17,7 @@ package standalone
 
 import (
 	"context"
+	"fmt"
 	"sync"
 	"testing"
 	"time"
@@ -35,18 +36,29 @@ const (
 	pluginName = "standalone"
 )
 
+// MockDecoder is a mock decoder for the standalone connector's properties, used by the tests in this file.
+type MockDecoder struct {
+}
+
+// Decode is a no-op: it ignores cfg and always returns nil.
+func (m *MockDecoder) Decode(cfg interface{}) error {
+	return nil
+}
+
 func TestProducer_Publish(t *testing.T) {
 	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	factory.Setup(pluginName, &MockDecoder{})
 	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 
 	var publishSuccess bool
 	var callBackErr error
+	topic := fmt.Sprintf("%s_publish", topicName)
 	callback := connector.SendCallback{
 		OnSuccess: func(result *connector.SendResult) {
 			publishSuccess = true
-			assert.Equal(t, topicName, result.Topic)
+			assert.Equal(t, topic, result.Topic)
 			assert.Equal(t, "1", result.MessageId)
 			assert.Nil(t, result.Err)
 		},
@@ -55,18 +67,20 @@ func TestProducer_Publish(t *testing.T) {
 		},
 	}
 
-	err := producer.Publish(context.Background(), getTestEvent(), &callback)
+	err := producer.Publish(context.Background(), getTestEvent(topic), &callback)
 	assert.Nil(t, err)
 	assert.True(t, publishSuccess)
 	assert.Nil(t, callBackErr)
 
-	exist, err := producer.CheckTopicExist(topicName)
+	exist, err := producer.CheckTopicExist(topic)
 	assert.True(t, exist)
 	assert.Nil(t, err)
 
 }
 func TestConsumer_Subscribe(t *testing.T) {
 	sum := atomic.NewInt64(0)
+	topic := fmt.Sprintf("%s_subscribe", topicName)
+
 	var wg sync.WaitGroup
 	wg.Add(50)
 
@@ -83,17 +97,18 @@ func TestConsumer_Subscribe(t *testing.T) {
 	}
 
 	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	factory.Setup(pluginName, &MockDecoder{})
 	consumer, _ := factory.GetConsumer()
 	consumer.Start()
 	consumer.RegisterEventListener(&listener)
-	consumer.Subscribe(topicName)
+	consumer.Subscribe(topic)
 	defer consumer.Shutdown()
 
 	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 	for i := 1; i <= 50; i++ {
-		err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+		err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
 			"val": i,
 		}), getEmptyPublishCallback())
 
@@ -109,6 +124,8 @@ func TestConsumer_Subscribe(t *testing.T) {
 
 func TestConsumer_ManualAck(t *testing.T) {
 	sum := atomic.NewInt64(0)
+	topic := fmt.Sprintf("%s_ack", topicName)
+
 	var wg sync.WaitGroup
 	wg.Add(50)
 
@@ -126,17 +143,18 @@ func TestConsumer_ManualAck(t *testing.T) {
 	}
 
 	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	factory.Setup(pluginName, &MockDecoder{})
 	consumer, _ := factory.GetConsumer()
 	consumer.Start()
 	consumer.RegisterEventListener(&listener)
-	consumer.Subscribe(topicName)
+	consumer.Subscribe(topic)
 	defer consumer.Shutdown()
 
 	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 	for i := 1; i <= 50; i++ {
-		err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+		err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
 			"val": i,
 		}), getEmptyPublishCallback())
 
@@ -151,6 +169,7 @@ func TestConsumer_ManualAck(t *testing.T) {
 
 func TestConsumer_UpdateOffset(t *testing.T) {
 	sum := atomic.NewInt64(0)
+	topic := fmt.Sprintf("%s_offset", topicName)
 	ch := make(chan struct{})
 	listener := connector.EventListener{
 		Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
@@ -164,20 +183,21 @@ func TestConsumer_UpdateOffset(t *testing.T) {
 	}
 
 	factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+	factory.Setup(pluginName, &MockDecoder{})
 	consumer, _ := factory.GetConsumer()
 	consumer.Start()
 	defer consumer.Shutdown()
 	consumer.RegisterEventListener(&listener)
-	event := getTestEvent()
+	event := getTestEvent(topic)
 	event.SetExtension("offset", "49")
-	consumer.Subscribe(topicName)
+	consumer.Subscribe(topic)
 	consumer.UpdateOffset(context.Background(), []*ce.Event{event})
 
 	producer, _ := factory.GetProducer()
 	producer.Start()
 	defer producer.Shutdown()
 	for i := 1; i <= 50; i++ {
-		err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+		err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
 			"val": i,
 		}), getEmptyPublishCallback())
 
@@ -196,14 +216,14 @@ func TestConsumer_UpdateOffset(t *testing.T) {
 	}
 }
 
-func getTestEvent() *ce.Event {
+func getTestEvent(topicName string) *ce.Event {
 	event := ce.NewEvent()
 	event.SetID(uuid.New().String())
 	event.SetSubject(topicName)
 	return &event
 }
 
-func getTestEventOfData(data map[string]interface{}) *ce.Event {
+func getTestEventOfData(topicName string, data map[string]interface{}) *ce.Event {
 	event := ce.NewEvent()
 	event.SetID(uuid.New().String())
 	event.SetSubject(topicName)


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org