You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2023/01/08 09:04:34 UTC
[incubator-eventmesh] branch master updated: fix issue2855
This is an automated email from the ASF dual-hosted git repository.
walleliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new f2d697cf4 fix issue2855
new 23855e66e Merge pull request #2857 from jonyangx/issue2855
f2d697cf4 is described below
commit f2d697cf408f0a9101365af1a6180aa9878ddd14
Author: jonyangx <jo...@gmail.com>
AuthorDate: Sun Jan 8 16:07:35 2023 +0800
fix issue2855
---
eventmesh-sdk-go/common/id/id_snake_test.go | 4 +-
eventmesh-sdk-go/grpc/client.go | 9 +-
eventmesh-sdk-go/grpc/client_test.go | 905 +++++++++++++++-------------
eventmesh-sdk-go/grpc/dispatcher_test.go | 86 +--
eventmesh-sdk-go/grpc/fake_grpcserver.go | 12 +-
eventmesh-sdk-go/grpc/grpc_suite_test.go | 44 ++
eventmesh-sdk-go/grpc/heartbeat_test.go | 92 ++-
7 files changed, 649 insertions(+), 503 deletions(-)
diff --git a/eventmesh-sdk-go/common/id/id_snake_test.go b/eventmesh-sdk-go/common/id/id_snake_test.go
index 22228e96d..54d69365f 100644
--- a/eventmesh-sdk-go/common/id/id_snake_test.go
+++ b/eventmesh-sdk-go/common/id/id_snake_test.go
@@ -70,12 +70,12 @@ var _ = Describe("id_snake test", func() {
flake := NewFlakeWithSonyflake(aSonyflake)
- want := "over the time limit"
+ want := "test error"
var ret string
defer func() {
if err := recover(); err != nil {
ret = err.(error).Error()
- Ω(ret).To(Equal(want))
+ Ω(want).To(Equal(ret))
}
mockPatches.Reset()
}()
diff --git a/eventmesh-sdk-go/grpc/client.go b/eventmesh-sdk-go/grpc/client.go
index 96abf56a4..bb8924d4e 100644
--- a/eventmesh-sdk-go/grpc/client.go
+++ b/eventmesh-sdk-go/grpc/client.go
@@ -174,7 +174,7 @@ func (e *eventMeshGRPCClient) setupContext(ctx context.Context) context.Context
// Close meshclient and free all resources
func (e *eventMeshGRPCClient) Close() error {
- log.Infof("close grpc client")
+ log.Infof("begin close grpc client")
if e.cancel != nil {
e.cancel()
}
@@ -190,8 +190,11 @@ func (e *eventMeshGRPCClient) Close() error {
}
e.eventMeshConsumer = nil
}
- if err := e.grpcConn.Close(); err != nil {
- log.Warnf("err in close conn with err:%v", err)
+
+ if e.grpcConn != nil {
+ if err := e.grpcConn.Close(); err != nil {
+ log.Warnf("err in close conn with err:%v", err)
+ }
}
log.Infof("success close grpc client")
diff --git a/eventmesh-sdk-go/grpc/client_test.go b/eventmesh-sdk-go/grpc/client_test.go
index f3ebf3b8e..fa1c0b16c 100644
--- a/eventmesh-sdk-go/grpc/client_test.go
+++ b/eventmesh-sdk-go/grpc/client_test.go
@@ -19,446 +19,545 @@ import (
"context"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc"
- "testing"
"time"
)
-func Test_newEventMeshGRPCClient(t *testing.T) {
- type args struct {
- cfg *conf.GRPCConfig
- }
- tests := []struct {
- name string
- args args
- want *eventMeshGRPCClient
- wantErr bool
- }{
- {
- name: "host is empty",
- args: args{cfg: &conf.GRPCConfig{
- Host: "",
- }},
- wantErr: true,
- want: nil,
- },
- {
- name: "producer wrong",
- args: args{cfg: &conf.GRPCConfig{
- Host: "1.1.1.1",
- ProducerConfig: conf.ProducerConfig{},
- }},
- wantErr: true,
- want: nil,
- },
- {
- name: "client with send msg",
- args: args{cfg: &conf.GRPCConfig{
- Host: "101.43.84.47",
- Port: 10205,
- ENV: "sendmsgenv",
- Region: "sh",
- IDC: "idc01",
- SYS: "test-system",
- ProtocolType: "grpc",
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-producer-group",
+type fakeidg struct {
+}
+
+func (f *fakeidg) Next() string {
+ return "fake"
+}
+
+var _ = Describe("client test", func() {
+
+ Context("newEventMeshGRPCClient() test ", func() {
+ type args struct {
+ cfg *conf.GRPCConfig
+ }
+
+ It("host is empty", func() {
+ tests := []struct {
+ name string
+ args args
+ want *eventMeshGRPCClient
+ wantErr bool
+ }{
+ {
+ name: "host is empty",
+ args: args{cfg: &conf.GRPCConfig{
+ Host: "",
+ }},
+ wantErr: true,
+ want: nil,
},
- Username: "user",
- Password: "passwd",
- }},
- want: nil,
- wantErr: false,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- cli, err := newEventMeshGRPCClient(tt.args.cfg)
- if (err != nil) != tt.wantErr {
- t.Errorf("newEventMeshGRPCClient() error = %v, wantErr %v", err, tt.wantErr)
- return
}
- if err == nil {
- assert.NoError(t, cli.Close())
+
+ for _, tt := range tests {
+ cli, err := newEventMeshGRPCClient(tt.args.cfg)
+ if tt.wantErr {
+ Ω(err).To(HaveOccurred())
+ } else {
+ Ω(err).NotTo(HaveOccurred())
+ }
+
+ if cli != nil {
+ Ω(cli.Close()).NotTo(HaveOccurred())
+ }
}
})
- }
-}
-func Test_multiple_set_context(t *testing.T) {
- root := context.Background()
- onec, cancel := context.WithTimeout(root, time.Second*5)
- defer cancel()
- valc := context.WithValue(onec, "test", "got")
+ It("producer wrong", func() {
+ tests := []struct {
+ name string
+ args args
+ want *eventMeshGRPCClient
+ wantErr bool
+ }{
+ {
+ name: "producer wrong",
+ args: args{cfg: &conf.GRPCConfig{
+ Host: "1.1.1.1",
+ ProducerConfig: conf.ProducerConfig{},
+ }},
+ wantErr: false,
+ want: nil,
+ },
+ }
- select {
- case <-valc.Done():
- val := valc.Value("test")
- t.Logf("5 s reached, value in context:%v", val)
- case <-time.After(time.Second * 10):
- t.Logf("ooor, 10s timeout")
- }
+ for _, tt := range tests {
+ cli, err := newEventMeshGRPCClient(tt.args.cfg)
+ if tt.wantErr {
+ Ω(err).To(HaveOccurred())
+ } else {
+ Ω(err).NotTo(HaveOccurred())
-}
+ }
-func Test_eventMeshGRPCClient_Publish(t *testing.T) {
- // run fake server
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: false,
- },
- })
- assert.NoError(t, err, "create grpc client")
- type args struct {
- ctx context.Context
- msg *proto.SimpleMessage
- opts []grpc.CallOption
- }
- tests := []struct {
- name string
- args args
- want *proto.Response
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "publish msg",
- args: args{
- ctx: context.TODO(),
- msg: &proto.SimpleMessage{
- Header: &proto.RequestHeader{},
- Topic: "test-publish-topic",
- },
- },
- wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
- return true
- },
- },
- {
- name: "publish with timeout",
- args: args{
- ctx: func() context.Context {
- ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
- return ctx
- }(),
- msg: &proto.SimpleMessage{
- Header: &proto.RequestHeader{},
- Topic: "test-timeout-topic",
+ if cli != nil {
+ Ω(cli.Close()).NotTo(HaveOccurred())
+ }
+ }
+ })
+
+ It("client with send msg", func() {
+ tests := []struct {
+ name string
+ args args
+ want *eventMeshGRPCClient
+ wantErr bool
+ }{
+ {
+ name: "client with send msg",
+ args: args{cfg: &conf.GRPCConfig{
+ Host: "101.43.84.47",
+ Port: 10205,
+ ENV: "sendmsgenv",
+ Region: "sh",
+ IDC: "idc01",
+ SYS: "test-system",
+ ProtocolType: "grpc",
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-producer-group",
+ },
+ Username: "user",
+ Password: "passwd",
+ }},
+ want: nil,
+ wantErr: false,
},
- },
- wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
- return true
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
- assert.NoError(t, err)
- t.Logf("receive publish response:%v", got.String())
- assert.NoError(t, cli.Close())
+ }
+
+ for _, tt := range tests {
+ cli, err := newEventMeshGRPCClient(tt.args.cfg)
+ if tt.wantErr {
+ Ω(err).To(HaveOccurred())
+ } else {
+ Ω(err).NotTo(HaveOccurred())
+
+ }
+
+ if cli != nil {
+ Ω(cli.Close()).NotTo(HaveOccurred())
+ }
+ }
})
- }
-}
-func Test_eventMeshGRPCClient_RequestReply(t *testing.T) {
- // run fake server
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: false,
- },
})
- assert.NoError(t, err, "create grpc client")
- type args struct {
- ctx context.Context
- msg *proto.SimpleMessage
- opts []grpc.CallOption
- }
- tests := []struct {
- name string
- args args
- want *proto.SimpleMessage
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "test-request-reply",
- args: args{
- ctx: context.TODO(),
- msg: &proto.SimpleMessage{
- Header: &proto.RequestHeader{},
- Topic: "test-request-reply-topic",
- },
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := cli.RequestReply(tt.args.ctx, tt.args.msg, tt.args.opts...)
- assert.NoError(t, err)
- t.Logf("receive request reply response:%v", got.String())
- assert.NoError(t, cli.Close())
+
+ Context("multiple_set_context() test ", func() {
+ It("should done", func() {
+ root := context.Background()
+ onec, cancel := context.WithTimeout(root, time.Second*3)
+ defer cancel()
+ val := "test"
+ go func() {
+
+ select {
+ case <-onec.Done():
+ val = "test"
+ case <-time.After(time.Second * 1):
+ val = ""
+ }
+ }()
+
+ time.Sleep(2 * time.Second)
+ Ω(val).To(Equal(""))
+ })
+
+ It("should timeout", func() {
+ root := context.Background()
+ onec, cancel := context.WithTimeout(root, time.Second*1)
+ defer cancel()
+
+ val := "test"
+ go func() {
+
+ select {
+ case <-onec.Done():
+ val = "test"
+ break
+ case <-time.After(time.Second * 3):
+ val = ""
+ }
+ }()
+
+ time.Sleep(2 * time.Second)
+ Ω(val).To(Equal("test"))
})
- }
-}
-func Test_eventMeshGRPCClient_BatchPublish(t *testing.T) {
- // run fake server
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: false,
- },
})
- assert.NoError(t, err, "create grpc client")
- type args struct {
- ctx context.Context
- msg *proto.BatchMessage
- opts []grpc.CallOption
- }
- tests := []struct {
- name string
- args args
- want *proto.Response
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "test batch publish",
- args: args{
- ctx: context.TODO(),
- msg: &proto.BatchMessage{
- Header: &proto.RequestHeader{},
- ProducerGroup: "fake-batch-group",
- Topic: "fake-batch-topic",
- MessageItem: []*proto.BatchMessage_MessageItem{
- {
- Content: "batch-1",
- Ttl: "1",
- UniqueId: "batch-id",
- SeqNum: "1",
- Tag: "tag",
- Properties: map[string]string{
- "from": "test",
- "type": "batch-msg",
- },
+
+ Context("eventMeshGRPCClient_Publish test ", func() {
+
+ type args struct {
+ ctx context.Context
+ msg *proto.SimpleMessage
+ opts []grpc.CallOption
+ }
+
+ It("publish msg", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: false,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+
+ tests := []struct {
+ name string
+ args args
+ want *proto.Response
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "publish msg",
+ args: args{
+ ctx: context.TODO(),
+ msg: &proto.SimpleMessage{
+ Header: &proto.RequestHeader{},
+ Topic: "test-publish-topic",
},
- {
- Content: "batch-2",
- Ttl: "2",
- UniqueId: "batch-id",
- SeqNum: "2",
- Tag: "tag",
- Properties: map[string]string{
- "from": "test",
- "type": "batch-msg",
- },
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return true
+ },
+ },
+ }
+
+ for _, tt := range tests {
+ _, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
+ Ω(err).NotTo(HaveOccurred())
+ }
+
+ Ω(cli.Close()).NotTo(HaveOccurred())
+
+ })
+
+ It("publish with timeout", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: false,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+ tests := []struct {
+ name string
+ args args
+ want *proto.Response
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "publish with timeout",
+ args: args{
+ ctx: func() context.Context {
+ ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
+ return ctx
+ }(),
+ msg: &proto.SimpleMessage{
+ Header: &proto.RequestHeader{},
+ Topic: "test-timeout-topic",
},
},
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return true
+ },
},
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- got, err := cli.BatchPublish(tt.args.ctx, tt.args.msg, tt.args.opts...)
- assert.NoError(t, err)
- t.Logf("receive request reply response:%v", got.String())
- assert.NoError(t, cli.Close())
+ }
+
+ for _, tt := range tests {
+ _, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
+ Ω(err).NotTo(HaveOccurred())
+ }
+
+ Ω(cli.Close()).NotTo(HaveOccurred())
+
})
- }
-}
-func Test_eventMeshGRPCClient_webhook_subscribe(t *testing.T) {
- ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
- defer cancel()
- go runWebhookServer(ctx)
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: true,
- ConsumerGroup: "test-consumer-group-subscribe",
- PoolSize: 5,
- },
- HeartbeatConfig: conf.HeartbeatConfig{
- Period: time.Second * 5,
- Timeout: time.Second * 3,
- },
})
- assert.NoError(t, err, "create grpc client")
- assert.NoError(t, cli.SubscribeWebhook(conf.SubscribeItem{
- SubscribeMode: 1,
- SubscribeType: 1,
- Topic: "topic-1",
- }, "http://localhost:8080/onmessage"))
- time.Sleep(time.Second * 5)
-}
-func Test_eventMeshGRPCClient_Subscribe(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: true,
- ConsumerGroup: "test-consumer-group-subscribe",
- PoolSize: 5,
- },
- HeartbeatConfig: conf.HeartbeatConfig{
- Period: time.Second * 5,
- Timeout: time.Second * 3,
- },
+ Context("eventMeshGRPCClient_RequestReply test ", func() {
+ type args struct {
+ ctx context.Context
+ msg *proto.SimpleMessage
+ opts []grpc.CallOption
+ }
+
+ It("test-request-reply", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: false,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+
+ tests := []struct {
+ name string
+ args args
+ want *proto.SimpleMessage
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "test-request-reply",
+ args: args{
+ ctx: context.TODO(),
+ msg: &proto.SimpleMessage{
+ Header: &proto.RequestHeader{},
+ Topic: "test-request-reply-topic",
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ _, err := cli.RequestReply(tt.args.ctx, tt.args.msg, tt.args.opts...)
+ Ω(err).NotTo(HaveOccurred())
+ }
+ Ω(cli.Close()).NotTo(HaveOccurred())
+ })
})
- assert.NoError(t, err, "create grpc client")
- type args struct {
- item conf.SubscribeItem
- handler OnMessage
- }
- tests := []struct {
- name string
- args args
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "subcribe one",
- args: args{
- item: conf.SubscribeItem{
- SubscribeMode: 1,
- SubscribeType: 1,
- Topic: "topic-1",
+
+ Context("eventMeshGRPCClient_RequestReply test ", func() {
+ type args struct {
+ ctx context.Context
+ msg *proto.BatchMessage
+ opts []grpc.CallOption
+ }
+
+ It("test batch publish", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: false,
},
- handler: func(message *proto.SimpleMessage) interface{} {
- t.Logf("receive subscribe response:%s", message.String())
- return nil
+ })
+ Ω(err).NotTo(HaveOccurred())
+
+ tests := []struct {
+ name string
+ args args
+ want *proto.Response
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "test batch publish",
+ args: args{
+ ctx: context.TODO(),
+ msg: &proto.BatchMessage{
+ Header: &proto.RequestHeader{},
+ ProducerGroup: "fake-batch-group",
+ Topic: "fake-batch-topic",
+ MessageItem: []*proto.BatchMessage_MessageItem{
+ {
+ Content: "batch-1",
+ Ttl: "1",
+ UniqueId: "batch-id",
+ SeqNum: "1",
+ Tag: "tag",
+ Properties: map[string]string{
+ "from": "test",
+ "type": "batch-msg",
+ },
+ },
+ {
+ Content: "batch-2",
+ Ttl: "2",
+ UniqueId: "batch-id",
+ SeqNum: "2",
+ Tag: "tag",
+ Properties: map[string]string{
+ "from": "test",
+ "type": "batch-msg",
+ },
+ },
+ },
+ },
+ },
},
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- err := cli.SubscribeStream(tt.args.item, tt.args.handler)
- assert.NoError(t, err)
- assert.NoError(t, cli.Close())
+ }
+
+ for _, tt := range tests {
+ _, err := cli.BatchPublish(tt.args.ctx, tt.args.msg, tt.args.opts...)
+ Ω(err).NotTo(HaveOccurred())
+
+ }
+ Ω(cli.Close()).NotTo(HaveOccurred())
})
- }
-}
+ })
-func Test_eventMeshGRPCClient_UnSubscribe(t *testing.T) {
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: true,
- ConsumerGroup: "test-consumer-group-subscribe",
- PoolSize: 5,
- },
- HeartbeatConfig: conf.HeartbeatConfig{
- Period: time.Second * 5,
- Timeout: time.Second * 3,
- },
+ Context("eventMeshGRPCClient_webhook_subscribe test ", func() {
+ It("webhook_subscribe success", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: true,
+ ConsumerGroup: "test-consumer-group-subscribe",
+ PoolSize: 5,
+ },
+ HeartbeatConfig: conf.HeartbeatConfig{
+ Period: time.Second * 5,
+ Timeout: time.Second * 3,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+ Ω(cli.SubscribeWebhook(conf.SubscribeItem{
+ SubscribeMode: 1,
+ SubscribeType: 1,
+ Topic: "topic-1",
+ }, "http://localhost:8080/onmessage")).NotTo(HaveOccurred())
+ time.Sleep(time.Second * 5)
+ Ω(cli.Close()).NotTo(HaveOccurred())
+ })
})
- assert.NoError(t, err, "create grpc client")
- err = cli.SubscribeStream(conf.SubscribeItem{
- SubscribeMode: 1,
- SubscribeType: 1,
- Topic: "topic-1",
- }, func(message *proto.SimpleMessage) interface{} {
- t.Logf("receive subscribe response:%s", message.String())
- return nil
+
+ Context("eventMeshGRPCClient_Subscribe test ", func() {
+
+ type args struct {
+ item conf.SubscribeItem
+ handler OnMessage
+ }
+
+ It("subcribe one success", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: true,
+ ConsumerGroup: "test-consumer-group-subscribe",
+ PoolSize: 5,
+ },
+ HeartbeatConfig: conf.HeartbeatConfig{
+ Period: time.Second * 5,
+ Timeout: time.Second * 3,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+
+ tests := []struct {
+ name string
+ args args
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "subcribe one",
+ args: args{
+ item: conf.SubscribeItem{
+ SubscribeMode: 1,
+ SubscribeType: 1,
+ Topic: "topic-1",
+ },
+ handler: func(message *proto.SimpleMessage) interface{} {
+ return nil
+ },
+ },
+ },
+ }
+ for _, tt := range tests {
+ err := cli.SubscribeStream(tt.args.item, tt.args.handler)
+ Ω(err).NotTo(HaveOccurred())
+ }
+ Ω(cli.Close()).NotTo(HaveOccurred())
+ })
})
- assert.NoError(t, err, "subscribe err")
- tests := []struct {
- name string
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "unsubcribe",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- assert.NoError(t, cli.UnSubscribe())
- assert.NoError(t, cli.Close())
+
+ Context("eventMeshGRPCClient_UnSubscribe test ", func() {
+ It("unsubcribe success", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: true,
+ ConsumerGroup: "test-consumer-group-subscribe",
+ PoolSize: 5,
+ },
+ HeartbeatConfig: conf.HeartbeatConfig{
+ Period: time.Second * 5,
+ Timeout: time.Second * 3,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+
+ err = cli.SubscribeStream(conf.SubscribeItem{
+ SubscribeMode: 1,
+ SubscribeType: 1,
+ Topic: "topic-1",
+ }, func(message *proto.SimpleMessage) interface{} {
+ return nil
+ })
+ Ω(err).NotTo(HaveOccurred())
+ Ω(cli.UnSubscribe()).NotTo(HaveOccurred())
+ Ω(cli.Close()).NotTo(HaveOccurred())
})
- }
-}
+ })
-type fakeidg struct {
-}
+ Context("eventMeshGRPCClient_UnSubscribe test ", func() {
+ type args struct {
+ ctx context.Context
+ }
+ It("unsubcribe success", func() {
+ cli := &eventMeshGRPCClient{
+ idg: &fakeidg{},
+ }
-func (f *fakeidg) Next() string {
- return "fake"
-}
+ tests := []struct {
+ name string
+ args args
+ want string
+ }{
+ {
+ name: "setup with uid",
+ args: args{
+ ctx: context.WithValue(context.Background(), GRPC_ID_KEY, "value"),
+ },
+ want: "value",
+ },
+ {
+ name: "setup without uid",
+ args: args{
+ ctx: context.TODO(),
+ },
+ want: "fake",
+ },
+ }
-func Test_eventMeshGRPCClient_setupContext(t *testing.T) {
- type args struct {
- ctx context.Context
- }
-
- cli := &eventMeshGRPCClient{
- idg: &fakeidg{},
- }
- tests := []struct {
- name string
- args args
- want string
- }{
- {
- name: "setup with uid",
- args: args{
- ctx: context.WithValue(context.Background(), GRPC_ID_KEY, "value"),
- },
- want: "value",
- },
- {
- name: "setup without uid",
- args: args{
- ctx: context.TODO(),
- },
- want: "fake",
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- ctx := cli.setupContext(tt.args.ctx)
- assert.Equal(t, ctx.Value(GRPC_ID_KEY).(string), tt.want)
+ for _, tt := range tests {
+ ctx := cli.setupContext(tt.args.ctx)
+ Ω(ctx.Value(GRPC_ID_KEY).(string)).To(Equal(tt.want))
+ }
+
+ Ω(cli.Close()).NotTo(HaveOccurred())
})
- }
-}
+ })
+})
diff --git a/eventmesh-sdk-go/grpc/dispatcher_test.go b/eventmesh-sdk-go/grpc/dispatcher_test.go
index 49392057f..6362d3edf 100644
--- a/eventmesh-sdk-go/grpc/dispatcher_test.go
+++ b/eventmesh-sdk-go/grpc/dispatcher_test.go
@@ -16,50 +16,58 @@
package grpc
import (
- "fmt"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
"github.com/stretchr/testify/assert"
"sync"
- "testing"
)
-func Test_messageDispatcher_addHandler(t *testing.T) {
- type fields struct {
- topicMap *sync.Map
- poolsize int
- }
- type args struct {
- topic string
- hdl OnMessage
- }
- tests := []struct {
- name string
- fields fields
- args args
- wantErr assert.ErrorAssertionFunc
- }{
- {
- name: "test add handler",
- fields: fields{
- topicMap: new(sync.Map),
- poolsize: 5,
- },
- args: args{
- topic: "handler-1",
- hdl: func(message *proto.SimpleMessage) interface{} {
- t.Logf("handle message")
- return nil
+var _ = Describe("dispatcher test", func() {
+
+ Context("messageDispatcher_addHandler test ", func() {
+ type fields struct {
+ topicMap *sync.Map
+ poolsize int
+ }
+
+ type args struct {
+ topic string
+ hdl OnMessage
+ }
+ It("test add handler", func() {
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ wantErr assert.ErrorAssertionFunc
+ }{
+ {
+ name: "test add handler",
+ fields: fields{
+ topicMap: new(sync.Map),
+ poolsize: 5,
+ },
+ args: args{
+ topic: "handler-1",
+ hdl: func(message *proto.SimpleMessage) interface{} {
+ return nil
+ },
+ },
+ wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+ return true
+ },
},
- },
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- m := &messageDispatcher{
- topicMap: tt.fields.topicMap,
- poolsize: tt.fields.poolsize,
}
- tt.wantErr(t, m.addHandler(tt.args.topic, tt.args.hdl), fmt.Sprintf("addHandler(%v, %v)", tt.args.topic, tt.args.hdl))
+
+ for _, tt := range tests {
+ m := &messageDispatcher{
+ topicMap: tt.fields.topicMap,
+ poolsize: tt.fields.poolsize,
+ }
+ Ω(m.addHandler(tt.args.topic, tt.args.hdl)).To(BeNil())
+ }
})
- }
-}
+
+ })
+})
diff --git a/eventmesh-sdk-go/grpc/fake_grpcserver.go b/eventmesh-sdk-go/grpc/fake_grpcserver.go
index 969dc9f54..0cbf0ce6a 100644
--- a/eventmesh-sdk-go/grpc/fake_grpcserver.go
+++ b/eventmesh-sdk-go/grpc/fake_grpcserver.go
@@ -19,6 +19,7 @@ import (
"context"
"fmt"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/seq"
+ "google.golang.org/grpc"
"io"
"io/ioutil"
"net"
@@ -30,8 +31,6 @@ import (
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/id"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/log"
-
- "google.golang.org/grpc"
)
// fakeServer used to do the test
@@ -62,7 +61,9 @@ func runFakeServer(ctx context.Context) error {
go func() {
select {
case <-ctx.Done():
- srv.GracefulStop()
+ if srv != nil {
+ srv.GracefulStop()
+ }
}
}()
log.Infof("serve fake server on:%v", srv.GetServiceInfo())
@@ -98,6 +99,7 @@ func (f *fakeServer) Subscribe(ctx context.Context, msg *proto.Subscription) (*p
func (f *fakeServer) SubscribeStream(srv proto.ConsumerService_SubscribeStreamServer) error {
wg := new(sync.WaitGroup)
wg.Add(2)
+
go func() {
defer wg.Done()
for {
@@ -113,6 +115,7 @@ func (f *fakeServer) SubscribeStream(srv proto.ConsumerService_SubscribeStreamSe
log.Infof("rece sub:%s", sub.String())
}
}()
+
go func() {
defer func() {
wg.Done()
@@ -150,6 +153,7 @@ func (f *fakeServer) SubscribeStream(srv proto.ConsumerService_SubscribeStreamSe
time.Sleep(time.Second * 5)
}
}()
+
wg.Wait()
log.Infof("close SubscribeStream")
return nil
@@ -167,7 +171,7 @@ func (f *fakeServer) Unsubscribe(ctx context.Context, msg *proto.Subscription) (
func (f *fakeServer) Heartbeat(ctx context.Context, msg *proto.Heartbeat) (*proto.Response, error) {
log.Infof("fake-server, receive heartbeat request:%v", msg.String())
return &proto.Response{
- RespCode: "OK",
+ RespCode: "0",
RespMsg: "OK",
RespTime: time.Now().Format("2006-01-02 15:04:05"),
}, nil
diff --git a/eventmesh-sdk-go/grpc/grpc_suite_test.go b/eventmesh-sdk-go/grpc/grpc_suite_test.go
new file mode 100644
index 000000000..4689619b4
--- /dev/null
+++ b/eventmesh-sdk-go/grpc/grpc_suite_test.go
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+ "context"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
+ "testing"
+)
+
+func TestGRPC(t *testing.T) {
+ RegisterFailHandler(Fail)
+ RunSpecs(t, "grpc module Tests")
+}
+
+var gcancel context.CancelFunc
+
+var _ = BeforeSuite(func() {
+ ctx, cancel := context.WithCancel(context.Background())
+ //defer cancel() -- intentionally not deferred: cancel is stored in gcancel and called from AfterSuite
+ gcancel = cancel
+ go runFakeServer(ctx)
+ go runWebhookServer(ctx)
+})
+
+var _ = AfterSuite(func() {
+ gcancel()
+})
diff --git a/eventmesh-sdk-go/grpc/heartbeat_test.go b/eventmesh-sdk-go/grpc/heartbeat_test.go
index 45f6ed161..8a6de3371 100644
--- a/eventmesh-sdk-go/grpc/heartbeat_test.go
+++ b/eventmesh-sdk-go/grpc/heartbeat_test.go
@@ -16,61 +16,49 @@
package grpc
import (
- "context"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
- "github.com/stretchr/testify/assert"
- "testing"
+ . "github.com/onsi/ginkgo"
+ . "github.com/onsi/gomega"
"time"
)
-func Test_eventMeshHeartbeat_sendMsg(t *testing.T) {
- // run fake server
- ctx, cancel := context.WithCancel(context.Background())
- defer cancel()
- go runFakeServer(ctx)
- cli, err := New(&conf.GRPCConfig{
- Host: "127.0.0.1",
- Port: 8086,
- ProducerConfig: conf.ProducerConfig{
- ProducerGroup: "test-publish-group",
- LoadBalancerType: conf.Random,
- },
- ConsumerConfig: conf.ConsumerConfig{
- Enabled: true,
- ConsumerGroup: "fake-consumer",
- PoolSize: 5,
- },
- HeartbeatConfig: conf.HeartbeatConfig{
- Period: time.Second * 5,
- Timeout: time.Second * 3,
- },
- })
- topic := "fake-topic"
- assert.NoError(t, cli.SubscribeStream(conf.SubscribeItem{
- SubscribeType: 1,
- SubscribeMode: 1,
- Topic: topic,
- }, func(message *proto.SimpleMessage) interface{} {
- t.Logf("receive sub msg:%v", message.String())
- return nil
- }))
- rcli := cli.(*eventMeshGRPCClient)
- beat := rcli.heartbeat
- assert.NoError(t, err, "create grpc client")
- defer assert.NoError(t, cli.Close())
- tests := []struct {
- name string
- want error
- }{
- {
- want: nil,
- },
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- err := beat.sendMsg(beat.client)
- assert.NoError(t, err)
+var _ = Describe("dispatcher test", func() {
+
+ Context("messageDispatcher_addHandler test ", func() {
+ It("fake-topic", func() {
+ cli, err := New(&conf.GRPCConfig{
+ Host: "127.0.0.1",
+ Port: 8086,
+ ProducerConfig: conf.ProducerConfig{
+ ProducerGroup: "test-publish-group",
+ LoadBalancerType: conf.Random,
+ },
+ ConsumerConfig: conf.ConsumerConfig{
+ Enabled: true,
+ ConsumerGroup: "fake-consumer",
+ PoolSize: 5,
+ },
+ HeartbeatConfig: conf.HeartbeatConfig{
+ Period: time.Second * 50,
+ Timeout: time.Second * 30,
+ },
+ })
+ Ω(err).NotTo(HaveOccurred())
+
+ topic := "fake-topic"
+ Ω(cli.SubscribeStream(conf.SubscribeItem{
+ SubscribeType: 1,
+ SubscribeMode: 1,
+ Topic: topic,
+ }, func(message *proto.SimpleMessage) interface{} {
+ return nil
+ })).NotTo(HaveOccurred())
+
+ rcli := cli.(*eventMeshGRPCClient)
+ beat := rcli.heartbeat
+ Ω(beat.sendMsg(beat.client)).NotTo(HaveOccurred())
+ Ω(cli.Close()).NotTo(HaveOccurred())
})
- }
-}
+ })
+})
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org