You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2023/01/08 09:04:34 UTC

[incubator-eventmesh] branch master updated: fix issue2855

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new f2d697cf4 fix issue2855
     new 23855e66e Merge pull request #2857 from jonyangx/issue2855
f2d697cf4 is described below

commit f2d697cf408f0a9101365af1a6180aa9878ddd14
Author: jonyangx <jo...@gmail.com>
AuthorDate: Sun Jan 8 16:07:35 2023 +0800

    fix issue2855
---
 eventmesh-sdk-go/common/id/id_snake_test.go |   4 +-
 eventmesh-sdk-go/grpc/client.go             |   9 +-
 eventmesh-sdk-go/grpc/client_test.go        | 905 +++++++++++++++-------------
 eventmesh-sdk-go/grpc/dispatcher_test.go    |  86 +--
 eventmesh-sdk-go/grpc/fake_grpcserver.go    |  12 +-
 eventmesh-sdk-go/grpc/grpc_suite_test.go    |  44 ++
 eventmesh-sdk-go/grpc/heartbeat_test.go     |  92 ++-
 7 files changed, 649 insertions(+), 503 deletions(-)

diff --git a/eventmesh-sdk-go/common/id/id_snake_test.go b/eventmesh-sdk-go/common/id/id_snake_test.go
index 22228e96d..54d69365f 100644
--- a/eventmesh-sdk-go/common/id/id_snake_test.go
+++ b/eventmesh-sdk-go/common/id/id_snake_test.go
@@ -70,12 +70,12 @@ var _ = Describe("id_snake test", func() {
 
 			flake := NewFlakeWithSonyflake(aSonyflake)
 
-			want := "over the time limit"
+			want := "test error"
 			var ret string
 			defer func() {
 				if err := recover(); err != nil {
 					ret = err.(error).Error()
-					Ω(ret).To(Equal(want))
+					Ω(want).To(Equal(ret))
 				}
 				mockPatches.Reset()
 			}()
diff --git a/eventmesh-sdk-go/grpc/client.go b/eventmesh-sdk-go/grpc/client.go
index 96abf56a4..bb8924d4e 100644
--- a/eventmesh-sdk-go/grpc/client.go
+++ b/eventmesh-sdk-go/grpc/client.go
@@ -174,7 +174,7 @@ func (e *eventMeshGRPCClient) setupContext(ctx context.Context) context.Context
 
 // Close meshclient and free all resources
 func (e *eventMeshGRPCClient) Close() error {
-	log.Infof("close grpc client")
+	log.Infof("begin close grpc client")
 	if e.cancel != nil {
 		e.cancel()
 	}
@@ -190,8 +190,11 @@ func (e *eventMeshGRPCClient) Close() error {
 		}
 		e.eventMeshConsumer = nil
 	}
-	if err := e.grpcConn.Close(); err != nil {
-		log.Warnf("err in close conn with err:%v", err)
+
+	if e.grpcConn != nil {
+		if err := e.grpcConn.Close(); err != nil {
+			log.Warnf("err in close conn with err:%v", err)
+		}
 	}
 
 	log.Infof("success close grpc client")
diff --git a/eventmesh-sdk-go/grpc/client_test.go b/eventmesh-sdk-go/grpc/client_test.go
index f3ebf3b8e..fa1c0b16c 100644
--- a/eventmesh-sdk-go/grpc/client_test.go
+++ b/eventmesh-sdk-go/grpc/client_test.go
@@ -19,446 +19,545 @@ import (
 	"context"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 	"github.com/stretchr/testify/assert"
 	"google.golang.org/grpc"
-	"testing"
 	"time"
 )
 
-func Test_newEventMeshGRPCClient(t *testing.T) {
-	type args struct {
-		cfg *conf.GRPCConfig
-	}
-	tests := []struct {
-		name    string
-		args    args
-		want    *eventMeshGRPCClient
-		wantErr bool
-	}{
-		{
-			name: "host is empty",
-			args: args{cfg: &conf.GRPCConfig{
-				Host: "",
-			}},
-			wantErr: true,
-			want:    nil,
-		},
-		{
-			name: "producer wrong",
-			args: args{cfg: &conf.GRPCConfig{
-				Host:           "1.1.1.1",
-				ProducerConfig: conf.ProducerConfig{},
-			}},
-			wantErr: true,
-			want:    nil,
-		},
-		{
-			name: "client with send msg",
-			args: args{cfg: &conf.GRPCConfig{
-				Host:         "101.43.84.47",
-				Port:         10205,
-				ENV:          "sendmsgenv",
-				Region:       "sh",
-				IDC:          "idc01",
-				SYS:          "test-system",
-				ProtocolType: "grpc",
-				ProducerConfig: conf.ProducerConfig{
-					ProducerGroup: "test-producer-group",
+type fakeidg struct {
+}
+
+func (f *fakeidg) Next() string {
+	return "fake"
+}
+
+var _ = Describe("client test", func() {
+
+	Context("newEventMeshGRPCClient() test ", func() {
+		type args struct {
+			cfg *conf.GRPCConfig
+		}
+
+		It("host is empty", func() {
+			tests := []struct {
+				name    string
+				args    args
+				want    *eventMeshGRPCClient
+				wantErr bool
+			}{
+				{
+					name: "host is empty",
+					args: args{cfg: &conf.GRPCConfig{
+						Host: "",
+					}},
+					wantErr: true,
+					want:    nil,
 				},
-				Username: "user",
-				Password: "passwd",
-			}},
-			want:    nil,
-			wantErr: false,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			cli, err := newEventMeshGRPCClient(tt.args.cfg)
-			if (err != nil) != tt.wantErr {
-				t.Errorf("newEventMeshGRPCClient() error = %v, wantErr %v", err, tt.wantErr)
-				return
 			}
-			if err == nil {
-				assert.NoError(t, cli.Close())
+
+			for _, tt := range tests {
+				cli, err := newEventMeshGRPCClient(tt.args.cfg)
+				if tt.wantErr {
+					Ω(err).To(HaveOccurred())
+				} else {
+					Ω(err).NotTo(HaveOccurred())
+				}
+
+				if cli != nil {
+					Ω(cli.Close()).NotTo(HaveOccurred())
+				}
 			}
 		})
-	}
-}
 
-func Test_multiple_set_context(t *testing.T) {
-	root := context.Background()
-	onec, cancel := context.WithTimeout(root, time.Second*5)
-	defer cancel()
-	valc := context.WithValue(onec, "test", "got")
+		It("producer wrong", func() {
+			tests := []struct {
+				name    string
+				args    args
+				want    *eventMeshGRPCClient
+				wantErr bool
+			}{
+				{
+					name: "producer wrong",
+					args: args{cfg: &conf.GRPCConfig{
+						Host:           "1.1.1.1",
+						ProducerConfig: conf.ProducerConfig{},
+					}},
+					wantErr: false,
+					want:    nil,
+				},
+			}
 
-	select {
-	case <-valc.Done():
-		val := valc.Value("test")
-		t.Logf("5 s reached, value in context:%v", val)
-	case <-time.After(time.Second * 10):
-		t.Logf("ooor, 10s timeout")
-	}
+			for _, tt := range tests {
+				cli, err := newEventMeshGRPCClient(tt.args.cfg)
+				if tt.wantErr {
+					Ω(err).To(HaveOccurred())
+				} else {
+					Ω(err).NotTo(HaveOccurred())
 
-}
+				}
 
-func Test_eventMeshGRPCClient_Publish(t *testing.T) {
-	// run fake server
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup: "test-publish-group",
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled: false,
-		},
-	})
-	assert.NoError(t, err, "create grpc client")
-	type args struct {
-		ctx  context.Context
-		msg  *proto.SimpleMessage
-		opts []grpc.CallOption
-	}
-	tests := []struct {
-		name    string
-		args    args
-		want    *proto.Response
-		wantErr assert.ErrorAssertionFunc
-	}{
-		{
-			name: "publish msg",
-			args: args{
-				ctx: context.TODO(),
-				msg: &proto.SimpleMessage{
-					Header: &proto.RequestHeader{},
-					Topic:  "test-publish-topic",
-				},
-			},
-			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
-				return true
-			},
-		},
-		{
-			name: "publish with timeout",
-			args: args{
-				ctx: func() context.Context {
-					ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
-					return ctx
-				}(),
-				msg: &proto.SimpleMessage{
-					Header: &proto.RequestHeader{},
-					Topic:  "test-timeout-topic",
+				if cli != nil {
+					Ω(cli.Close()).NotTo(HaveOccurred())
+				}
+			}
+		})
+
+		It("client with send msg", func() {
+			tests := []struct {
+				name    string
+				args    args
+				want    *eventMeshGRPCClient
+				wantErr bool
+			}{
+				{
+					name: "client with send msg",
+					args: args{cfg: &conf.GRPCConfig{
+						Host:         "101.43.84.47",
+						Port:         10205,
+						ENV:          "sendmsgenv",
+						Region:       "sh",
+						IDC:          "idc01",
+						SYS:          "test-system",
+						ProtocolType: "grpc",
+						ProducerConfig: conf.ProducerConfig{
+							ProducerGroup: "test-producer-group",
+						},
+						Username: "user",
+						Password: "passwd",
+					}},
+					want:    nil,
+					wantErr: false,
 				},
-			},
-			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
-				return true
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			got, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
-			assert.NoError(t, err)
-			t.Logf("receive publish response:%v", got.String())
-			assert.NoError(t, cli.Close())
+			}
+
+			for _, tt := range tests {
+				cli, err := newEventMeshGRPCClient(tt.args.cfg)
+				if tt.wantErr {
+					Ω(err).To(HaveOccurred())
+				} else {
+					Ω(err).NotTo(HaveOccurred())
+
+				}
+
+				if cli != nil {
+					Ω(cli.Close()).NotTo(HaveOccurred())
+				}
+			}
 		})
-	}
-}
 
-func Test_eventMeshGRPCClient_RequestReply(t *testing.T) {
-	// run fake server
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup: "test-publish-group",
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled: false,
-		},
 	})
-	assert.NoError(t, err, "create grpc client")
-	type args struct {
-		ctx  context.Context
-		msg  *proto.SimpleMessage
-		opts []grpc.CallOption
-	}
-	tests := []struct {
-		name    string
-		args    args
-		want    *proto.SimpleMessage
-		wantErr assert.ErrorAssertionFunc
-	}{
-		{
-			name: "test-request-reply",
-			args: args{
-				ctx: context.TODO(),
-				msg: &proto.SimpleMessage{
-					Header: &proto.RequestHeader{},
-					Topic:  "test-request-reply-topic",
-				},
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			got, err := cli.RequestReply(tt.args.ctx, tt.args.msg, tt.args.opts...)
-			assert.NoError(t, err)
-			t.Logf("receive request reply response:%v", got.String())
-			assert.NoError(t, cli.Close())
+
+	Context("multiple_set_context() test ", func() {
+		It("should done", func() {
+			root := context.Background()
+			onec, cancel := context.WithTimeout(root, time.Second*3)
+			defer cancel()
+			val := "test"
+			go func() {
+
+				select {
+				case <-onec.Done():
+					val = "test"
+				case <-time.After(time.Second * 1):
+					val = ""
+				}
+			}()
+
+			time.Sleep(2 * time.Second)
+			Ω(val).To(Equal(""))
+		})
+
+		It("should timeout", func() {
+			root := context.Background()
+			onec, cancel := context.WithTimeout(root, time.Second*1)
+			defer cancel()
+
+			val := "test"
+			go func() {
+
+				select {
+				case <-onec.Done():
+					val = "test"
+					break
+				case <-time.After(time.Second * 3):
+					val = ""
+				}
+			}()
+
+			time.Sleep(2 * time.Second)
+			Ω(val).To(Equal("test"))
 		})
-	}
-}
 
-func Test_eventMeshGRPCClient_BatchPublish(t *testing.T) {
-	// run fake server
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup: "test-publish-group",
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled: false,
-		},
 	})
-	assert.NoError(t, err, "create grpc client")
-	type args struct {
-		ctx  context.Context
-		msg  *proto.BatchMessage
-		opts []grpc.CallOption
-	}
-	tests := []struct {
-		name    string
-		args    args
-		want    *proto.Response
-		wantErr assert.ErrorAssertionFunc
-	}{
-		{
-			name: "test batch publish",
-			args: args{
-				ctx: context.TODO(),
-				msg: &proto.BatchMessage{
-					Header:        &proto.RequestHeader{},
-					ProducerGroup: "fake-batch-group",
-					Topic:         "fake-batch-topic",
-					MessageItem: []*proto.BatchMessage_MessageItem{
-						{
-							Content:  "batch-1",
-							Ttl:      "1",
-							UniqueId: "batch-id",
-							SeqNum:   "1",
-							Tag:      "tag",
-							Properties: map[string]string{
-								"from": "test",
-								"type": "batch-msg",
-							},
+
+	Context("eventMeshGRPCClient_Publish test ", func() {
+
+		type args struct {
+			ctx  context.Context
+			msg  *proto.SimpleMessage
+			opts []grpc.CallOption
+		}
+
+		It("publish msg", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled: false,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+
+			tests := []struct {
+				name    string
+				args    args
+				want    *proto.Response
+				wantErr assert.ErrorAssertionFunc
+			}{
+				{
+					name: "publish msg",
+					args: args{
+						ctx: context.TODO(),
+						msg: &proto.SimpleMessage{
+							Header: &proto.RequestHeader{},
+							Topic:  "test-publish-topic",
 						},
-						{
-							Content:  "batch-2",
-							Ttl:      "2",
-							UniqueId: "batch-id",
-							SeqNum:   "2",
-							Tag:      "tag",
-							Properties: map[string]string{
-								"from": "test",
-								"type": "batch-msg",
-							},
+					},
+					wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+						return true
+					},
+				},
+			}
+
+			for _, tt := range tests {
+				_, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
+				Ω(err).NotTo(HaveOccurred())
+			}
+
+			Ω(cli.Close()).NotTo(HaveOccurred())
+
+		})
+
+		It("publish with timeout", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled: false,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+			tests := []struct {
+				name    string
+				args    args
+				want    *proto.Response
+				wantErr assert.ErrorAssertionFunc
+			}{
+				{
+					name: "publish with timeout",
+					args: args{
+						ctx: func() context.Context {
+							ctx, _ := context.WithTimeout(context.Background(), time.Second*10)
+							return ctx
+						}(),
+						msg: &proto.SimpleMessage{
+							Header: &proto.RequestHeader{},
+							Topic:  "test-timeout-topic",
 						},
 					},
+					wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+						return true
+					},
 				},
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			got, err := cli.BatchPublish(tt.args.ctx, tt.args.msg, tt.args.opts...)
-			assert.NoError(t, err)
-			t.Logf("receive request reply response:%v", got.String())
-			assert.NoError(t, cli.Close())
+			}
+
+			for _, tt := range tests {
+				_, err := cli.Publish(tt.args.ctx, tt.args.msg, tt.args.opts...)
+				Ω(err).NotTo(HaveOccurred())
+			}
+
+			Ω(cli.Close()).NotTo(HaveOccurred())
+
 		})
-	}
-}
 
-func Test_eventMeshGRPCClient_webhook_subscribe(t *testing.T) {
-	ctx, cancel := context.WithTimeout(context.Background(), time.Second*3)
-	defer cancel()
-	go runWebhookServer(ctx)
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup: "test-publish-group",
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled:       true,
-			ConsumerGroup: "test-consumer-group-subscribe",
-			PoolSize:      5,
-		},
-		HeartbeatConfig: conf.HeartbeatConfig{
-			Period:  time.Second * 5,
-			Timeout: time.Second * 3,
-		},
 	})
-	assert.NoError(t, err, "create grpc client")
-	assert.NoError(t, cli.SubscribeWebhook(conf.SubscribeItem{
-		SubscribeMode: 1,
-		SubscribeType: 1,
-		Topic:         "topic-1",
-	}, "http://localhost:8080/onmessage"))
-	time.Sleep(time.Second * 5)
-}
 
-func Test_eventMeshGRPCClient_Subscribe(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup: "test-publish-group",
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled:       true,
-			ConsumerGroup: "test-consumer-group-subscribe",
-			PoolSize:      5,
-		},
-		HeartbeatConfig: conf.HeartbeatConfig{
-			Period:  time.Second * 5,
-			Timeout: time.Second * 3,
-		},
+	Context("eventMeshGRPCClient_RequestReply test ", func() {
+		type args struct {
+			ctx  context.Context
+			msg  *proto.SimpleMessage
+			opts []grpc.CallOption
+		}
+
+		It("test-request-reply", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled: false,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+
+			tests := []struct {
+				name    string
+				args    args
+				want    *proto.SimpleMessage
+				wantErr assert.ErrorAssertionFunc
+			}{
+				{
+					name: "test-request-reply",
+					args: args{
+						ctx: context.TODO(),
+						msg: &proto.SimpleMessage{
+							Header: &proto.RequestHeader{},
+							Topic:  "test-request-reply-topic",
+						},
+					},
+				},
+			}
+			for _, tt := range tests {
+				_, err := cli.RequestReply(tt.args.ctx, tt.args.msg, tt.args.opts...)
+				Ω(err).NotTo(HaveOccurred())
+			}
+			Ω(cli.Close()).NotTo(HaveOccurred())
+		})
 	})
-	assert.NoError(t, err, "create grpc client")
-	type args struct {
-		item    conf.SubscribeItem
-		handler OnMessage
-	}
-	tests := []struct {
-		name    string
-		args    args
-		wantErr assert.ErrorAssertionFunc
-	}{
-		{
-			name: "subcribe one",
-			args: args{
-				item: conf.SubscribeItem{
-					SubscribeMode: 1,
-					SubscribeType: 1,
-					Topic:         "topic-1",
+
+	Context("eventMeshGRPCClient_RequestReply test ", func() {
+		type args struct {
+			ctx  context.Context
+			msg  *proto.BatchMessage
+			opts []grpc.CallOption
+		}
+
+		It("test batch publish", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled: false,
 				},
-				handler: func(message *proto.SimpleMessage) interface{} {
-					t.Logf("receive subscribe response:%s", message.String())
-					return nil
+			})
+			Ω(err).NotTo(HaveOccurred())
+
+			tests := []struct {
+				name    string
+				args    args
+				want    *proto.Response
+				wantErr assert.ErrorAssertionFunc
+			}{
+				{
+					name: "test batch publish",
+					args: args{
+						ctx: context.TODO(),
+						msg: &proto.BatchMessage{
+							Header:        &proto.RequestHeader{},
+							ProducerGroup: "fake-batch-group",
+							Topic:         "fake-batch-topic",
+							MessageItem: []*proto.BatchMessage_MessageItem{
+								{
+									Content:  "batch-1",
+									Ttl:      "1",
+									UniqueId: "batch-id",
+									SeqNum:   "1",
+									Tag:      "tag",
+									Properties: map[string]string{
+										"from": "test",
+										"type": "batch-msg",
+									},
+								},
+								{
+									Content:  "batch-2",
+									Ttl:      "2",
+									UniqueId: "batch-id",
+									SeqNum:   "2",
+									Tag:      "tag",
+									Properties: map[string]string{
+										"from": "test",
+										"type": "batch-msg",
+									},
+								},
+							},
+						},
+					},
 				},
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			err := cli.SubscribeStream(tt.args.item, tt.args.handler)
-			assert.NoError(t, err)
-			assert.NoError(t, cli.Close())
+			}
+
+			for _, tt := range tests {
+				_, err := cli.BatchPublish(tt.args.ctx, tt.args.msg, tt.args.opts...)
+				Ω(err).NotTo(HaveOccurred())
+
+			}
+			Ω(cli.Close()).NotTo(HaveOccurred())
 		})
-	}
-}
+	})
 
-func Test_eventMeshGRPCClient_UnSubscribe(t *testing.T) {
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup: "test-publish-group",
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled:       true,
-			ConsumerGroup: "test-consumer-group-subscribe",
-			PoolSize:      5,
-		},
-		HeartbeatConfig: conf.HeartbeatConfig{
-			Period:  time.Second * 5,
-			Timeout: time.Second * 3,
-		},
+	Context("eventMeshGRPCClient_webhook_subscribe test ", func() {
+		It("webhook_subscribe success", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled:       true,
+					ConsumerGroup: "test-consumer-group-subscribe",
+					PoolSize:      5,
+				},
+				HeartbeatConfig: conf.HeartbeatConfig{
+					Period:  time.Second * 5,
+					Timeout: time.Second * 3,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+			Ω(cli.SubscribeWebhook(conf.SubscribeItem{
+				SubscribeMode: 1,
+				SubscribeType: 1,
+				Topic:         "topic-1",
+			}, "http://localhost:8080/onmessage")).NotTo(HaveOccurred())
+			time.Sleep(time.Second * 5)
+			Ω(cli.Close()).NotTo(HaveOccurred())
+		})
 	})
-	assert.NoError(t, err, "create grpc client")
-	err = cli.SubscribeStream(conf.SubscribeItem{
-		SubscribeMode: 1,
-		SubscribeType: 1,
-		Topic:         "topic-1",
-	}, func(message *proto.SimpleMessage) interface{} {
-		t.Logf("receive subscribe response:%s", message.String())
-		return nil
+
+	Context("eventMeshGRPCClient_Subscribe test ", func() {
+
+		type args struct {
+			item    conf.SubscribeItem
+			handler OnMessage
+		}
+
+		It("subcribe one success", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled:       true,
+					ConsumerGroup: "test-consumer-group-subscribe",
+					PoolSize:      5,
+				},
+				HeartbeatConfig: conf.HeartbeatConfig{
+					Period:  time.Second * 5,
+					Timeout: time.Second * 3,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+
+			tests := []struct {
+				name    string
+				args    args
+				wantErr assert.ErrorAssertionFunc
+			}{
+				{
+					name: "subcribe one",
+					args: args{
+						item: conf.SubscribeItem{
+							SubscribeMode: 1,
+							SubscribeType: 1,
+							Topic:         "topic-1",
+						},
+						handler: func(message *proto.SimpleMessage) interface{} {
+							return nil
+						},
+					},
+				},
+			}
+			for _, tt := range tests {
+				err := cli.SubscribeStream(tt.args.item, tt.args.handler)
+				Ω(err).NotTo(HaveOccurred())
+			}
+			Ω(cli.Close()).NotTo(HaveOccurred())
+		})
 	})
-	assert.NoError(t, err, "subscribe err")
-	tests := []struct {
-		name    string
-		wantErr assert.ErrorAssertionFunc
-	}{
-		{
-			name: "unsubcribe",
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			assert.NoError(t, cli.UnSubscribe())
-			assert.NoError(t, cli.Close())
+
+	Context("eventMeshGRPCClient_UnSubscribe test ", func() {
+		It("unsubcribe success", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup: "test-publish-group",
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled:       true,
+					ConsumerGroup: "test-consumer-group-subscribe",
+					PoolSize:      5,
+				},
+				HeartbeatConfig: conf.HeartbeatConfig{
+					Period:  time.Second * 5,
+					Timeout: time.Second * 3,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+
+			err = cli.SubscribeStream(conf.SubscribeItem{
+				SubscribeMode: 1,
+				SubscribeType: 1,
+				Topic:         "topic-1",
+			}, func(message *proto.SimpleMessage) interface{} {
+				return nil
+			})
+			Ω(err).NotTo(HaveOccurred())
+			Ω(cli.UnSubscribe()).NotTo(HaveOccurred())
+			Ω(cli.Close()).NotTo(HaveOccurred())
 		})
-	}
-}
+	})
 
-type fakeidg struct {
-}
+	Context("eventMeshGRPCClient_UnSubscribe test ", func() {
+		type args struct {
+			ctx context.Context
+		}
+		It("unsubcribe success", func() {
+			cli := &eventMeshGRPCClient{
+				idg: &fakeidg{},
+			}
 
-func (f *fakeidg) Next() string {
-	return "fake"
-}
+			tests := []struct {
+				name string
+				args args
+				want string
+			}{
+				{
+					name: "setup with uid",
+					args: args{
+						ctx: context.WithValue(context.Background(), GRPC_ID_KEY, "value"),
+					},
+					want: "value",
+				},
+				{
+					name: "setup without uid",
+					args: args{
+						ctx: context.TODO(),
+					},
+					want: "fake",
+				},
+			}
 
-func Test_eventMeshGRPCClient_setupContext(t *testing.T) {
-	type args struct {
-		ctx context.Context
-	}
-
-	cli := &eventMeshGRPCClient{
-		idg: &fakeidg{},
-	}
-	tests := []struct {
-		name string
-		args args
-		want string
-	}{
-		{
-			name: "setup with uid",
-			args: args{
-				ctx: context.WithValue(context.Background(), GRPC_ID_KEY, "value"),
-			},
-			want: "value",
-		},
-		{
-			name: "setup without uid",
-			args: args{
-				ctx: context.TODO(),
-			},
-			want: "fake",
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			ctx := cli.setupContext(tt.args.ctx)
-			assert.Equal(t, ctx.Value(GRPC_ID_KEY).(string), tt.want)
+			for _, tt := range tests {
+				ctx := cli.setupContext(tt.args.ctx)
+				Ω(ctx.Value(GRPC_ID_KEY).(string)).To(Equal(tt.want))
+			}
+
+			Ω(cli.Close()).NotTo(HaveOccurred())
 		})
-	}
-}
+	})
+})
diff --git a/eventmesh-sdk-go/grpc/dispatcher_test.go b/eventmesh-sdk-go/grpc/dispatcher_test.go
index 49392057f..6362d3edf 100644
--- a/eventmesh-sdk-go/grpc/dispatcher_test.go
+++ b/eventmesh-sdk-go/grpc/dispatcher_test.go
@@ -16,50 +16,58 @@
 package grpc
 
 import (
-	"fmt"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 	"github.com/stretchr/testify/assert"
 	"sync"
-	"testing"
 )
 
-func Test_messageDispatcher_addHandler(t *testing.T) {
-	type fields struct {
-		topicMap *sync.Map
-		poolsize int
-	}
-	type args struct {
-		topic string
-		hdl   OnMessage
-	}
-	tests := []struct {
-		name    string
-		fields  fields
-		args    args
-		wantErr assert.ErrorAssertionFunc
-	}{
-		{
-			name: "test add handler",
-			fields: fields{
-				topicMap: new(sync.Map),
-				poolsize: 5,
-			},
-			args: args{
-				topic: "handler-1",
-				hdl: func(message *proto.SimpleMessage) interface{} {
-					t.Logf("handle message")
-					return nil
+var _ = Describe("dispatcher test", func() {
+
+	Context("messageDispatcher_addHandler test ", func() {
+		type fields struct {
+			topicMap *sync.Map
+			poolsize int
+		}
+
+		type args struct {
+			topic string
+			hdl   OnMessage
+		}
+		It("test add handler", func() {
+			tests := []struct {
+				name    string
+				fields  fields
+				args    args
+				wantErr assert.ErrorAssertionFunc
+			}{
+				{
+					name: "test add handler",
+					fields: fields{
+						topicMap: new(sync.Map),
+						poolsize: 5,
+					},
+					args: args{
+						topic: "handler-1",
+						hdl: func(message *proto.SimpleMessage) interface{} {
+							return nil
+						},
+					},
+					wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+						return true
+					},
 				},
-			},
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			m := &messageDispatcher{
-				topicMap: tt.fields.topicMap,
-				poolsize: tt.fields.poolsize,
 			}
-			tt.wantErr(t, m.addHandler(tt.args.topic, tt.args.hdl), fmt.Sprintf("addHandler(%v, %v)", tt.args.topic, tt.args.hdl))
+
+			for _, tt := range tests {
+				m := &messageDispatcher{
+					topicMap: tt.fields.topicMap,
+					poolsize: tt.fields.poolsize,
+				}
+				Ω(m.addHandler(tt.args.topic, tt.args.hdl)).To(BeNil())
+			}
 		})
-	}
-}
+
+	})
+})
diff --git a/eventmesh-sdk-go/grpc/fake_grpcserver.go b/eventmesh-sdk-go/grpc/fake_grpcserver.go
index 969dc9f54..0cbf0ce6a 100644
--- a/eventmesh-sdk-go/grpc/fake_grpcserver.go
+++ b/eventmesh-sdk-go/grpc/fake_grpcserver.go
@@ -19,6 +19,7 @@ import (
 	"context"
 	"fmt"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/seq"
+	"google.golang.org/grpc"
 	"io"
 	"io/ioutil"
 	"net"
@@ -30,8 +31,6 @@ import (
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/common/id"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/log"
-
-	"google.golang.org/grpc"
 )
 
 // fakeServer used to do the test
@@ -62,7 +61,9 @@ func runFakeServer(ctx context.Context) error {
 	go func() {
 		select {
 		case <-ctx.Done():
-			srv.GracefulStop()
+			if srv != nil {
+				srv.GracefulStop()
+			}
 		}
 	}()
 	log.Infof("serve fake server on:%v", srv.GetServiceInfo())
@@ -98,6 +99,7 @@ func (f *fakeServer) Subscribe(ctx context.Context, msg *proto.Subscription) (*p
 func (f *fakeServer) SubscribeStream(srv proto.ConsumerService_SubscribeStreamServer) error {
 	wg := new(sync.WaitGroup)
 	wg.Add(2)
+
 	go func() {
 		defer wg.Done()
 		for {
@@ -113,6 +115,7 @@ func (f *fakeServer) SubscribeStream(srv proto.ConsumerService_SubscribeStreamSe
 			log.Infof("rece sub:%s", sub.String())
 		}
 	}()
+
 	go func() {
 		defer func() {
 			wg.Done()
@@ -150,6 +153,7 @@ func (f *fakeServer) SubscribeStream(srv proto.ConsumerService_SubscribeStreamSe
 			time.Sleep(time.Second * 5)
 		}
 	}()
+
 	wg.Wait()
 	log.Infof("close SubscribeStream")
 	return nil
@@ -167,7 +171,7 @@ func (f *fakeServer) Unsubscribe(ctx context.Context, msg *proto.Subscription) (
 func (f *fakeServer) Heartbeat(ctx context.Context, msg *proto.Heartbeat) (*proto.Response, error) {
 	log.Infof("fake-server, receive heartbeat request:%v", msg.String())
 	return &proto.Response{
-		RespCode: "OK",
+		RespCode: "0",
 		RespMsg:  "OK",
 		RespTime: time.Now().Format("2006-01-02 15:04:05"),
 	}, nil
diff --git a/eventmesh-sdk-go/grpc/grpc_suite_test.go b/eventmesh-sdk-go/grpc/grpc_suite_test.go
new file mode 100644
index 000000000..4689619b4
--- /dev/null
+++ b/eventmesh-sdk-go/grpc/grpc_suite_test.go
@@ -0,0 +1,44 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package grpc
+
+import (
+	"context"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
+	"testing"
+)
+
+func TestGRPC(t *testing.T) {
+	RegisterFailHandler(Fail)
+	RunSpecs(t, "grpc module Tests")
+}
+
+var gcancel context.CancelFunc
+
+var _ = BeforeSuite(func() {
+	ctx, cancel := context.WithCancel(context.Background())
+	//defer cancel()
+	gcancel = cancel
+	go runFakeServer(ctx)
+	go runWebhookServer(ctx)
+})
+
+var _ = AfterSuite(func() {
+	gcancel()
+})
diff --git a/eventmesh-sdk-go/grpc/heartbeat_test.go b/eventmesh-sdk-go/grpc/heartbeat_test.go
index 45f6ed161..8a6de3371 100644
--- a/eventmesh-sdk-go/grpc/heartbeat_test.go
+++ b/eventmesh-sdk-go/grpc/heartbeat_test.go
@@ -16,61 +16,49 @@
 package grpc
 
 import (
-	"context"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/conf"
 	"github.com/apache/incubator-eventmesh/eventmesh-sdk-go/grpc/proto"
-	"github.com/stretchr/testify/assert"
-	"testing"
+	. "github.com/onsi/ginkgo"
+	. "github.com/onsi/gomega"
 	"time"
 )
 
-func Test_eventMeshHeartbeat_sendMsg(t *testing.T) {
-	// run fake server
-	ctx, cancel := context.WithCancel(context.Background())
-	defer cancel()
-	go runFakeServer(ctx)
-	cli, err := New(&conf.GRPCConfig{
-		Host: "127.0.0.1",
-		Port: 8086,
-		ProducerConfig: conf.ProducerConfig{
-			ProducerGroup:    "test-publish-group",
-			LoadBalancerType: conf.Random,
-		},
-		ConsumerConfig: conf.ConsumerConfig{
-			Enabled:       true,
-			ConsumerGroup: "fake-consumer",
-			PoolSize:      5,
-		},
-		HeartbeatConfig: conf.HeartbeatConfig{
-			Period:  time.Second * 5,
-			Timeout: time.Second * 3,
-		},
-	})
-	topic := "fake-topic"
-	assert.NoError(t, cli.SubscribeStream(conf.SubscribeItem{
-		SubscribeType: 1,
-		SubscribeMode: 1,
-		Topic:         topic,
-	}, func(message *proto.SimpleMessage) interface{} {
-		t.Logf("receive sub msg:%v", message.String())
-		return nil
-	}))
-	rcli := cli.(*eventMeshGRPCClient)
-	beat := rcli.heartbeat
-	assert.NoError(t, err, "create grpc client")
-	defer assert.NoError(t, cli.Close())
-	tests := []struct {
-		name string
-		want error
-	}{
-		{
-			want: nil,
-		},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			err := beat.sendMsg(beat.client)
-			assert.NoError(t, err)
+var _ = Describe("dispatcher test", func() {
+
+	Context("messageDispatcher_addHandler test ", func() {
+		It("fake-topic", func() {
+			cli, err := New(&conf.GRPCConfig{
+				Host: "127.0.0.1",
+				Port: 8086,
+				ProducerConfig: conf.ProducerConfig{
+					ProducerGroup:    "test-publish-group",
+					LoadBalancerType: conf.Random,
+				},
+				ConsumerConfig: conf.ConsumerConfig{
+					Enabled:       true,
+					ConsumerGroup: "fake-consumer",
+					PoolSize:      5,
+				},
+				HeartbeatConfig: conf.HeartbeatConfig{
+					Period:  time.Second * 50,
+					Timeout: time.Second * 30,
+				},
+			})
+			Ω(err).NotTo(HaveOccurred())
+
+			topic := "fake-topic"
+			Ω(cli.SubscribeStream(conf.SubscribeItem{
+				SubscribeType: 1,
+				SubscribeMode: 1,
+				Topic:         topic,
+			}, func(message *proto.SimpleMessage) interface{} {
+				return nil
+			})).NotTo(HaveOccurred())
+
+			rcli := cli.(*eventMeshGRPCClient)
+			beat := rcli.heartbeat
+			Ω(beat.sendMsg(beat.client)).NotTo(HaveOccurred())
+			Ω(cli.Close()).NotTo(HaveOccurred())
 		})
-	}
-}
+	})
+})


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org