You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by ch...@apache.org on 2022/12/12 14:16:12 UTC
[incubator-eventmesh] 01/01: fix go server standalone connector unit test
This is an automated email from the ASF dual-hosted git repository.
chenzhou pushed a commit to branch fix-go-server-standalone-unit-test
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
commit 206a5b00aa374e0909df745d2e97b3569b4f307e
Author: horoc <ho...@gmail.com>
AuthorDate: Mon Dec 12 22:12:46 2022 +0800
fix go server standalone connector unit test
---
eventmesh-server-go/go.mod | 34 +++++++++++------
eventmesh-server-go/go.sum | 13 ++-----
.../standalone/standalone_connector_test.go | 44 ++++++++++++++++------
3 files changed, 58 insertions(+), 33 deletions(-)
diff --git a/eventmesh-server-go/go.mod b/eventmesh-server-go/go.mod
index b4d89ef7c..436f3316f 100644
--- a/eventmesh-server-go/go.mod
+++ b/eventmesh-server-go/go.mod
@@ -18,8 +18,14 @@ module github.com/apache/incubator-eventmesh/eventmesh-server-go
go 1.18
require (
+ github.com/apache/rocketmq-client-go/v2 v2.1.1
github.com/cloudevents/sdk-go/v2 v2.11.0
+ github.com/deckarep/golang-set/v2 v2.1.0
+ github.com/gin-contrib/pprof v1.4.0
+ github.com/gin-gonic/gin v1.8.1
github.com/gogf/gf v1.16.9
+ github.com/golang/mock v1.6.0
+ github.com/google/uuid v1.1.2
github.com/hashicorp/go-multierror v1.1.1
github.com/json-iterator/go v1.1.12
github.com/lestrrat-go/strftime v1.0.6
@@ -28,25 +34,26 @@ require (
github.com/panjf2000/ants/v2 v2.6.0
github.com/pkg/errors v0.9.1
github.com/prometheus/client_golang v1.13.0
- go.uber.org/zap v1.22.0
- golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
- google.golang.org/grpc v1.36.1
- gopkg.in/yaml.v3 v3.0.1
- github.com/apache/rocketmq-client-go/v2 v2.1.1
- github.com/deckarep/golang-set/v2 v2.1.0
- github.com/gin-contrib/pprof v1.4.0
- github.com/gin-gonic/gin v1.8.1
- github.com/golang/mock v1.6.0
- github.com/google/uuid v1.1.2
github.com/stretchr/testify v1.8.0
github.com/unrolled/secure v1.12.0
go.uber.org/atomic v1.7.0
go.uber.org/fx v1.18.1
+ go.uber.org/zap v1.22.0
+ golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
+ google.golang.org/grpc v1.36.1
google.golang.org/protobuf v1.28.1
+ gopkg.in/yaml.v3 v3.0.1
)
require (
+ github.com/aliyun/alibaba-cloud-sdk-go v1.61.18 // indirect
+ github.com/beorn7/perks v1.0.1 // indirect
+ github.com/buger/jsonparser v1.1.1 // indirect
+ github.com/cespare/xxhash/v2 v2.1.2 // indirect
+ github.com/davecgh/go-spew v1.1.1 // indirect
+ github.com/emirpasic/gods v1.12.0 // indirect
github.com/gin-contrib/sse v0.1.0 // indirect
+ github.com/go-errors/errors v1.0.1 // indirect
github.com/go-playground/locales v0.14.0 // indirect
github.com/go-playground/universal-translator v0.18.0 // indirect
github.com/go-playground/validator/v10 v10.10.0 // indirect
@@ -57,8 +64,10 @@ require (
github.com/konsorten/go-windows-terminal-sequences v1.0.3 // indirect
github.com/leodido/go-urn v1.2.1 // indirect
github.com/mattn/go-isatty v0.0.14 // indirect
+ github.com/matttproud/golang_protobuf_extensions v1.0.1 // indirect
github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect
github.com/modern-go/reflect2 v1.0.2 // indirect
+ github.com/patrickmn/go-cache v2.1.0+incompatible // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/prometheus/client_model v0.2.0 // indirect
@@ -75,8 +84,11 @@ require (
go.uber.org/multierr v1.6.0 // indirect
golang.org/x/crypto v0.0.0-20210711020723-a769d52b0f97 // indirect
golang.org/x/net v0.0.0-20220225172249-27dd8689420f // indirect
+ golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f // indirect
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f // indirect
golang.org/x/text v0.3.7 // indirect
- google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 // indirect
+ google.golang.org/genproto v0.0.0-20200825200019-8632dd797987 // indirect
+ gopkg.in/ini.v1 v1.42.0 // indirect
+ gopkg.in/natefinch/lumberjack.v2 v2.0.0 // indirect
gopkg.in/yaml.v2 v2.4.0 // indirect
)
diff --git a/eventmesh-server-go/go.sum b/eventmesh-server-go/go.sum
index 36637c5d5..5f6733049 100644
--- a/eventmesh-server-go/go.sum
+++ b/eventmesh-server-go/go.sum
@@ -32,9 +32,8 @@ cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RX
cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0=
dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU=
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
+github.com/BurntSushi/toml v1.1.0 h1:ksErzDEI1khOiGPgpwuI7x2ebx/uXQNw7xJpn9Eq1+I=
github.com/BurntSushi/toml v1.1.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
-github.com/BurntSushi/toml v1.2.0 h1:Rt8g24XnyGTyglgET/PRUNlrUeu9F5L+7FilkXfZgs0=
-github.com/BurntSushi/toml v1.2.0/go.mod h1:CxXYINrC8qIiEnFrOxCa7Jy5BFHlXnUU2pbicEuybxQ=
github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo=
github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc=
@@ -324,7 +323,6 @@ github.com/tidwall/match v1.1.1 h1:+Ho715JplO36QYgwN9PGYNhgZvoUSc9X2c80KVTi+GA=
github.com/tidwall/match v1.1.1/go.mod h1:eRSPERbgtNPcGhD8UCthc6PmLEQXEWd3PRB5JTxsfmM=
github.com/tidwall/pretty v1.2.0 h1:RWIZEg2iJ8/g6fDDYzMpobmaoGh5OLl4AXtGUGPcqCs=
github.com/tidwall/pretty v1.2.0/go.mod h1:ITEVvHYasfjBbM0u2Pg8T2nJnzm8xPwvNhhsoaGGjNU=
-github.com/ugorji/go v1.2.7 h1:qYhyWUUd6WbiM+C6JZAUkIJt/1WrjzNHY9+KCIjVqTo=
github.com/ugorji/go v1.2.7/go.mod h1:nF9osbDWLy6bDVv/Rtoh6QgnvNDpmCalQV5urGCCS6M=
github.com/ugorji/go/codec v1.2.7 h1:YPXUKf7fYbp/y8xloBqZOw2qaVggbfwMlI8WM3wZUJ0=
github.com/ugorji/go/codec v1.2.7/go.mod h1:WGN1fab3R1fzQlVQTkfxVtIBhWDRqOviHU95kRgeqEY=
@@ -434,8 +432,6 @@ golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20220127200216-cd36cc0744dd/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f h1:oA4XRj0qtSt8Yo1Zms0CUlsT3KG69V2UGQWPBxujDmc=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
-golang.org/x/net v0.0.0-20220722155237-a158d28d115b/go.mod h1:XRhObCWvk6IyKnWLug+ECip1KBveYUHfp+8e9klMJ9c=
golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U=
golang.org/x/oauth2 v0.0.0-20190226205417-e64efc72b421/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45/go.mod h1:gOpvHmFTYa4IltrdGE7lF6nIHvwfUNPOp7c8zoXwtLw=
@@ -453,8 +449,8 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4 h1:uVc8UZUe6tr40fFVnUP5Oj+veunVezqYl9z7DYw9xzw=
-golang.org/x/sync v0.0.0-20220722155255-886fb9371eb4/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
+golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f h1:Ax0t5p6N38Ga0dThY21weqDEyz2oklo4IvDkpigvkD8=
+golang.org/x/sync v0.0.0-20220601150217-0de741cfad7f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20181116152217-5ac8a444bdc5/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
@@ -501,8 +497,6 @@ golang.org/x/sys v0.0.0-20211216021012-1d35b9e2eb4e/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220114195835-da31bd327af9/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
-golang.org/x/sys v0.0.0-20220908164124-27713097b956 h1:XeJjHH1KiLpKGb6lvMiksZ9l0fVUh+AmGcm0nOMEBOY=
-golang.org/x/sys v0.0.0-20220908164124-27713097b956/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo=
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8=
golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
@@ -612,7 +606,6 @@ google.golang.org/genproto v0.0.0-20200331122359-1ee6d9798940/go.mod h1:55QSHmfG
google.golang.org/genproto v0.0.0-20200430143042-b979b6f78d84/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200511104702-f5ebc3bea380/go.mod h1:55QSHmfGQM9UVYDPBsyGGes0y52j32PQ3BqQfXhyH3c=
google.golang.org/genproto v0.0.0-20200515170657-fc4c6c6a6587/go.mod h1:YsZOwe1myG/8QRHRsmBRE1LrgQY60beZKjly0O1fX9U=
-google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013 h1:+kGHl1aib/qcwaRi1CbqBZ1rk19r85MNUf8HaBghugY=
google.golang.org/genproto v0.0.0-20200526211855-cb27e3aa2013/go.mod h1:NbSheEEYHJ7i3ixzK3sjbqSGDJWnxyFXZblF3eUsNvo=
google.golang.org/genproto v0.0.0-20200618031413-b414f8b61790/go.mod h1:jDfRM7FcilCzHH/e9qn6dsT145K34l5v+OpcnNgKAAA=
google.golang.org/genproto v0.0.0-20200729003335-053ba62fc06f/go.mod h1:FWY/as6DDZQgahTzZj3fqbO1CbirC29ZNUFHwi0/+no=
diff --git a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
index 9a099e73d..5e4530a48 100644
--- a/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
+++ b/eventmesh-server-go/plugin/connector/standalone/standalone_connector_test.go
@@ -17,6 +17,7 @@ package standalone
import (
"context"
+ "fmt"
"sync"
"testing"
"time"
@@ -35,18 +36,29 @@ const (
pluginName = "standalone"
)
+// MockDecoder standalone connector properties mock decoder
+type MockDecoder struct {
+}
+
+// Decode mock decoder, no-op
+func (m *MockDecoder) Decode(cfg interface{}) error {
+ return nil
+}
+
func TestProducer_Publish(t *testing.T) {
factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ factory.Setup(pluginName, &MockDecoder{})
producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
var publishSuccess bool
var callBackErr error
+ topic := fmt.Sprintf("%s_publish", topicName)
callback := connector.SendCallback{
OnSuccess: func(result *connector.SendResult) {
publishSuccess = true
- assert.Equal(t, topicName, result.Topic)
+ assert.Equal(t, topic, result.Topic)
assert.Equal(t, "1", result.MessageId)
assert.Nil(t, result.Err)
},
@@ -55,18 +67,20 @@ func TestProducer_Publish(t *testing.T) {
},
}
- err := producer.Publish(context.Background(), getTestEvent(), &callback)
+ err := producer.Publish(context.Background(), getTestEvent(topic), &callback)
assert.Nil(t, err)
assert.True(t, publishSuccess)
assert.Nil(t, callBackErr)
- exist, err := producer.CheckTopicExist(topicName)
+ exist, err := producer.CheckTopicExist(topic)
assert.True(t, exist)
assert.Nil(t, err)
}
func TestConsumer_Subscribe(t *testing.T) {
sum := atomic.NewInt64(0)
+ topic := fmt.Sprintf("%s_subscribe", topicName)
+
var wg sync.WaitGroup
wg.Add(50)
@@ -83,17 +97,18 @@ func TestConsumer_Subscribe(t *testing.T) {
}
factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ factory.Setup(pluginName, &MockDecoder{})
consumer, _ := factory.GetConsumer()
consumer.Start()
consumer.RegisterEventListener(&listener)
- consumer.Subscribe(topicName)
+ consumer.Subscribe(topic)
defer consumer.Shutdown()
producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
- err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+ err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
"val": i,
}), getEmptyPublishCallback())
@@ -109,6 +124,8 @@ func TestConsumer_Subscribe(t *testing.T) {
func TestConsumer_ManualAck(t *testing.T) {
sum := atomic.NewInt64(0)
+ topic := fmt.Sprintf("%s_ack", topicName)
+
var wg sync.WaitGroup
wg.Add(50)
@@ -126,17 +143,18 @@ func TestConsumer_ManualAck(t *testing.T) {
}
factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ factory.Setup(pluginName, &MockDecoder{})
consumer, _ := factory.GetConsumer()
consumer.Start()
consumer.RegisterEventListener(&listener)
- consumer.Subscribe(topicName)
+ consumer.Subscribe(topic)
defer consumer.Shutdown()
producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
- err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+ err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
"val": i,
}), getEmptyPublishCallback())
@@ -151,6 +169,7 @@ func TestConsumer_ManualAck(t *testing.T) {
func TestConsumer_UpdateOffset(t *testing.T) {
sum := atomic.NewInt64(0)
+ topic := fmt.Sprintf("%s_offset", topicName)
ch := make(chan struct{})
listener := connector.EventListener{
Consume: func(event *ce.Event, commitFunc connector.CommitFunc) error {
@@ -164,20 +183,21 @@ func TestConsumer_UpdateOffset(t *testing.T) {
}
factory := plugin.Get(connector.PluginType, pluginName).(connector.Factory)
+ factory.Setup(pluginName, &MockDecoder{})
consumer, _ := factory.GetConsumer()
consumer.Start()
defer consumer.Shutdown()
consumer.RegisterEventListener(&listener)
- event := getTestEvent()
+ event := getTestEvent(topic)
event.SetExtension("offset", "49")
- consumer.Subscribe(topicName)
+ consumer.Subscribe(topic)
consumer.UpdateOffset(context.Background(), []*ce.Event{event})
producer, _ := factory.GetProducer()
producer.Start()
defer producer.Shutdown()
for i := 1; i <= 50; i++ {
- err := producer.Publish(context.Background(), getTestEventOfData(map[string]interface{}{
+ err := producer.Publish(context.Background(), getTestEventOfData(topic, map[string]interface{}{
"val": i,
}), getEmptyPublishCallback())
@@ -196,14 +216,14 @@ func TestConsumer_UpdateOffset(t *testing.T) {
}
}
-func getTestEvent() *ce.Event {
+func getTestEvent(topicName string) *ce.Event {
event := ce.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(topicName)
return &event
}
-func getTestEventOfData(data map[string]interface{}) *ce.Event {
+func getTestEventOfData(topicName string, data map[string]interface{}) *ce.Event {
event := ce.NewEvent()
event.SetID(uuid.New().String())
event.SetSubject(topicName)
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org