You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by wa...@apache.org on 2022/09/19 14:56:21 UTC

[incubator-eventmesh] branch eventmesh-server-go updated: add consumer implement

This is an automated email from the ASF dual-hosted git repository.

walleliu pushed a commit to branch eventmesh-server-go
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/eventmesh-server-go by this push:
     new 194ba8e6 add consumer implement
     new 5283d837 Merge pull request #1336 from walleliu1016/eventmesh-server-go-runtime
194ba8e6 is described below

commit 194ba8e6e57bd259c6200a66ec82cd57b3bdf880
Author: walleliu1016 <li...@163.com>
AuthorDate: Mon Sep 19 22:55:10 2022 +0800

    add consumer implement
---
 eventmesh-server-go/config/grpc.go                 |  27 +++
 eventmesh-server-go/go.mod                         |   3 +-
 eventmesh-server-go/go.sum                         |   2 +
 .../{config/grpc.go => pkg/util/id.go}             |  30 ++-
 eventmesh-server-go/pkg/util/id_test.go            |  94 +++++++++
 .../{config/grpc.go => pkg/util/pid.go}            |  23 +-
 .../pkg/util/{until.go => pid_test.go}             |  50 ++---
 .../{config/grpc.go => pkg/version/version.go}     |  17 +-
 .../core/protocol/grpc/config/consumer_group.go    |  80 +++++++
 .../core/protocol/grpc/config/producer_group.go}   |  13 +-
 .../core/protocol/grpc/consumer/consumer.go        | 233 +++++++++++++++++++++
 .../core/protocol/grpc/consumer/group_client.go}   |  32 ++-
 .../core/protocol/grpc/consumer/group_option.go    | 125 +++++++++++
 .../runtime/core/protocol/grpc/consumer/manager.go | 202 ++++++++++++++++++
 .../core/protocol/grpc/push/context.go}            |  27 ++-
 .../runtime/core/protocol/grpc/push/handler.go     |  88 ++++++++
 .../core/protocol/grpc/retry/context.go}           |  24 ++-
 .../runtime/core/wrapper/producer.go               |  90 ++++++++
 .../runtime/core/wrapper/producer_test.go          |  79 +++++++
 19 files changed, 1126 insertions(+), 113 deletions(-)

diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/config/grpc.go
index 39abcef8..adbeb33f 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/config/grpc.go
@@ -15,6 +15,8 @@
 
 package config
 
+import "time"
+
 // GRPCOption configuratin for grpc server
 type GRPCOption struct {
 	Port string `yaml:"port" toml:"port"`
@@ -26,4 +28,29 @@ type GRPCOption struct {
 	// will start on given port, and you can check
 	// on http://ip:port/pprof/debug
 	*PProfOption `yaml:"pprof" toml:"pprof"`
+
+	// SubscribePoolSize pool in handle subscribe msg
+	// default to 10
+	SubscribePoolSize int `yaml:"subscribe-pool-size" toml:"subscribe-pool-size"`
+	// RetryPoolSize pool in handle retry msg
+	// default to 10
+	RetryPoolSize int `yaml:"retry-pool-size" toml:"retry-pool-size"`
+	// PushMessagePoolSize pool to push message
+	// default to 10
+	PushMessagePoolSize int `yaml:"push-message-pool-size" toml:"push-message-pool-size"`
+
+	//MsgReqNumPerSecond
+	MsgReqNumPerSecond float64 `yaml:"msg-req-num-per-second" toml:"msg-req-num-per-second"`
+
+	// RegistryName name for registry plugin support nacos or etcd
+	RegistryName string `yaml:"registry-name" toml:"registry-name"`
+
+	// Cluster cluster for grpc server
+	Cluster string `yaml:"cluster" toml:"cluster"`
+
+	// IDC idc for grpc server
+	IDC string `yaml:"idc" toml:"idc"`
+
+	// SessionExpiredInMills internal to clean the not work session consumer
+	SessionExpiredInMills time.Duration `yaml:"session-expired-in-mills"`
 }
diff --git a/eventmesh-server-go/go.mod b/eventmesh-server-go/go.mod
index 5a40104f..e3e34066 100644
--- a/eventmesh-server-go/go.mod
+++ b/eventmesh-server-go/go.mod
@@ -22,6 +22,7 @@ require (
 	github.com/gogf/gf v1.16.9
 	github.com/hashicorp/go-multierror v1.1.1
 	github.com/lestrrat-go/strftime v1.0.6
+	github.com/liyue201/gostl v1.0.1
 	github.com/nacos-group/nacos-sdk-go/v2 v2.0.3
 	github.com/pkg/errors v0.9.1
 	github.com/spf13/cobra v1.5.0
@@ -87,7 +88,7 @@ require (
 	github.com/deckarep/golang-set/v2 v2.1.0
 	github.com/gin-contrib/pprof v1.4.0
 	github.com/gin-gonic/gin v1.8.1
-	github.com/golang/mock v1.6.0
+	github.com/golang/mock v1.6.0 // indirect
 	github.com/google/uuid v1.1.2
 	github.com/stretchr/testify v1.8.0
 	github.com/unrolled/secure v1.12.0
diff --git a/eventmesh-server-go/go.sum b/eventmesh-server-go/go.sum
index 9faec8ee..7eda5f12 100644
--- a/eventmesh-server-go/go.sum
+++ b/eventmesh-server-go/go.sum
@@ -154,6 +154,8 @@ github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc h1:RKf14vYWi2t
 github.com/lestrrat-go/envload v0.0.0-20180220234015-a3eb8ddeffcc/go.mod h1:kopuH9ugFRkIXf3YoqHKyrJ9YfUFsckUU9S7B+XP+is=
 github.com/lestrrat-go/strftime v1.0.6 h1:CFGsDEt1pOpFNU+TJB0nhz9jl+K0hZSLE205AhTIGQQ=
 github.com/lestrrat-go/strftime v1.0.6/go.mod h1:f7jQKgV5nnJpYgdEasS+/y7EsTb8ykN2z68n3TtcTaw=
+github.com/liyue201/gostl v1.0.1 h1:VQdvogZ90WpCb5WdG9UxS6r5ulnYEp8VfEMEZXVtpIs=
+github.com/liyue201/gostl v1.0.1/go.mod h1:rgK+T1a0sQ1+CsAonfuD1J8C4iuGfOU9VAt9mmR/m10=
 github.com/mattn/go-colorable v0.1.8 h1:c1ghPdyEDarC70ftn0y+A/Ee++9zz8ljHG1b13eJ0s8=
 github.com/mattn/go-colorable v0.1.8/go.mod h1:u6P/XSegPjTcexA+o6vUJrdnUu04hMope9wVRipJSqc=
 github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU=
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/pkg/util/id.go
similarity index 57%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/pkg/util/id.go
index 39abcef8..5a3833bf 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/pkg/util/id.go
@@ -13,17 +13,27 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package config
+package util
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
+import (
+	"fmt"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/version"
+	"strings"
+)
 
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
+func BuildMeshTcpClientID(sys, purpose, cluster string) string {
+	return fmt.Sprintf("%v-%v-%v-%v-%v",
+		strings.TrimSpace(sys),
+		strings.TrimSpace(purpose),
+		strings.TrimSpace(cluster),
+		strings.TrimSpace(version.Current),
+		strings.TrimSpace(PID()))
+}
 
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
+func BuildMeshClientID(group, cluster string) string {
+	return fmt.Sprintf("%v-(%v)-%v-%v",
+		strings.TrimSpace(group),
+		strings.TrimSpace(cluster),
+		strings.TrimSpace(version.Current),
+		strings.TrimSpace(PID()))
 }
diff --git a/eventmesh-server-go/pkg/util/id_test.go b/eventmesh-server-go/pkg/util/id_test.go
new file mode 100644
index 00000000..61d15a6a
--- /dev/null
+++ b/eventmesh-server-go/pkg/util/id_test.go
@@ -0,0 +1,94 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package util
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/version"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestBuildMeshClientID(t *testing.T) {
+	type args struct {
+		group   string
+		cluster string
+	}
+	tests := []struct {
+		name string
+		args args
+		want string
+	}{
+		{
+			name: "build client id",
+			args: args{
+				group:   "test-group",
+				cluster: "idc",
+			},
+			want: "test-group-(idc)-" + version.Current + "-" + PID(),
+		},
+		{
+			name: "build client id with upper",
+			args: args{
+				group:   "Test-group",
+				cluster: "IDC",
+			},
+			want: "Test-group-(IDC)-" + version.Current + "-" + PID(),
+		},
+		{
+			name: "build client id wit space",
+			args: args{
+				group:   " Test-group   ",
+				cluster: "idc ",
+			},
+			want: "Test-group-(idc)-" + version.Current + "-" + PID(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			if got := BuildMeshClientID(tt.args.group, tt.args.cluster); got != tt.want {
+				t.Errorf("BuildMeshClientID() = %v, want %v", got, tt.want)
+			}
+		})
+	}
+}
+
+func TestBuildMeshTcpClientID(t *testing.T) {
+	type args struct {
+		sys     string
+		purpose string
+		cluster string
+	}
+	tests := []struct {
+		name string
+		args args
+		want string
+	}{
+		{
+			name: "uniq  mesh tcp client id",
+			args: args{
+				sys:     "1234",
+				purpose: "test",
+				cluster: "c1",
+			},
+			want: "1234-test-c1-" + version.Current + "-" + PID(),
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			assert.Equalf(t, tt.want, BuildMeshTcpClientID(tt.args.sys, tt.args.purpose, tt.args.cluster), "BuildMeshTcpClientID(%v, %v, %v)", tt.args.sys, tt.args.purpose, tt.args.cluster)
+		})
+	}
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/pkg/util/pid.go
similarity index 68%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/pkg/util/pid.go
index 39abcef8..97208d9f 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/pkg/util/pid.go
@@ -13,17 +13,20 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package config
+package util
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
+import (
+	"fmt"
+	"os"
+)
 
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
+var _pid string
 
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
+func init() {
+	_pid = fmt.Sprintf("%v", os.Getpid())
+}
+
+// PID return the current process id
+func PID() string {
+	return _pid
 }
diff --git a/eventmesh-server-go/pkg/util/until.go b/eventmesh-server-go/pkg/util/pid_test.go
similarity index 52%
rename from eventmesh-server-go/pkg/util/until.go
rename to eventmesh-server-go/pkg/util/pid_test.go
index 7669be21..42483d96 100644
--- a/eventmesh-server-go/pkg/util/until.go
+++ b/eventmesh-server-go/pkg/util/pid_test.go
@@ -16,44 +16,22 @@
 package util
 
 import (
-	"context"
-	"time"
+	"github.com/stretchr/testify/assert"
+	"testing"
 )
 
-// Condition the condition to exit the loop
-type Condition func() bool
-
-// defaultInternal internal to check the condition
-var defaultInternal = time.Second
-
-// Until wait for the condition is true
-type Until struct {
-	Cond     Condition
-	internal time.Duration
-}
-
-// NewUntil create a new until instance
-func NewUntil(cond Condition, internal time.Duration) *Until {
-	if internal == time.Duration(0) {
-		internal = defaultInternal
+func TestPID(t *testing.T) {
+	tests := []struct {
+		name string
+	}{
+		{
+			name: "pid",
+		},
 	}
-	return &Until{
-		Cond:     cond,
-		internal: internal,
-	}
-}
-
-// Wait loop until the condition is true
-func (u *Until) Wait(ctx context.Context) {
-	tick := time.NewTicker(u.internal)
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		case <-tick.C:
-			if u.Cond() {
-				return
-			}
-		}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			got := PID()
+			assert.NotEmpty(t, got)
+		})
 	}
 }
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/pkg/version/version.go
similarity index 67%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/pkg/version/version.go
index 39abcef8..2a7828fd 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/pkg/version/version.go
@@ -13,17 +13,8 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package config
+package version
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
-
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
-
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
-}
+var (
+	Current = "v0.0.1"
+)
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/config/consumer_group.go b/eventmesh-server-go/runtime/core/protocol/grpc/config/consumer_group.go
new file mode 100644
index 00000000..10218232
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/config/consumer_group.go
@@ -0,0 +1,80 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package config
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	"github.com/liyue201/gostl/ds/set"
+	"sync"
+)
+
+type ConsumerGroupConfig struct {
+	ConsumerGroup string
+
+	// key is topic, value is  ConsumerGroupTopicConfig
+	ConsumerGroupTopicConfigs sync.Map[ConsumerGroupTopicConfig]
+}
+
+type GRPCType string
+
+const (
+	WEBHOOK GRPCType = "WEBHOOK"
+	STREAM  GRPCType = "STREAM"
+)
+
+type StateAction string
+
+const (
+	NEW    StateAction = "NEW"
+	CHANGE StateAction = "CHANGE"
+	DELETE StateAction = "DELETE"
+)
+
+type ConsumerGroupTopicConfig struct {
+	ConsumerGroup    string
+	Topic            string
+	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+	GRPCType         GRPCType
+	// IDCWebhookURLs webhook urls seperated by IDC
+	// key is IDC, value is vector.Vector
+	IDCWebhookURLs *sync.Map
+
+	// AllURLs all webhook urls, ignore idc
+	AllURLs *set.Set
+}
+
+type ConsumerGroupMetadata struct {
+	ConsumerGroup              string
+	ConsumerGroupTopicMetadata *sync.Map
+}
+
+type ConsumerGroupTopicMetadata struct {
+	ConsumerGroup string
+	Topic         string
+	AllURLs       *set.Set
+}
+
+type ConsumerGroupStateEvent struct {
+	ConsumerGroup            string
+	ConsumerGroupConfig      *ConsumerGroupConfig
+	ConsumerGroupStateAction StateAction
+}
+
+type ConsumerGroupTopicConfChangeEvent struct {
+	Action                   StateAction
+	ConsumerGroup            string
+	ConsumerGroupTopicConfig *ConsumerGroupTopicConfig
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/config/producer_group.go
similarity index 69%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/config/producer_group.go
index 39abcef8..2b72c99e 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/config/producer_group.go
@@ -15,15 +15,6 @@
 
 package config
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
-
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
-
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
+type ProducerGroupConfig struct {
+	GroupName string `json:"groupName"`
 }
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer.go
new file mode 100644
index 00000000..67e263c5
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer.go
@@ -0,0 +1,233 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/push"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/wrapper"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	cloudv2 "github.com/cloudevents/sdk-go/v2"
+	"github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+var (
+	ErrNoConnectorPlugin    = errors.New("no connector plugin found")
+	ErrNewConsumerConnector = errors.New("create consumer connector err")
+	ErrNewProducerConnector = errors.New("create producer connector err")
+)
+
+type EventMeshConsumer struct {
+	ConsumerGroup      string
+	persistentConsumer *wrapper.Consumer
+	broadcastConsumer  *wrapper.Consumer
+	messageHandler     *push.MessageHandler
+	ServiceState       consts.ServiceState
+	// consumerGroupTopicConfig key is topic
+	// value is ConsumerGroupTopicOption
+	consumerGroupTopicConfig *sync.Map
+}
+
+func NewEventMeshConsumer(consumerGroup string) (*EventMeshConsumer, error) {
+	pushHandler, err := push.NewMessageHandler(consumerGroup)
+	if err != nil {
+		return nil, err
+	}
+	cons, err := wrapper.NewConsumer()
+	if err != nil {
+		return nil, ErrNewConsumerConnector
+	}
+	bcros, err := wrapper.NewConsumer()
+	if err != nil {
+		return nil, ErrNewConsumerConnector
+	}
+	return &EventMeshConsumer{
+		ConsumerGroup:            consumerGroup,
+		messageHandler:           pushHandler,
+		persistentConsumer:       cons,
+		broadcastConsumer:        bcros,
+		consumerGroupTopicConfig: new(sync.Map),
+	}, nil
+}
+
+func (e *EventMeshConsumer) Init() error {
+	// no topics, don't init the consumer
+	if e.ConsumerGroupSize() == 0 {
+		return nil
+	}
+
+	persistProps := make(map[string]string)
+	persistProps["isBroadcast"] = "false"
+	persistProps["consumerGroup"] = e.ConsumerGroup
+	persistProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+	persistProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
+		config.GlobalConfig().Server.GRPCOption.Cluster)
+	if err := e.persistentConsumer.Init(persistProps); err != nil {
+		return err
+	}
+	clusterEventListener := e.createEventListener(pb.Subscription_SubscriptionItem_CLUSTERING)
+	e.persistentConsumer.RegisterListener(clusterEventListener)
+
+	broadcastProps := make(map[string]string)
+	broadcastProps["isBroadcast"] = "false"
+	broadcastProps["consumerGroup"] = e.ConsumerGroup
+	broadcastProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+	broadcastProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
+		config.GlobalConfig().Server.GRPCOption.Cluster)
+	if err := e.broadcastConsumer.Init(broadcastProps); err != nil {
+		return err
+	}
+	broadcastEventListener := e.createEventListener(pb.Subscription_SubscriptionItem_BROADCASTING)
+	e.broadcastConsumer.RegisterListener(broadcastEventListener)
+	e.ServiceState = consts.INITED
+
+	log.Infof("init the eventmesh consumer success, group:%v", e.ConsumerGroup)
+	return nil
+}
+
+func (e *EventMeshConsumer) Start() error {
+	// no topics, don't start the consumer
+	if e.ConsumerGroupSize() == 0 {
+		return nil
+	}
+
+	e.consumerGroupTopicConfig.Range(func(key, value any) bool {
+		topic := key.(string)
+		opt := value.(*ConsumerGroupTopicOption).SubscriptionMode
+		switch opt {
+		case pb.Subscription_SubscriptionItem_CLUSTERING:
+			e.persistentConsumer.Subscribe(topic)
+		case pb.Subscription_SubscriptionItem_BROADCASTING:
+			e.broadcastConsumer.Subscribe(topic)
+		default:
+			log.Warnf("un support sub mode:%v", opt)
+		}
+		return true
+	})
+
+	if err := e.broadcastConsumer.Start(); err != nil {
+		return err
+	}
+	if err := e.persistentConsumer.Start(); err != nil {
+		return err
+	}
+	return nil
+}
+
+// RegisterClient Register client's topic information
+// return true if this EventMeshConsumer required restart because of the topic changes
+func (e *EventMeshConsumer) RegisterClient(cli *GroupClient) bool {
+	var (
+		consumerTopicOption *ConsumerGroupTopicOption
+		restart             = false
+	)
+	val, ok := e.consumerGroupTopicConfig.Load(cli.Topic)
+	if !ok {
+		consumerTopicOption = NewConsumerGroupTopicOption(cli.ConsumerGroup, cli.Topic, cli.SubscriptionMode, cli.GRPCType)
+		e.consumerGroupTopicConfig.Store(cli.Topic, consumerTopicOption)
+		restart = true
+	} else {
+		consumerTopicOption = val.(*ConsumerGroupTopicOption)
+	}
+	consumerTopicOption.RegisterClient(cli)
+	return restart
+}
+
+// DeRegisterClient deregister client's topic information and return true if this EventMeshConsumer
+// required restart because of the topic changes
+// return true if the underlining EventMeshConsumer needs to restart later; false otherwise
+func (e *EventMeshConsumer) DeRegisterClient(cli *GroupClient) bool {
+	var (
+		consumerTopicOption *ConsumerGroupTopicOption
+	)
+	val, ok := e.consumerGroupTopicConfig.Load(cli.Topic)
+	if !ok {
+		return false
+	}
+	consumerTopicOption = val.(*ConsumerGroupTopicOption)
+	consumerTopicOption.DeregisterClient(cli)
+	e.consumerGroupTopicConfig.Delete(cli.Topic)
+	return true
+}
+
+func (e *EventMeshConsumer) Shutdown() error {
+	if err := e.persistentConsumer.Shutdown(); err != nil {
+		return err
+	}
+	if err := e.broadcastConsumer.Shutdown(); err != nil {
+		return err
+	}
+	return nil
+}
+
+func (e *EventMeshConsumer) ConsumerGroupSize() int {
+	count := 0
+	e.consumerGroupTopicConfig.Range(func(key, value any) bool {
+		count++
+		return true
+	})
+	return count
+}
+
+func (e *EventMeshConsumer) createEventListener(mode pb.Subscription_SubscriptionItem_SubscriptionMode) *connector.EventListener {
+	return &connector.EventListener{
+		Consume: func(event *cloudv2.Event, commitFunc connector.CommitFunc) error {
+			var commitAction connector.EventMeshAction
+			defer commitFunc(commitAction)
+
+			eventclone := event.Clone()
+			eventclone.SetExtension(consts.REQ_MQ2EVENTMESH_TIMESTAMP, time.Now().UnixMilli())
+			topic := event.Subject()
+			bizSeqNo := eventclone.Extensions()[consts.PROPERTY_MESSAGE_SEARCH_KEYS]
+			uniqueID := eventclone.Extensions()[consts.RMB_UNIQ_ID]
+			log.Infof("mq to eventmesh, topic:%v, bizSeqNo:%v, uniqueID:%v", topic, bizSeqNo, uniqueID)
+
+			val, ok := e.consumerGroupTopicConfig.Load(topic)
+			if !ok {
+				log.Debugf("no active consumer for topic:%v", topic)
+				commitAction = connector.CommitMessage
+				return nil
+			}
+
+			topicConfig := val.(*ConsumerGroupTopicOption)
+			tpy := topicConfig.GRPCType
+			mctx := &push.MessageContext{
+				GrpcType:         tpy,
+				ConsumerGroup:    e.ConsumerGroup,
+				SubscriptionMode: mode,
+				Event:            &eventclone,
+			}
+			if err := e.messageHandler.Handler(mctx); err != nil {
+				log.Warnf("handle msg err:%v, topic:%v, group:%v", err, topic, topicConfig.ConsumerGroup)
+				// can not handle the message due to the capacity limit is reached
+				// wait for 5 seconds and send this message back to mq and consume again
+				time.Sleep(time.Second * 5)
+				//e.sendMessageBack()
+				commitAction = connector.CommitMessage
+				return nil
+			}
+
+			commitAction = connector.ManualAck
+			return nil
+		},
+	}
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_client.go
similarity index 54%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_client.go
index 39abcef8..3ed84c18 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_client.go
@@ -13,17 +13,27 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package config
+package consumer
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	"time"
+)
 
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
-
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
+// GroupClient consumer group client details
+type GroupClient struct {
+	ENV              string
+	IDC              string
+	ConsumerGroup    string
+	Topic            string
+	GRPCType         config.GRPCType
+	URL              string
+	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+	SYS              string
+	IP               string
+	PID              string
+	Hostname         string
+	APIVersion       string
+	LastUPtime       time.Time
 }
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_option.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_option.go
new file mode 100644
index 00000000..0cea6116
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/group_option.go
@@ -0,0 +1,125 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	"github.com/liyue201/gostl/ds/set"
+	"sync"
+)
+
+type RegisterClient func(*GroupClient)
+type DeregisterClient func(*GroupClient)
+
+// ConsumerGroupTopicOption refers to ConsumerGroupTopicConfig
+type ConsumerGroupTopicOption struct {
+	ConsumerGroup    string
+	Topic            string
+	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+	GRPCType         config.GRPCType
+	RegisterClient   RegisterClient
+	DeregisterClient DeregisterClient
+}
+
+func NewConsumerGroupTopicOption(cg string,
+	topic string,
+	mode pb.Subscription_SubscriptionItem_SubscriptionMode,
+	gtype config.GRPCType) *ConsumerGroupTopicOption {
+	return &ConsumerGroupTopicOption{
+		ConsumerGroup:    cg,
+		Topic:            topic,
+		SubscriptionMode: mode,
+		GRPCType:         gtype,
+	}
+}
+
+// WebhookGroupTopicOption topic option for subscribe with webhook
+type WebhookGroupTopicOption struct {
+	*ConsumerGroupTopicOption
+
+	// IDCWebhookURLs webhook urls seperated by IDC
+	// key is IDC, value is vector.Vector
+	IDCWebhookURLs *sync.Map
+
+	// AllURLs all webhook urls, ignore idc
+	AllURLs *set.Set
+}
+
+func NewWebhookGroupTopicOption(cg string,
+	topic string,
+	mode pb.Subscription_SubscriptionItem_SubscriptionMode,
+	gtype config.GRPCType) *WebhookGroupTopicOption {
+	opt := &WebhookGroupTopicOption{
+		ConsumerGroupTopicOption: NewConsumerGroupTopicOption(cg, topic, mode, config.WEBHOOK),
+		IDCWebhookURLs:           new(sync.Map),
+		AllURLs:                  set.New(set.WithGoroutineSafe()),
+	}
+	opt.ConsumerGroupTopicOption.RegisterClient = func(cli *GroupClient) {
+		if cli.GRPCType != config.WEBHOOK {
+			log.Warnf("invalid grpc type:%v, with provide WEBHOOK", cli.GRPCType)
+			return
+		}
+		iwu, ok := opt.IDCWebhookURLs.Load(cli.IDC)
+		if !ok {
+			newIDCURLs := set.New(set.WithGoroutineSafe())
+			newIDCURLs.Insert(cli.URL)
+			opt.IDCWebhookURLs.Store(cli.IDC, newIDCURLs)
+		} else {
+			val := iwu.(*set.Set)
+			val.Insert(cli.URL)
+			opt.IDCWebhookURLs.Store(cli.IDC, val)
+		}
+		opt.AllURLs.Insert(cli.URL)
+	}
+
+	opt.ConsumerGroupTopicOption.DeregisterClient = func(cli *GroupClient) {
+		val, ok := opt.IDCWebhookURLs.Load(cli.IDC)
+		if !ok {
+			return
+		}
+		idcURLs := val.(*set.Set)
+		idcURLs.Erase(cli.URL)
+		opt.AllURLs.Erase(cli.URL)
+	}
+	return opt
+}
+
+// StreamGroupTopicOption topic option for subscribe with stream
+type StreamGroupTopicOption struct {
+	*ConsumerGroupTopicOption
+}
+
+func NewWStreamGroupTopicOption(cg string,
+	topic string,
+	mode pb.Subscription_SubscriptionItem_SubscriptionMode,
+	gtype config.GRPCType) *StreamGroupTopicOption {
+	opt := &StreamGroupTopicOption{
+		ConsumerGroupTopicOption: NewConsumerGroupTopicOption(cg, topic, mode, config.STREAM),
+	}
+	opt.RegisterClient = func(cli *GroupClient) {
+		if cli.GRPCType != config.STREAM {
+			log.Warnf("invalid grpc type:%v, with provide STREAM", cli.GRPCType)
+			return
+		}
+
+	}
+	opt.DeregisterClient = func(cli *GroupClient) {
+
+	}
+	return opt
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/manager.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/manager.go
new file mode 100644
index 00000000..133a7629
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/manager.go
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+	config2 "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/consts"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+	"github.com/liyue201/gostl/ds/set"
+	"github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+var (
+	ErrNoConsumerClient = errors.New("no consumer group client")
+)
+
+type Manager struct {
+	// consumerClients store all consumer clients
+	// key is consumer group, value is set *GroupClient
+	consumerGroupClients *sync.Map
+
+	// consumers eventmesh consumer instances
+	// key is consumer group, value is EventMeshConsumer
+	consumers *sync.Map
+}
+
+// NewManager create new consumer manager
+func NewManager() (*Manager, error) {
+	return &Manager{
+		consumers:            new(sync.Map),
+		consumerGroupClients: new(sync.Map),
+	}, nil
+}
+
+func (c *Manager) GetConsumer(consumerGroup string) (*EventMeshConsumer, error) {
+	val, ok := c.consumers.Load(consumerGroup)
+	if ok {
+		return val.(*EventMeshConsumer), nil
+	}
+	cu, err := NewEventMeshConsumer(consumerGroup)
+	if err != nil {
+		return nil, err
+	}
+	c.consumers.Store(consumerGroup, cu)
+	return cu, nil
+}
+
+func (c *Manager) RegisterClient(cli *GroupClient) error {
+	val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
+	if !ok {
+		cliset := set.New(set.WithGoroutineSafe())
+		cliset.Insert(cli)
+		c.consumerGroupClients.Store(cli.ConsumerGroup, cliset)
+		return nil
+	}
+	localClients := val.(*set.Set)
+	found := false
+	for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+		lc := iter.Value().(*GroupClient)
+		if lc.GRPCType == config.WEBHOOK {
+			lc.URL = cli.URL
+			lc.LastUPtime = cli.LastUPtime
+			found = true
+			break
+		}
+		if lc.GRPCType == config.STREAM {
+			// TODO add event emitter
+			lc.LastUPtime = cli.LastUPtime
+			found = true
+			break
+		}
+	}
+	if !found {
+		localClients.Insert(cli)
+	}
+	return nil
+}
+
+func (c *Manager) DeRegisterClient(cli *GroupClient) error {
+	val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
+	if !ok {
+		log.Debugf("no consumer group client found, name:%v", cli.ConsumerGroup)
+		return nil
+	}
+	localClients := val.(*set.Set)
+	for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+		lc := iter.Value().(*GroupClient)
+		if lc.Topic == cli.Topic {
+			if lc.GRPCType == config.STREAM {
+				// TODO
+				// close the GRPC client stream before removing it
+			}
+			localClients.Erase(lc)
+		}
+	}
+	if localClients.Size() == 0 {
+		c.consumerGroupClients.Delete(cli.ConsumerGroup)
+	}
+	return nil
+}
+
+func (c *Manager) restartConsumer(consumerGroup string) error {
+	val, ok := c.consumers.Load(consumerGroup)
+	if !ok {
+		return nil
+	}
+	emconsumer := val.(*EventMeshConsumer)
+	if emconsumer.ServiceState == consts.RUNNING {
+		if err := emconsumer.Shutdown(); err != nil {
+			return err
+		}
+	}
+	if err := emconsumer.Init(); err != nil {
+		return err
+	}
+	if err := emconsumer.Start(); err != nil {
+		return err
+	}
+	if emconsumer.ServiceState != consts.RUNNING {
+		log.Warnf("restart eventmesh consumer failed, status:%v", emconsumer.ServiceState)
+		c.consumers.Delete(consumerGroup)
+	}
+	return nil
+}
+
+func (c *Manager) UpdateClientTime(cli *GroupClient) {
+	val, ok := c.consumerGroupClients.Load(cli.ConsumerGroup)
+	if !ok {
+		log.Debugf("no consumer group client found, name:%v", cli.ConsumerGroup)
+		return
+	}
+	localClients := val.(*set.Set)
+	for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+		iter.Value().(*GroupClient).LastUPtime = time.Now()
+	}
+}
+
+func (c *Manager) clientCheck() {
+	sessionExpiredInMills := config2.GlobalConfig().Server.GRPCOption.SessionExpiredInMills
+	tk := time.NewTicker(sessionExpiredInMills)
+	go func() {
+		for range tk.C {
+			var consumerGroupRestart []string
+			c.consumerGroupClients.Range(func(key, value any) bool {
+				localClients := value.(*set.Set)
+				for iter := localClients.Begin(); iter.IsValid(); iter.Next() {
+					lc := iter.Value().(*GroupClient)
+					if time.Now().Sub(lc.LastUPtime) > sessionExpiredInMills {
+						log.Warnf("client:%v lastUpdate time:%v over three heartbeat cycles. Removing it",
+							lc.ConsumerGroup, lc.LastUPtime)
+						emconsumer, err := c.GetConsumer(lc.ConsumerGroup)
+						if err != nil {
+							log.Warnf("get eventmesh consumer:%v failed, err:%v", lc.ConsumerGroup, err)
+							return true
+						}
+						if err := c.DeRegisterClient(lc); err != nil {
+							log.Warnf("deregistry client:%v err:%v", lc.ConsumerGroup, err)
+							return true
+						}
+						if ok := emconsumer.DeRegisterClient(lc); !ok {
+							log.Warnf("failed deregistry client:%v in eventmesh consumer", lc.ConsumerGroup)
+							return true
+						}
+						consumerGroupRestart = append(consumerGroupRestart, lc.ConsumerGroup)
+					}
+				}
+				for _, rs := range consumerGroupRestart {
+					if err := c.restartConsumer(rs); err != nil {
+						log.Warnf("deregistry consumer:%v  err:%v", rs, err)
+						return true
+					}
+				}
+				return true
+			})
+		}
+	}()
+}
+
+func (c *Manager) Start() error {
+	log.Infof("start consumer manager")
+	return nil
+}
+
+func (c *Manager) Stop() error {
+	return nil
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/push/context.go
similarity index 54%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/push/context.go
index 39abcef8..49ee5654 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/push/context.go
@@ -13,17 +13,22 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package config
+package push
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/consumer"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	cloudv2 "github.com/cloudevents/sdk-go/v2"
+)
 
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
-
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
+type MessageContext struct {
+	MsgRandomNo      string
+	SubscriptionMode pb.Subscription_SubscriptionItem_SubscriptionMode
+	GrpcType         config.GRPCType
+	ConsumerGroup    string
+	Event            *cloudv2.Event
+	TopicConfig      *config.ConsumerGroupTopicConfig
+	// channel for server
+	Consumer *consumer.EventMeshConsumer
 }
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/push/handler.go b/eventmesh-server-go/runtime/core/protocol/grpc/push/handler.go
new file mode 100644
index 00000000..e149a29e
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/push/handler.go
@@ -0,0 +1,88 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package push
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/pkg/errors"
+	"sync"
+	"time"
+)
+
+var (
+	ConsumerGroupWaitingRequestThreshold = 1000
+
+	ErrRequestReachMaxThreshold = errors.New("request reach the max threshold")
+)
+
+type MessageHandler struct {
+	//pool *ants.Pool
+	// waitingRequests waiting to request
+	// key to consumerGroup value to []*PushRequest
+	waitingRequests *sync.Map
+}
+
+func NewMessageHandler(consumerGroup string) (*MessageHandler, error) {
+	wr := new(sync.Map)
+	// TODO need goroutine safe in []*Request{}
+	wr.Store(consumerGroup, []*Request{})
+	hdl := &MessageHandler{
+		//pool:            p,
+		waitingRequests: wr,
+	}
+	go hdl.checkTimeout()
+	return hdl, nil
+}
+
+func (m *MessageHandler) checkTimeout() {
+	tk := time.NewTicker(time.Second)
+	for range tk.C {
+		m.waitingRequests.Range(func(key, value interface{}) bool {
+			reqs := value.([]*Request)
+			for _, req := range reqs {
+				if req.timeout() {
+
+				}
+			}
+			return true
+		})
+	}
+}
+
+func (m *MessageHandler) Handler(mctx *MessageContext) error {
+	if m.Size() > ConsumerGroupWaitingRequestThreshold {
+		log.Warnf("too many request, reject and send back to MQ, group:%v, threshold:%v",
+			mctx.ConsumerGroup, ConsumerGroupWaitingRequestThreshold)
+		return ErrRequestReachMaxThreshold
+	}
+
+	go func() {
+		pushRequest := &Request{}
+		if err := pushRequest.Try(); err != nil {
+
+		}
+	}()
+	return nil
+}
+
+func (m *MessageHandler) Size() int {
+	count := 0
+	m.waitingRequests.Range(func(key, value any) bool {
+		count++
+		return true
+	})
+	return count
+}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/runtime/core/protocol/grpc/retry/context.go
similarity index 68%
copy from eventmesh-server-go/config/grpc.go
copy to eventmesh-server-go/runtime/core/protocol/grpc/retry/context.go
index 39abcef8..2b90746b 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/retry/context.go
@@ -13,17 +13,21 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package config
+package retry
 
-// GRPCOption configuratin for grpc server
-type GRPCOption struct {
-	Port string `yaml:"port" toml:"port"`
+import "time"
 
-	// TLSOption process with the tls configuration
-	*TLSOption `yaml:"tls" toml:"tls"`
+type Context struct {
+	RetryTimes  int
+	ExecuteTime time.Time
+	Do          func() error
+}
+
+func (c *Context) SetDelay(delay time.Duration) *Context {
+	c.ExecuteTime = time.Now().Add(delay)
+	return c
+}
 
-	// PProfOption if pprof is enabled, server
-	// will start on given port, and you can check
-	// on http://ip:port/pprof/debug
-	*PProfOption `yaml:"pprof" toml:"pprof"`
+func (c *Context) GetDelay() time.Duration {
+	return c.ExecuteTime.Sub(time.Now())
 }
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer.go b/eventmesh-server-go/runtime/core/wrapper/producer.go
new file mode 100644
index 00000000..1dfbe51d
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/wrapper/producer.go
@@ -0,0 +1,90 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wrapper
+
+import (
+	"context"
+	"fmt"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/log"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+	eventv2 "github.com/cloudevents/sdk-go/v2"
+	"time"
+)
+
+type Producer struct {
+	*Base
+	ProducerConnector connector.Producer
+}
+
+var (
+	ErrNoConnectorPlugin = fmt.Errorf("no connector plugin provided")
+	ErrNoConnectorName   = fmt.Errorf("no connector plugin name provided")
+)
+
+// NewProducer create new producer to handle the grpc request
+func NewProducer() (*Producer, error) {
+	connectorPlugin, ok := config.GlobalConfig().Plugins[config.ConnectorPluginType]
+	if !ok {
+		return nil, ErrNoConnectorPlugin
+	}
+	connectorPluginName, ok := connectorPlugin["name"]
+	if !ok {
+		return nil, ErrNoConnectorName
+	}
+	log.Infof("init producer with connector name:%s", connectorPluginName)
+	factory := plugin.Get(connector.ProducerPluginType, connectorPluginName.Value).(connector.ProducerFactory)
+	consu, err := factory.Get()
+	if err != nil {
+		return nil, err
+	}
+	return &Producer{
+		Base:              DefaultBaseWrapper(),
+		ProducerConnector: consu,
+	}, nil
+}
+
+func (c *Producer) Send(ctx context.Context, event *eventv2.Event, callback *connector.SendCallback) error {
+	return c.ProducerConnector.Publish(ctx, event, callback)
+}
+
+func (c *Producer) Request(ctx context.Context, event *eventv2.Event, callback *connector.RequestReplyCallback, timeout time.Duration) error {
+	return c.ProducerConnector.Request(ctx, event, callback, timeout)
+}
+
+func (c *Producer) Reply(ctx context.Context, event *eventv2.Event, callback *connector.SendCallback) error {
+	return c.ProducerConnector.Reply(ctx, event, callback)
+}
+
+func (c *Producer) Start() error {
+	if err := c.ProducerConnector.Start(); err != nil {
+		return err
+	}
+
+	c.Base.Started.CAS(false, true)
+	return nil
+}
+
+func (c *Producer) Shutdown() error {
+	if err := c.ProducerConnector.Shutdown(); err != nil {
+		return err
+	}
+
+	c.Base.Started.CAS(false, true)
+	c.Base.Inited.CAS(false, true)
+	return nil
+}
diff --git a/eventmesh-server-go/runtime/core/wrapper/producer_test.go b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
new file mode 100644
index 00000000..75146b57
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/wrapper/producer_test.go
@@ -0,0 +1,79 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package wrapper
+
+import (
+	"context"
+	"fmt"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector"
+	eventv2 "github.com/cloudevents/sdk-go/v2"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func TestProducer_Send(t *testing.T) {
+	type fields struct {
+		Base              *Base
+		ProducerConnector connector.Producer
+	}
+	type args struct {
+		ctx      context.Context
+		event    *eventv2.Event
+		callback *connector.SendCallback
+	}
+	factory := plugin.Get(connector.ProducerPluginType, "standalone").(connector.ProducerFactory)
+	produ, err := factory.Get()
+	assert.NoError(t, err)
+	tests := []struct {
+		name    string
+		fields  fields
+		args    args
+		wantErr assert.ErrorAssertionFunc
+	}{
+		{
+			name: "test send",
+			fields: fields{
+				Base:              DefaultBaseWrapper(),
+				ProducerConnector: produ,
+			},
+			args: args{
+				ctx:   context.TODO(),
+				event: &eventv2.Event{},
+				callback: &connector.SendCallback{
+					OnSuccess: func(result *connector.SendResult) {
+						t.Logf("success")
+					},
+					OnError: func(result *connector.ErrorResult) {
+						t.Logf("error")
+					},
+				},
+			},
+			wantErr: func(t assert.TestingT, err error, i ...interface{}) bool {
+				return err != nil
+			},
+		},
+	}
+	for _, tt := range tests {
+		t.Run(tt.name, func(t *testing.T) {
+			c := &Producer{
+				Base:              tt.fields.Base,
+				ProducerConnector: tt.fields.ProducerConnector,
+			}
+			tt.wantErr(t, c.Send(tt.args.ctx, tt.args.event, tt.args.callback), fmt.Sprintf("Send(%v, %v, %v)", tt.args.ctx, tt.args.event, tt.args.callback))
+		})
+	}
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org