You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/02 08:59:44 UTC

[incubator-eventmesh] branch master updated: [ISSUE #2941] Refactor registry nacos (#2960)

This is an automated email from the ASF dual-hosted git repository.

jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git


The following commit(s) were added to refs/heads/master by this push:
     new fcfa45a55 [ISSUE #2941] Refactor registry nacos (#2960)
fcfa45a55 is described below

commit fcfa45a550bf50c93abe6bdd4f6630541ccaf10d
Author: walleliu <li...@163.com>
AuthorDate: Thu Feb 2 16:59:36 2023 +0800

    [ISSUE #2941] Refactor registry nacos (#2960)
    
    * update registry api
    
    * add registry plugin
    
    * add licience for mock go files
    
    * change nacos registry default connection timeout to duration
---
 .../nacos/selector/config.go => config/common.go}  |  20 +-
 eventmesh-server-go/config/config.go               |  15 +-
 eventmesh-server-go/config/config_test.go          |  10 +-
 eventmesh-server-go/config/grpc.go                 |  12 -
 .../config/testdata/test_config.yaml               |   9 +-
 eventmesh-server-go/eventmesh.go                   |   2 +-
 .../plugin/naming/nacos/registry/registry.go       | 170 -----------
 .../plugin/naming/nacos/selector/selector.go       | 120 --------
 .../nacos/registry/config.go => registry/model.go} |  34 ++-
 .../nacos/selector => registry/nacos}/config.go    |  18 +-
 .../nacos/mocks/mock_naming_client_interface.go    | 211 ++++++++++++++
 eventmesh-server-go/plugin/registry/nacos/nacos.go | 241 ++++++++++++++++
 .../plugin/registry/nacos/nacos_test.go            | 314 +++++++++++++++++++++
 .../registry/config.go => registry/registry.go}    |  24 +-
 .../grpc/consumer/consumer_group_client.go         |   8 +-
 .../core/protocol/grpc/consumer/consumer_mesh.go   |   8 +-
 .../grpc/consumer/consumer_processor_test.go       | 186 ++++++++++++
 .../core/protocol/grpc/consumer/message_request.go |   8 +-
 .../core/protocol/grpc/producer/producer_mesh.go   |   4 +-
 .../runtime/proto/pb/mocks/consumer_service.go     | 107 +++++++
 .../runtime/proto/pb/mocks/heartbeat_service.go    |  78 +++++
 .../runtime/proto/pb/mocks/producer_service.go     | 108 +++++++
 .../config.go => runtime/registry/registry.go}     |  19 +-
 23 files changed, 1330 insertions(+), 396 deletions(-)

diff --git a/eventmesh-server-go/plugin/naming/nacos/selector/config.go b/eventmesh-server-go/config/common.go
similarity index 73%
copy from eventmesh-server-go/plugin/naming/nacos/selector/config.go
copy to eventmesh-server-go/config/common.go
index 95cbb8280..400c221de 100644
--- a/eventmesh-server-go/plugin/naming/nacos/selector/config.go
+++ b/eventmesh-server-go/config/common.go
@@ -13,18 +13,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package selector
+package config
 
-// Config selector config
-type Config struct {
-	ServiceName string
-	Weight      int
-	ClusterName string
-	GroupName   string
-	Metadata    map[string]string
-}
-
-// PluginConfig define selector plugin config
-type PluginConfig struct {
-	AddressList string `yaml:"address_list"`
+type Common struct {
+	Name         string `yaml:"name" toml:"name"`
+	RegistryName string `yaml:"registry-name" toml:"registry-name"`
+	Cluster      string `yaml:"cluster" toml:"cluster"`
+	Env          string `yaml:"env" toml:"env"`
+	IDC          string `yaml:"idc" toml:"idc"`
 }
diff --git a/eventmesh-server-go/config/config.go b/eventmesh-server-go/config/config.go
index 626e7674b..77d11f0a6 100644
--- a/eventmesh-server-go/config/config.go
+++ b/eventmesh-server-go/config/config.go
@@ -32,7 +32,7 @@ const (
 )
 
 type Config struct {
-	Name   string `yaml:"name" toml:"name"`
+	Common *Common `yaml:"common" toml:"common"`
 	Server struct {
 		*HTTPOption `yaml:"http" toml:"http"`
 		*GRPCOption `yaml:"grpc" toml:"grpc"`
@@ -50,8 +50,13 @@ func init() {
 }
 
 func defaultConfig() *Config {
-	cfg := &Config{
-		Name: "eventmesh-server",
+	cfg := &Config{}
+	cfg.Common = &Common{
+		Name:         "eventmesh-server",
+		RegistryName: "eventmesh-go",
+		Cluster:      "1",
+		Env:          "{}",
+		IDC:          "idc1",
 	}
 	cfg.Server.GRPCOption = &GRPCOption{
 		Port: "10010",
@@ -67,10 +72,6 @@ func defaultConfig() *Config {
 		PushMessagePoolSize:   10,
 		ReplyPoolSize:         10,
 		MsgReqNumPerSecond:    5,
-		RegistryName:          "eventmesh-go",
-		Cluster:               "1",
-		Env:                   "{}",
-		IDC:                   "idc1",
 		SessionExpiredInMills: 5 * time.Second,
 		SendMessageTimeout:    5 * time.Second,
 	}
diff --git a/eventmesh-server-go/config/config_test.go b/eventmesh-server-go/config/config_test.go
index a7294be22..0e91a1acb 100644
--- a/eventmesh-server-go/config/config_test.go
+++ b/eventmesh-server-go/config/config_test.go
@@ -28,6 +28,12 @@ func TestConfig_Load(t *testing.T) {
 	assert := testifyassert.New(t)
 
 	config := &Config{}
+	config.Common = &Common{
+		RegistryName: "test",
+		Cluster:      "test",
+		Env:          "env",
+		IDC:          "idc1",
+	}
 	config.Server.GRPCOption = &GRPCOption{
 		Port: "10010",
 		TLSOption: &TLSOption{
@@ -42,10 +48,6 @@ func TestConfig_Load(t *testing.T) {
 		PushMessagePoolSize:   10,
 		ReplyPoolSize:         10,
 		MsgReqNumPerSecond:    5,
-		RegistryName:          "test",
-		Cluster:               "test",
-		Env:                   "env",
-		IDC:                   "idc1",
 		SessionExpiredInMills: 5 * time.Second,
 		SendMessageTimeout:    5 * time.Second,
 	}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/config/grpc.go
index c3acd3c82..12073eb10 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/config/grpc.go
@@ -43,18 +43,6 @@ type GRPCOption struct {
 	//MsgReqNumPerSecond
 	MsgReqNumPerSecond float64 `yaml:"msg-req-num-per-second" toml:"msg-req-num-per-second"`
 
-	// RegistryName name for registry plugin support nacos or etcd
-	RegistryName string `yaml:"registry-name" toml:"registry-name"`
-
-	// Cluster cluster for grpc server
-	Cluster string `yaml:"cluster" toml:"cluster"`
-
-	// Env for env variable
-	Env string `yaml:"env" toml:"env"`
-
-	// IDC idc for grpc server
-	IDC string `yaml:"idc" toml:"idc"`
-
 	// SessionExpiredInMills internal to clean the not work session consumer
 	SessionExpiredInMills time.Duration `yaml:"session-expired-in-mills"`
 	// SendMessageTimeout timeout in send message
diff --git a/eventmesh-server-go/config/testdata/test_config.yaml b/eventmesh-server-go/config/testdata/test_config.yaml
index ec10468b1..6527c7123 100644
--- a/eventmesh-server-go/config/testdata/test_config.yaml
+++ b/eventmesh-server-go/config/testdata/test_config.yaml
@@ -13,6 +13,11 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
+common:
+  cluster: "test"
+  registry-name: "test"
+  env: "env"
+  idc: "idc1"
 server:
   grpc:
     port: 10010
@@ -27,10 +32,6 @@ server:
     push-message-pool-size: 10
     reply-pool-size: 10
     msg-req-num-per-second: 5
-    cluster: "test"
-    registry-name: "test"
-    env: "env"
-    idc: "idc1"
     session-expired-in-mills: 5s
     send-message-timeout: 5s
   http:
diff --git a/eventmesh-server-go/eventmesh.go b/eventmesh-server-go/eventmesh.go
index d13dd59d5..046b0ceb1 100644
--- a/eventmesh-server-go/eventmesh.go
+++ b/eventmesh-server-go/eventmesh.go
@@ -27,8 +27,8 @@ import (
 	_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/standalone"
 	_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/database/mysql"
 	_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/metrics/prometheus"
-	_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/naming/nacos/registry"
 	_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol/cloudevents"
+	_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry/nacos"
 )
 
 var confPath string
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/registry.go b/eventmesh-server-go/plugin/naming/nacos/registry/registry.go
deleted file mode 100644
index 9237c151f..000000000
--- a/eventmesh-server-go/plugin/naming/nacos/registry/registry.go
+++ /dev/null
@@ -1,170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package registry
-
-import (
-	"fmt"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/registry"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
-	"github.com/gogf/gf/util/gconv"
-	"github.com/nacos-group/nacos-sdk-go/v2/clients"
-	"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
-	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
-	"github.com/nacos-group/nacos-sdk-go/v2/vo"
-	"github.com/pkg/errors"
-	"net"
-	"strconv"
-	"strings"
-)
-
-const (
-	DefaultClusterName = "DEFAULT"
-	DefaultGroupName   = "DEFAULT_GROUP"
-)
-
-const (
-	defaultConnectTimeout = 5000
-	defaultWeight         = 100
-)
-
-func init() {
-	plugin.Register("nacos", &Registry{})
-}
-
-// Registry register service
-type Registry struct {
-	Provider naming_client.INamingClient
-	cfg      *Config
-	host     string
-	port     int
-}
-
-// newRegistry 新建实例
-func newRegistry(provider naming_client.INamingClient, cfg *Config) *Registry {
-	return &Registry{
-		Provider: provider,
-		cfg:      cfg,
-	}
-}
-
-// Type return registry type
-func (r *Registry) Type() string {
-	return "registry"
-}
-
-// Setup setup config
-func (r *Registry) Setup(name string, configDec plugin.Decoder) error {
-	if configDec == nil {
-		return errors.New("registry config decoder empty")
-	}
-	conf := &PluginConfig{}
-	if err := configDec.Decode(conf); err != nil {
-		return err
-	}
-	return r.register(conf)
-}
-
-// Register registry service, application can invoke this method to register service to remote registry-server
-func (r *Registry) Register(_ string) error {
-	host, portRaw, err := net.SplitHostPort(r.cfg.Address)
-	if err != nil {
-		return err
-	}
-	port, err := strconv.ParseInt(portRaw, 10, 64)
-	if err != nil {
-		return err
-	}
-	r.host = host
-	r.port = gconv.Int(port)
-	if r.cfg.Weight == 0 {
-		r.cfg.Weight = defaultWeight
-	}
-	var req = vo.RegisterInstanceParam{
-		Ip:          r.host,
-		Port:        uint64(r.port),
-		ServiceName: r.cfg.ServiceName,
-		GroupName:   DefaultGroupName,
-		Healthy:     true,
-		Enable:      true,
-		Weight:      gconv.Float64(r.cfg.Weight),
-	}
-	result, err := r.Provider.RegisterInstance(req)
-	if err != nil {
-		return errors.Wrap(err, "fail to Register instance")
-	}
-	if !result {
-		return errors.New("fail to Register instance")
-	}
-	return nil
-}
-
-// Deregister de-registry service, application can invoke this method to de-register service to remote registry-server
-func (r *Registry) Deregister(_ string) error {
-	var req = vo.DeregisterInstanceParam{
-		Ip:          r.host,
-		Port:        uint64(r.port),
-		ServiceName: r.cfg.ServiceName,
-		GroupName:   DefaultGroupName,
-	}
-	result, err := r.Provider.DeregisterInstance(req)
-	if err != nil {
-		return errors.Wrap(err, "fail to Deregister instance")
-	}
-	if !result {
-		return errors.New("fail to Deregister instance")
-	}
-	return nil
-}
-
-func (r *Registry) register(conf *PluginConfig) error {
-	provider, err := r.newProvider(conf)
-	if err != nil {
-		return err
-	}
-	ip := util.GetIP()
-	serverName := conf.ServiceName
-	cfg := &Config{
-		ServiceName: serverName,
-		Address:     fmt.Sprintf("%s:%v", ip, conf.Port),
-	}
-	registry.Register(serverName, newRegistry(provider, cfg))
-	return nil
-}
-
-func (r *Registry) newProvider(cfg *PluginConfig) (naming_client.INamingClient, error) {
-	var p vo.NacosClientParam
-	var addresses []string
-	if len(cfg.AddressList) > 0 {
-		addresses = strings.Split(cfg.AddressList, ",")
-	}
-	for _, address := range addresses {
-		ip, port, err := net.SplitHostPort(address)
-		if err != nil {
-			return nil, err
-		}
-		p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
-	}
-	p.ClientConfig = &constant.ClientConfig{
-		TimeoutMs: defaultConnectTimeout,
-		CacheDir:  cfg.CacheDir,
-	}
-	provider, err := clients.NewNamingClient(p)
-	if err != nil {
-		return nil, err
-	}
-	return provider, nil
-}
diff --git a/eventmesh-server-go/plugin/naming/nacos/selector/selector.go b/eventmesh-server-go/plugin/naming/nacos/selector/selector.go
deleted file mode 100644
index 297ebc5b2..000000000
--- a/eventmesh-server-go/plugin/naming/nacos/selector/selector.go
+++ /dev/null
@@ -1,120 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements.  See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License.  You may obtain a copy of the License at
-//
-//     http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package selector
-
-import (
-	"errors"
-	"fmt"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/registry"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/selector"
-	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
-	nacos_registry "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/naming/nacos/registry"
-	"github.com/gogf/gf/util/gconv"
-	"github.com/nacos-group/nacos-sdk-go/v2/clients"
-	"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
-	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
-	"github.com/nacos-group/nacos-sdk-go/v2/vo"
-	"net"
-	"strconv"
-	"strings"
-)
-
-const (
-	defaultConnectTimeout = 5000
-)
-
-func init() {
-	plugin.Register("nacos", &Selector{})
-}
-
-// Selector 路由选择器
-type Selector struct {
-	Client naming_client.INamingClient
-}
-
-// newRegistry 新建实例
-func newSelector(client naming_client.INamingClient) *Selector {
-	return &Selector{
-		Client: client,
-	}
-}
-
-// Type return registry type
-func (s *Selector) Type() string {
-	return "selector"
-}
-
-// Setup setup config
-func (s *Selector) Setup(name string, configDec plugin.Decoder) error {
-	if configDec == nil {
-		return errors.New("selector config decoder empty")
-	}
-	conf := &PluginConfig{}
-	if err := configDec.Decode(conf); err != nil {
-		return err
-	}
-	client, err := s.newClient(conf)
-	if err != nil {
-		return err
-	}
-	selector.Register(config.GlobalConfig().Name, newSelector(client))
-	return nil
-}
-
-// Select 选择服务节点
-func (s *Selector) Select(serviceName string) (*registry.Instance, error) {
-	instanceReq := vo.SelectOneHealthInstanceParam{
-		ServiceName: serviceName,
-		GroupName:   nacos_registry.DefaultGroupName,
-		Clusters:    []string{nacos_registry.DefaultClusterName},
-	}
-	instance, err := s.Client.SelectOneHealthyInstance(instanceReq)
-	if err != nil {
-		return nil, fmt.Errorf("get one instance err: %s", err.Error())
-	}
-	if instance == nil {
-		return nil, fmt.Errorf("get one instance return empty")
-	}
-	return &registry.Instance{
-		Address:     net.JoinHostPort(instance.Ip, strconv.Itoa(int(instance.Port))),
-		ServiceName: serviceName,
-		Weight:      gconv.Int(instance.Weight),
-		Clusters:    instance.ClusterName,
-		Metadata:    instance.Metadata,
-	}, nil
-}
-
-func (s *Selector) newClient(cfg *PluginConfig) (naming_client.INamingClient, error) {
-	var p vo.NacosClientParam
-	var addresses []string
-	if len(cfg.AddressList) > 0 {
-		addresses = strings.Split(cfg.AddressList, ",")
-	}
-	for _, address := range addresses {
-		ip, port, err := net.SplitHostPort(address)
-		if err != nil {
-			return nil, err
-		}
-		p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
-	}
-	p.ClientConfig = &constant.ClientConfig{TimeoutMs: defaultConnectTimeout}
-	provider, err := clients.NewNamingClient(p)
-	if err != nil {
-		return nil, err
-	}
-	return provider, nil
-}
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/config.go b/eventmesh-server-go/plugin/registry/model.go
similarity index 56%
copy from eventmesh-server-go/plugin/naming/nacos/registry/config.go
copy to eventmesh-server-go/plugin/registry/model.go
index 11fd86c23..495331209 100644
--- a/eventmesh-server-go/plugin/naming/nacos/registry/config.go
+++ b/eventmesh-server-go/plugin/registry/model.go
@@ -15,18 +15,28 @@
 
 package registry
 
-// Config registry config
-type Config struct {
-	ServiceName string
-	Weight      int
-	Address     string
-	Metadata    map[string]string
+import "time"
+
+type EventMeshDataInfo struct {
+	EventMeshClusterName string
+	EventMeshName        string
+	Endpoint             string
+	LastUpdateTimestamp  time.Time
+	Metadata             map[string]string
+}
+
+type EventMeshRegisterInfo struct {
+	EventMeshClusterName    string
+	EventMeshName           string
+	EndPoint                string
+	EventMeshInstanceNumMap map[string]map[string]int
+	Metadata                map[string]string
+	ProtocolType            string
 }
 
-// PluginConfig define registry plugin config
-type PluginConfig struct {
-	ServiceName string `yaml:"service_name"`
-	CacheDir    string `yaml:"cache-dir"`
-	Port        string `yaml:"port"`         // nacos server port
-	AddressList string `yaml:"address_list"` // nacos server address list
+type EventMeshUnRegisterInfo struct {
+	EventMeshClusterName string
+	EventMeshName        string
+	EndPoint             string
+	ProtocolType         string
 }
diff --git a/eventmesh-server-go/plugin/naming/nacos/selector/config.go b/eventmesh-server-go/plugin/registry/nacos/config.go
similarity index 74%
rename from eventmesh-server-go/plugin/naming/nacos/selector/config.go
rename to eventmesh-server-go/plugin/registry/nacos/config.go
index 95cbb8280..51a95744f 100644
--- a/eventmesh-server-go/plugin/naming/nacos/selector/config.go
+++ b/eventmesh-server-go/plugin/registry/nacos/config.go
@@ -13,18 +13,12 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
-package selector
+package nacos
 
-// Config selector config
+// Config define registry nacos plugin config
 type Config struct {
-	ServiceName string
-	Weight      int
-	ClusterName string
-	GroupName   string
-	Metadata    map[string]string
-}
-
-// PluginConfig define selector plugin config
-type PluginConfig struct {
-	AddressList string `yaml:"address_list"`
+	ServiceName string `yaml:"service-name"`
+	CacheDir    string `yaml:"cache-dir"`
+	Port        string `yaml:"port"`         // nacos server port
+	AddressList string `yaml:"address-list"` // nacos server address list
 }
diff --git a/eventmesh-server-go/plugin/registry/nacos/mocks/mock_naming_client_interface.go b/eventmesh-server-go/plugin/registry/nacos/mocks/mock_naming_client_interface.go
new file mode 100644
index 000000000..84082c4d5
--- /dev/null
+++ b/eventmesh-server-go/plugin/registry/nacos/mocks/mock_naming_client_interface.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: ./naming_client_interface.go
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+	reflect "reflect"
+
+	gomock "github.com/golang/mock/gomock"
+	model "github.com/nacos-group/nacos-sdk-go/v2/model"
+	vo "github.com/nacos-group/nacos-sdk-go/v2/vo"
+)
+
+// MockINamingClient is a mock of INamingClient interface.
+type MockINamingClient struct {
+	ctrl     *gomock.Controller
+	recorder *MockINamingClientMockRecorder
+}
+
+// MockINamingClientMockRecorder is the mock recorder for MockINamingClient.
+type MockINamingClientMockRecorder struct {
+	mock *MockINamingClient
+}
+
+// NewMockINamingClient creates a new mock instance.
+func NewMockINamingClient(ctrl *gomock.Controller) *MockINamingClient {
+	mock := &MockINamingClient{ctrl: ctrl}
+	mock.recorder = &MockINamingClientMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockINamingClient) EXPECT() *MockINamingClientMockRecorder {
+	return m.recorder
+}
+
+// CloseClient mocks base method.
+func (m *MockINamingClient) CloseClient() {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "CloseClient")
+}
+
+// CloseClient indicates an expected call of CloseClient.
+func (mr *MockINamingClientMockRecorder) CloseClient() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseClient", reflect.TypeOf((*MockINamingClient)(nil).CloseClient))
+}
+
+// DeregisterInstance mocks base method.
+func (m *MockINamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "DeregisterInstance", param)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// DeregisterInstance indicates an expected call of DeregisterInstance.
+func (mr *MockINamingClientMockRecorder) DeregisterInstance(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeregisterInstance", reflect.TypeOf((*MockINamingClient)(nil).DeregisterInstance), param)
+}
+
+// GetAllServicesInfo mocks base method.
+func (m *MockINamingClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) (model.ServiceList, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetAllServicesInfo", param)
+	ret0, _ := ret[0].(model.ServiceList)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// GetAllServicesInfo indicates an expected call of GetAllServicesInfo.
+func (mr *MockINamingClientMockRecorder) GetAllServicesInfo(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllServicesInfo", reflect.TypeOf((*MockINamingClient)(nil).GetAllServicesInfo), param)
+}
+
+// GetService mocks base method.
+func (m *MockINamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "GetService", param)
+	ret0, _ := ret[0].(model.Service)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// GetService indicates an expected call of GetService.
+func (mr *MockINamingClientMockRecorder) GetService(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetService", reflect.TypeOf((*MockINamingClient)(nil).GetService), param)
+}
+
+// RegisterInstance mocks base method.
+func (m *MockINamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "RegisterInstance", param)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// RegisterInstance indicates an expected call of RegisterInstance.
+func (mr *MockINamingClientMockRecorder) RegisterInstance(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterInstance", reflect.TypeOf((*MockINamingClient)(nil).RegisterInstance), param)
+}
+
+// SelectAllInstances mocks base method.
+func (m *MockINamingClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SelectAllInstances", param)
+	ret0, _ := ret[0].([]model.Instance)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// SelectAllInstances indicates an expected call of SelectAllInstances.
+func (mr *MockINamingClientMockRecorder) SelectAllInstances(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectAllInstances", reflect.TypeOf((*MockINamingClient)(nil).SelectAllInstances), param)
+}
+
+// SelectInstances mocks base method.
+func (m *MockINamingClient) SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SelectInstances", param)
+	ret0, _ := ret[0].([]model.Instance)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// SelectInstances indicates an expected call of SelectInstances.
+func (mr *MockINamingClientMockRecorder) SelectInstances(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectInstances", reflect.TypeOf((*MockINamingClient)(nil).SelectInstances), param)
+}
+
+// SelectOneHealthyInstance mocks base method.
+func (m *MockINamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SelectOneHealthyInstance", param)
+	ret0, _ := ret[0].(*model.Instance)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// SelectOneHealthyInstance indicates an expected call of SelectOneHealthyInstance.
+func (mr *MockINamingClientMockRecorder) SelectOneHealthyInstance(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectOneHealthyInstance", reflect.TypeOf((*MockINamingClient)(nil).SelectOneHealthyInstance), param)
+}
+
+// Subscribe mocks base method.
+func (m *MockINamingClient) Subscribe(param *vo.SubscribeParam) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Subscribe", param)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Subscribe indicates an expected call of Subscribe.
+func (mr *MockINamingClientMockRecorder) Subscribe(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockINamingClient)(nil).Subscribe), param)
+}
+
+// Unsubscribe mocks base method.
+func (m *MockINamingClient) Unsubscribe(param *vo.SubscribeParam) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Unsubscribe", param)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// Unsubscribe indicates an expected call of Unsubscribe.
+func (mr *MockINamingClientMockRecorder) Unsubscribe(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockINamingClient)(nil).Unsubscribe), param)
+}
+
+// UpdateInstance mocks base method.
+func (m *MockINamingClient) UpdateInstance(param vo.UpdateInstanceParam) (bool, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "UpdateInstance", param)
+	ret0, _ := ret[0].(bool)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// UpdateInstance indicates an expected call of UpdateInstance.
+func (mr *MockINamingClientMockRecorder) UpdateInstance(param interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInstance", reflect.TypeOf((*MockINamingClient)(nil).UpdateInstance), param)
+}
diff --git a/eventmesh-server-go/plugin/registry/nacos/nacos.go b/eventmesh-server-go/plugin/registry/nacos/nacos.go
new file mode 100644
index 000000000..052fc195d
--- /dev/null
+++ b/eventmesh-server-go/plugin/registry/nacos/nacos.go
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package nacos
+
+import (
+	"fmt"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry"
+	"github.com/gogf/gf/util/gconv"
+	"github.com/nacos-group/nacos-sdk-go/v2/clients"
+	"github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
+	"github.com/nacos-group/nacos-sdk-go/v2/common/constant"
+	"github.com/nacos-group/nacos-sdk-go/v2/vo"
+	"go.uber.org/atomic"
+	"net"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+var (
+	defaultConnectTimeout         = 5000 * time.Millisecond
+	DefaultClusterName            = "DEFAULT"
+	DefaultGroupName              = "DEFAULT_GROUP"
+	DefaultWeight         float64 = 10
+	protoList                     = []string{"TCP", "HTTP", "GRPC"}
+)
+
+func init() {
+	plugin.Register("nacos", &Registry{})
+}
+
+type Registry struct {
+	initStatus    *atomic.Bool
+	startStatus   *atomic.Bool
+	cfg           *Config
+	client        naming_client.INamingClient
+	registryInfos *sync.Map // map[string]*registry.EventMeshRegisterInfo
+}
+
+func (r *Registry) Type() string {
+	return registry.Type
+}
+
+func (r *Registry) Setup(name string, dec plugin.Decoder) error {
+	conf := &Config{}
+	if err := dec.Decode(conf); err != nil {
+		return err
+	}
+	r.cfg = conf
+	return nil
+}
+
+func (r *Registry) Init() error {
+	r.initStatus = atomic.NewBool(true)
+	r.startStatus = atomic.NewBool(false)
+	r.registryInfos = new(sync.Map)
+	return nil
+}
+
+func (r *Registry) Start() error {
+	var p vo.NacosClientParam
+	var addresses []string
+
+	if len(r.cfg.AddressList) > 0 {
+		addresses = strings.Split(r.cfg.AddressList, ",")
+	}
+
+	for _, address := range addresses {
+		ip, port, err := net.SplitHostPort(address)
+		if err != nil {
+			return err
+		}
+		p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
+	}
+
+	p.ClientConfig = &constant.ClientConfig{
+		TimeoutMs: uint64(defaultConnectTimeout.Milliseconds()),
+		CacheDir:  r.cfg.CacheDir,
+	}
+	cli, err := clients.NewNamingClient(p)
+	if err != nil {
+		return err
+	}
+
+	r.startStatus.Store(true)
+	r.client = cli
+	return nil
+}
+
+func (r *Registry) Shutdown() error {
+	r.startStatus.Store(false)
+	r.initStatus.Store(false)
+	r.client.CloseClient()
+	return nil
+}
+
+func (r *Registry) FindEventMeshInfoByCluster(clusterName string) ([]*registry.EventMeshDataInfo, error) {
+	var infos []*registry.EventMeshDataInfo
+	meshServerName := config.GlobalConfig().Common.Name
+	cluster := config.GlobalConfig().Common.Cluster
+	for _, proto := range protoList {
+		ins, err := r.client.SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{clusterName},
+			ServiceName: fmt.Sprintf("%v-%v", meshServerName, proto),
+			GroupName:   cluster,
+			HealthyOnly: true,
+		})
+		if err != nil {
+			return nil, err
+		}
+		if len(ins) == 0 {
+			continue
+		}
+		for _, in := range ins {
+			infos = append(infos, &registry.EventMeshDataInfo{
+				EventMeshClusterName: in.ClusterName,
+				EventMeshName:        in.ServiceName,
+				Endpoint:             fmt.Sprintf("%v:%v", in.Ip, in.Port),
+				LastUpdateTimestamp:  time.Time{},
+				Metadata:             in.Metadata,
+			})
+		}
+	}
+
+	return infos, nil
+}
+
+func (r *Registry) FindAllEventMeshInfo() ([]*registry.EventMeshDataInfo, error) {
+	var infos []*registry.EventMeshDataInfo
+	meshServerName := config.GlobalConfig().Common.Name
+
+	for _, proto := range protoList {
+		ins, err := r.client.SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{},
+			ServiceName: fmt.Sprintf("%v-%v", meshServerName, proto),
+			GroupName:   "GROUP",
+			HealthyOnly: true,
+		})
+		if err != nil {
+			return nil, err
+		}
+		if len(ins) == 0 {
+			continue
+		}
+		for _, in := range ins {
+			infos = append(infos, &registry.EventMeshDataInfo{
+				EventMeshClusterName: in.ClusterName,
+				EventMeshName:        in.ServiceName,
+				Endpoint:             fmt.Sprintf("%v:%v", in.Ip, in.Port),
+				LastUpdateTimestamp:  time.Time{},
+				Metadata:             in.Metadata,
+			})
+		}
+	}
+
+	return infos, nil
+}
+
+// FindEventMeshClientDistributionData not used
+// deprecate
+func (r *Registry) FindEventMeshClientDistributionData(clusterName, group, purpose string) (map[string]map[string]int, error) {
+	return nil, nil
+}
+
+func (r *Registry) RegisterMetadata(map[string]string) {
+	r.registryInfos.Range(func(key, value any) bool {
+		r.Register(value.(*registry.EventMeshRegisterInfo))
+		return true
+	})
+}
+
+func (r *Registry) Register(info *registry.EventMeshRegisterInfo) error {
+	ipPort := strings.Split(info.EndPoint, ":")
+	if len(ipPort) != 2 {
+		return fmt.Errorf("endpoint format err")
+	}
+	ip := ipPort[0]
+	port, err := strconv.ParseInt(ipPort[1], 10, 64)
+	if err != nil {
+		return err
+	}
+
+	_, err = r.client.RegisterInstance(vo.RegisterInstanceParam{
+		Ip:          ip,
+		Port:        uint64(port),
+		ServiceName: info.EventMeshName,
+		GroupName:   uniqGroupName(info.ProtocolType),
+		Healthy:     true,
+		Enable:      true,
+		Weight:      DefaultWeight,
+	})
+	if err != nil {
+		return err
+	}
+	r.registryInfos.Store(info.EventMeshName, info)
+	return nil
+}
+
+func (r *Registry) UnRegister(info *registry.EventMeshUnRegisterInfo) error {
+	ipPort := strings.Split(info.EndPoint, ":")
+	if len(ipPort) != 2 {
+		return fmt.Errorf("endpoint format err")
+	}
+	ip := ipPort[0]
+	port, err := strconv.ParseInt(ipPort[1], 10, 64)
+	if err != nil {
+		return err
+	}
+
+	_, err = r.client.DeregisterInstance(vo.DeregisterInstanceParam{
+		Ip:          ip,
+		Port:        uint64(port),
+		ServiceName: info.EventMeshName,
+		GroupName:   uniqGroupName(info.ProtocolType),
+	})
+	if err != nil {
+		return err
+	}
+	r.registryInfos.Delete(info.EventMeshName)
+	return nil
+}
+
+func uniqGroupName(protoType string) string {
+	return fmt.Sprintf("%s-GROUP", protoType)
+}
diff --git a/eventmesh-server-go/plugin/registry/nacos/nacos_test.go b/eventmesh-server-go/plugin/registry/nacos/nacos_test.go
new file mode 100644
index 000000000..fa4aff3a4
--- /dev/null
+++ b/eventmesh-server-go/plugin/registry/nacos/nacos_test.go
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package nacos
+
+import (
+	"fmt"
+	"github.com/nacos-group/nacos-sdk-go/v2/model"
+	"os"
+	"path/filepath"
+	"testing"
+
+	"github.com/golang/mock/gomock"
+	"github.com/nacos-group/nacos-sdk-go/v2/vo"
+	"github.com/stretchr/testify/assert"
+
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry/nacos/mocks"
+)
+
+func Test_Init(t *testing.T) {
+	r := &Registry{}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+}
+
+func Test_Start(t *testing.T) {
+	t.Run("create nameing client", func(t *testing.T) {
+		cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+		assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+		defer os.RemoveAll(cacheDir)
+
+		r := &Registry{
+			cfg: &Config{
+				ServiceName: "test",
+				CacheDir:    "/tmp/",
+				Port:        "8088",
+				AddressList: "127.0.0.1:8081",
+			},
+		}
+		assert.NoError(t, r.Init())
+		ctrl := gomock.NewController(t)
+		mocks.NewMockINamingClient(ctrl)
+		assert.NoError(t, r.Start())
+		assert.NoError(t, r.Shutdown())
+		assert.False(t, r.initStatus.Load())
+		assert.False(t, r.startStatus.Load())
+	})
+}
+
+func Test_Register(t *testing.T) {
+	cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+	assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+	defer os.RemoveAll(cacheDir)
+
+	r := &Registry{
+		cfg: &Config{
+			ServiceName: "test",
+			CacheDir:    "/tmp/",
+			Port:        "8088",
+			AddressList: "127.0.0.1:8081",
+		},
+	}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+	assert.NoError(t, r.Start())
+	ctrl := gomock.NewController(t)
+	nameclient := mocks.NewMockINamingClient(ctrl)
+	meshInfo := &registry.EventMeshRegisterInfo{
+		EventMeshClusterName:    "test",
+		EventMeshName:           "test",
+		EndPoint:                "127.0.0.1:3333",
+		EventMeshInstanceNumMap: map[string]map[string]int{},
+		Metadata:                map[string]string{},
+		ProtocolType:            "GRPC",
+	}
+	nameclient.EXPECT().RegisterInstance(vo.RegisterInstanceParam{
+		Ip:          "127.0.0.1",
+		Port:        3333,
+		ServiceName: "test",
+		GroupName:   uniqGroupName("GRPC"),
+		Healthy:     true,
+		Enable:      true,
+		Weight:      DefaultWeight,
+	}).Return(true, nil).Times(1)
+	r.client = nameclient
+	err := r.Register(meshInfo)
+	assert.NoError(t, err)
+	val, ok := r.registryInfos.Load(meshInfo.EventMeshName)
+	assert.True(t, ok)
+	assert.NotNil(t, val)
+}
+
+func Test_DeRegister(t *testing.T) {
+	cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+	assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+	defer os.RemoveAll(cacheDir)
+
+	r := &Registry{
+		cfg: &Config{
+			ServiceName: "test",
+			CacheDir:    "/tmp/",
+			Port:        "8088",
+			AddressList: "127.0.0.1:8081",
+		},
+	}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+	assert.NoError(t, r.Start())
+	ctrl := gomock.NewController(t)
+	nameclient := mocks.NewMockINamingClient(ctrl)
+	meshInfo := &registry.EventMeshRegisterInfo{
+		EventMeshClusterName:    "test",
+		EventMeshName:           "test",
+		EndPoint:                "127.0.0.1:3333",
+		EventMeshInstanceNumMap: map[string]map[string]int{},
+		Metadata:                map[string]string{},
+		ProtocolType:            "GRPC",
+	}
+	nameclient.EXPECT().RegisterInstance(vo.RegisterInstanceParam{
+		Ip:          "127.0.0.1",
+		Port:        3333,
+		ServiceName: "test",
+		GroupName:   uniqGroupName("GRPC"),
+		Healthy:     true,
+		Enable:      true,
+		Weight:      DefaultWeight,
+	}).Return(true, nil).Times(1)
+	r.client = nameclient
+	err := r.Register(meshInfo)
+	assert.NoError(t, err)
+	val, ok := r.registryInfos.Load(meshInfo.EventMeshName)
+	assert.True(t, ok)
+	assert.NotNil(t, val)
+
+	unmeshInfo := &registry.EventMeshUnRegisterInfo{
+		EventMeshClusterName: "test",
+		EventMeshName:        "test",
+		EndPoint:             "127.0.0.1:3333",
+		ProtocolType:         "GRPC",
+	}
+	nameclient.EXPECT().DeregisterInstance(vo.DeregisterInstanceParam{
+		Ip:          "127.0.0.1",
+		Port:        3333,
+		ServiceName: meshInfo.EventMeshName,
+		GroupName:   uniqGroupName("GRPC"),
+	}).Return(true, nil).Times(1)
+	err = r.UnRegister(unmeshInfo)
+	assert.NoError(t, err)
+	val, ok = r.registryInfos.Load(meshInfo.EventMeshName)
+	assert.False(t, ok)
+	assert.Nil(t, val)
+}
+
+func Test_FindEventMeshInfoByCluster(t *testing.T) {
+	cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+	assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+	defer os.RemoveAll(cacheDir)
+
+	r := &Registry{
+		cfg: &Config{
+			ServiceName: "test",
+			CacheDir:    "/tmp/",
+			Port:        "8088",
+			AddressList: "127.0.0.1:8081",
+		},
+	}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+	assert.NoError(t, r.Start())
+
+	t.Run("return empty", func(t *testing.T) {
+		ctrl := gomock.NewController(t)
+		nameclient := mocks.NewMockINamingClient(ctrl)
+		r.client = nameclient
+		protoList = []string{"GRPC"}
+		nameclient.EXPECT().SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{"test"},
+			ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+			GroupName:   "1",
+			HealthyOnly: true,
+		}).Return([]model.Instance{}, nil).AnyTimes()
+		val, err := r.FindEventMeshInfoByCluster("test")
+		assert.NoError(t, err)
+		assert.Equal(t, len(val), 0)
+	})
+
+	t.Run("return 1 instance", func(t *testing.T) {
+		ctrl := gomock.NewController(t)
+		nameclient := mocks.NewMockINamingClient(ctrl)
+		r.client = nameclient
+		protoList = []string{"GRPC"}
+		nameclient.EXPECT().SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{"test"},
+			ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+			GroupName:   "1",
+			HealthyOnly: true,
+		}).Return([]model.Instance{
+			{
+				InstanceId: "1",
+			},
+		}, nil).AnyTimes()
+		val, err := r.FindEventMeshInfoByCluster("test")
+		assert.NoError(t, err)
+		assert.Equal(t, len(val), 1)
+	})
+
+	t.Run("return err", func(t *testing.T) {
+		ctrl := gomock.NewController(t)
+		nameclient := mocks.NewMockINamingClient(ctrl)
+		r.client = nameclient
+		protoList = []string{"GRPC"}
+		mockErr := fmt.Errorf("mock err")
+		nameclient.EXPECT().SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{"test"},
+			ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+			GroupName:   "1",
+			HealthyOnly: true,
+		}).Return(nil, mockErr).AnyTimes()
+		val, err := r.FindEventMeshInfoByCluster("test")
+		assert.Error(t, err)
+		assert.Equal(t, err, mockErr)
+		assert.Nil(t, val)
+	})
+}
+
+func Test_FindAllEventMeshInfo(t *testing.T) {
+	cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+	assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+	defer os.RemoveAll(cacheDir)
+
+	r := &Registry{
+		cfg: &Config{
+			ServiceName: "test",
+			CacheDir:    "/tmp/",
+			Port:        "8088",
+			AddressList: "127.0.0.1:8081",
+		},
+	}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+	assert.NoError(t, r.Start())
+
+	t.Run("return empty", func(t *testing.T) {
+		ctrl := gomock.NewController(t)
+		nameclient := mocks.NewMockINamingClient(ctrl)
+		r.client = nameclient
+		protoList = []string{"GRPC"}
+		nameclient.EXPECT().SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{},
+			ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+			GroupName:   "GROUP",
+			HealthyOnly: true,
+		}).Return([]model.Instance{}, nil).AnyTimes()
+		val, err := r.FindAllEventMeshInfo()
+		assert.NoError(t, err)
+		assert.Equal(t, len(val), 0)
+	})
+
+	t.Run("return 1 instance", func(t *testing.T) {
+		ctrl := gomock.NewController(t)
+		nameclient := mocks.NewMockINamingClient(ctrl)
+		r.client = nameclient
+		protoList = []string{"GRPC"}
+		nameclient.EXPECT().SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{},
+			ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+			GroupName:   "GROUP",
+			HealthyOnly: true,
+		}).Return([]model.Instance{
+			{
+				InstanceId: "1",
+			},
+		}, nil).AnyTimes()
+		val, err := r.FindAllEventMeshInfo()
+		assert.NoError(t, err)
+		assert.Equal(t, len(val), 1)
+	})
+
+	t.Run("return err", func(t *testing.T) {
+		ctrl := gomock.NewController(t)
+		nameclient := mocks.NewMockINamingClient(ctrl)
+		r.client = nameclient
+		protoList = []string{"GRPC"}
+		mockErr := fmt.Errorf("mock err")
+		nameclient.EXPECT().SelectInstances(vo.SelectInstancesParam{
+			Clusters:    []string{},
+			ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+			GroupName:   "GROUP",
+			HealthyOnly: true,
+		}).Return(nil, mockErr).AnyTimes()
+		val, err := r.FindAllEventMeshInfo()
+		assert.Error(t, err)
+		assert.Equal(t, err, mockErr)
+		assert.Nil(t, val)
+	})
+}
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/config.go b/eventmesh-server-go/plugin/registry/registry.go
similarity index 63%
copy from eventmesh-server-go/plugin/naming/nacos/registry/config.go
copy to eventmesh-server-go/plugin/registry/registry.go
index 11fd86c23..4b6f5d5d2 100644
--- a/eventmesh-server-go/plugin/naming/nacos/registry/config.go
+++ b/eventmesh-server-go/plugin/registry/registry.go
@@ -15,18 +15,16 @@
 
 package registry
 
-// Config registry config
-type Config struct {
-	ServiceName string
-	Weight      int
-	Address     string
-	Metadata    map[string]string
-}
+var Type = "registry"
 
-// PluginConfig define registry plugin config
-type PluginConfig struct {
-	ServiceName string `yaml:"service_name"`
-	CacheDir    string `yaml:"cache-dir"`
-	Port        string `yaml:"port"`         // nacos server port
-	AddressList string `yaml:"address_list"` // nacos server address list
+type Interface interface {
+	Init() error
+	Start() error
+	Shutdown() error
+	FindEventMeshInfoByCluster(clusterName string) ([]*EventMeshDataInfo, error)
+	FindAllEventMeshInfo() ([]*EventMeshDataInfo, error)
+	FindEventMeshClientDistributionData(clusterName, group, purpose string) (map[string]map[string]int, error)
+	RegisterMetadata(map[string]string)
+	Register(info *EventMeshRegisterInfo) error
+	UnRegister(info *EventMeshUnRegisterInfo) error
 }
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go
index dac62caa0..27935c8bd 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go
@@ -46,8 +46,8 @@ type GroupClient struct {
 func DefaultStreamGroupClient() *GroupClient {
 	hostname, _ := os.Hostname()
 	return &GroupClient{
-		ENV:              config.GlobalConfig().Server.GRPCOption.Env,
-		IDC:              config.GlobalConfig().Server.GRPCOption.IDC,
+		ENV:              config.GlobalConfig().Common.Env,
+		IDC:              config.GlobalConfig().Common.IDC,
 		ConsumerGroup:    "ConsumerGroup",
 		Topic:            "Topic",
 		GRPCType:         consts.STREAM,
@@ -66,8 +66,8 @@ func DefaultStreamGroupClient() *GroupClient {
 func DefaultWebhookGroupClient() *GroupClient {
 	hostname, _ := os.Hostname()
 	return &GroupClient{
-		ENV:              config.GlobalConfig().Server.GRPCOption.Env,
-		IDC:              config.GlobalConfig().Server.GRPCOption.IDC,
+		ENV:              config.GlobalConfig().Common.Env,
+		IDC:              config.GlobalConfig().Common.IDC,
 		ConsumerGroup:    "ConsumerGroup",
 		Topic:            "Topic",
 		GRPCType:         consts.WEBHOOK,
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go
index 49fcb729a..ac766af17 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go
@@ -92,9 +92,9 @@ func (e *eventMeshConsumer) Init() error {
 	persistProps := make(map[string]string)
 	persistProps["isBroadcast"] = "false"
 	persistProps["consumerGroup"] = e.ConsumerGroup
-	persistProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+	persistProps["eventMeshIDC"] = config.GlobalConfig().Common.IDC
 	persistProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
-		config.GlobalConfig().Server.GRPCOption.Cluster)
+		config.GlobalConfig().Common.Cluster)
 	if err := e.persistentConsumer.Init(persistProps); err != nil {
 		return err
 	}
@@ -104,9 +104,9 @@ func (e *eventMeshConsumer) Init() error {
 	broadcastProps := make(map[string]string)
 	broadcastProps["isBroadcast"] = "false"
 	broadcastProps["consumerGroup"] = e.ConsumerGroup
-	broadcastProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+	broadcastProps["eventMeshIDC"] = config.GlobalConfig().Common.IDC
 	broadcastProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
-		config.GlobalConfig().Server.GRPCOption.Cluster)
+		config.GlobalConfig().Common.Cluster)
 	if err := e.broadcastConsumer.Init(broadcastProps); err != nil {
 		return err
 	}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_processor_test.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_processor_test.go
new file mode 100644
index 000000000..528290010
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_processor_test.go
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/validator"
+	"github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	"github.com/stretchr/testify/assert"
+	"testing"
+)
+
+func Test_processor_Subscribe(t *testing.T) {
+	createHeader := func() *pb.RequestHeader {
+		return &pb.RequestHeader{
+			Env:             "env",
+			Region:          "sh",
+			Idc:             "Idc",
+			Ip:              util.GetIP(),
+			Pid:             util.GetIP(),
+			Sys:             "Sys",
+			Username:        "em",
+			Password:        "pw",
+			Language:        "go",
+			ProtocolType:    "cloudeevents",
+			ProtocolVersion: "v1",
+			ProtocolDesc:    "for mock",
+		}
+	}
+	tests := []struct {
+		name   string
+		expect func(t *testing.T)
+	}{
+		{
+			name: "header no idc err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Idc = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoIDC)
+			},
+		},
+		{
+			name: "header no ip err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Ip = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoIP)
+			},
+		},
+		{
+			name: "header no env err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Env = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoENV)
+			},
+		},
+		{
+			name: "header no pid err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Pid = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoPID)
+			},
+		},
+		{
+			name: "header no sys err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Sys = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoSYS)
+			},
+		},
+		{
+			name: "header no username err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Username = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoUser)
+			},
+		},
+		{
+			name: "header no passwd err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Password = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoPASSWD)
+			},
+		},
+		{
+			name: "header no language err",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				hdr.Language = ""
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr})
+				assert.Equal(t, err, validator.ErrHeaderNoLANG)
+			},
+		},
+		{
+			name: "webhook no url",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				_, err := p.Subscribe(nil, &pb.Subscription{Header: hdr, Url: ""})
+				assert.Equal(t, err, validator.ErrSubscriptionNoURL)
+			},
+		},
+		{
+			name: "stream no topic",
+			expect: func(t *testing.T) {
+				p := &processor{}
+				hdr := createHeader()
+				_, err := p.Subscribe(nil, &pb.Subscription{
+					Header: hdr,
+					Url:    "http://mock.com",
+				})
+				assert.Equal(t, err, validator.ErrSubscriptionNoItem)
+			},
+		},
+		{
+			name: "register client err",
+			expect: func(t *testing.T) {
+				mockMgr, _ := NewConsumerManager()
+				hdr := createHeader()
+				req := &pb.Subscription{
+					Header: hdr,
+					Url:    "http://mock.com",
+					SubscriptionItems: []*pb.Subscription_SubscriptionItem{
+						{
+							Topic: "test-topic",
+							Mode:  pb.Subscription_SubscriptionItem_CLUSTERING,
+							Type:  pb.Subscription_SubscriptionItem_ASYNC,
+						},
+					},
+				}
+				p := &processor{}
+				resp, err := p.Subscribe(mockMgr, req)
+				assert.NoError(t, err)
+				t.Log(resp.String())
+			},
+		},
+	}
+
+	err := config.GlobalConfig().Plugins.Setup()
+	assert.NoError(t, err)
+	plugin.SetActivePlugin(config.GlobalConfig().ActivePlugins)
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			tc.expect(t)
+		})
+	}
+}
+
+func Test_processor_Unsubscribe(t *testing.T) {
+
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go
index e32d4dbf6..19fb90445 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go
@@ -156,10 +156,10 @@ func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
 		httpHdr.Set(grpc.REQUEST_CODE, grpc.HTTP_PUSH_CLIENT_ASYNC)
 		httpHdr.Set(grpc.LANGUAGE, "Go")
 		httpHdr.Set(grpc.Version, "1.0")
-		httpHdr.Set(grpc.EVENTMESHCLUSTER, config.GlobalConfig().Server.Cluster)
-		httpHdr.Set(grpc.EVENTMESHENV, config.GlobalConfig().Server.Env)
+		httpHdr.Set(grpc.EVENTMESHCLUSTER, config.GlobalConfig().Common.Cluster)
+		httpHdr.Set(grpc.EVENTMESHENV, config.GlobalConfig().Common.Env)
 		httpHdr.Set(grpc.EVENTMESHIP, util.GetIP())
-		httpHdr.Set(grpc.EVENTMESHIDC, config.GlobalConfig().Server.IDC)
+		httpHdr.Set(grpc.EVENTMESHIDC, config.GlobalConfig().Common.IDC)
 		httpHdr.Set(grpc.PROTOCOL_TYPE, hr.SimpleMessage.Header.ProtocolType)
 		httpHdr.Set(grpc.PROTOCOL_DESC, hr.SimpleMessage.Header.ProtocolDesc)
 		httpHdr.Set(grpc.PROTOCOL_VERSION, hr.SimpleMessage.Header.ProtocolVersion)
@@ -212,7 +212,7 @@ func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
 func (w *WebhookRequest) getURLs() []string {
 	var (
 		urls       []string
-		currentIDC = config.GlobalConfig().Server.GRPCOption.IDC
+		currentIDC = config.GlobalConfig().Common.IDC
 	)
 
 	w.IDCWebhookURLs.Range(func(key, value any) bool {
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go
index b9f21890f..414d7f092 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go
@@ -48,8 +48,8 @@ func NewEventMeshProducer(cfg *ProducerGroupConfig) (EventMeshProducer, error) {
 		return nil, err
 	}
 
-	cluster := config2.GlobalConfig().Server.GRPCOption.Cluster
-	idc := config2.GlobalConfig().Server.GRPCOption.IDC
+	cluster := config2.GlobalConfig().Common.Cluster
+	idc := config2.GlobalConfig().Common.IDC
 	mm := make(map[string]string)
 	mm["producerGroup"] = cfg.GroupName
 	mm["instanceName"] = util.BuildMeshClientID(cfg.GroupName, cluster)
diff --git a/eventmesh-server-go/runtime/proto/pb/mocks/consumer_service.go b/eventmesh-server-go/runtime/proto/pb/mocks/consumer_service.go
new file mode 100644
index 000000000..2771fb7c7
--- /dev/null
+++ b/eventmesh-server-go/runtime/proto/pb/mocks/consumer_service.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb (interfaces: ConsumerServiceServer)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+	context "context"
+	reflect "reflect"
+
+	pb "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	gomock "github.com/golang/mock/gomock"
+)
+
+// MockConsumerServiceServer is a mock of ConsumerServiceServer interface.
+type MockConsumerServiceServer struct {
+	ctrl     *gomock.Controller
+	recorder *MockConsumerServiceServerMockRecorder
+}
+
+// MockConsumerServiceServerMockRecorder is the mock recorder for MockConsumerServiceServer.
+type MockConsumerServiceServerMockRecorder struct {
+	mock *MockConsumerServiceServer
+}
+
+// NewMockConsumerServiceServer creates a new mock instance.
+func NewMockConsumerServiceServer(ctrl *gomock.Controller) *MockConsumerServiceServer {
+	mock := &MockConsumerServiceServer{ctrl: ctrl}
+	mock.recorder = &MockConsumerServiceServerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockConsumerServiceServer) EXPECT() *MockConsumerServiceServerMockRecorder {
+	return m.recorder
+}
+
+// Subscribe mocks base method.
+func (m *MockConsumerServiceServer) Subscribe(arg0 context.Context, arg1 *pb.Subscription) (*pb.Response, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Subscribe", arg0, arg1)
+	ret0, _ := ret[0].(*pb.Response)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Subscribe indicates an expected call of Subscribe.
+func (mr *MockConsumerServiceServerMockRecorder) Subscribe(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockConsumerServiceServer)(nil).Subscribe), arg0, arg1)
+}
+
+// SubscribeStream mocks base method.
+func (m *MockConsumerServiceServer) SubscribeStream(arg0 pb.ConsumerService_SubscribeStreamServer) error {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "SubscribeStream", arg0)
+	ret0, _ := ret[0].(error)
+	return ret0
+}
+
+// SubscribeStream indicates an expected call of SubscribeStream.
+func (mr *MockConsumerServiceServerMockRecorder) SubscribeStream(arg0 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeStream", reflect.TypeOf((*MockConsumerServiceServer)(nil).SubscribeStream), arg0)
+}
+
+// Unsubscribe mocks base method.
+func (m *MockConsumerServiceServer) Unsubscribe(arg0 context.Context, arg1 *pb.Subscription) (*pb.Response, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Unsubscribe", arg0, arg1)
+	ret0, _ := ret[0].(*pb.Response)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Unsubscribe indicates an expected call of Unsubscribe.
+func (mr *MockConsumerServiceServerMockRecorder) Unsubscribe(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockConsumerServiceServer)(nil).Unsubscribe), arg0, arg1)
+}
+
+// mustEmbedUnimplementedConsumerServiceServer mocks base method.
+func (m *MockConsumerServiceServer) mustEmbedUnimplementedConsumerServiceServer() {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "mustEmbedUnimplementedConsumerServiceServer")
+}
+
+// mustEmbedUnimplementedConsumerServiceServer indicates an expected call of mustEmbedUnimplementedConsumerServiceServer.
+func (mr *MockConsumerServiceServerMockRecorder) mustEmbedUnimplementedConsumerServiceServer() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedConsumerServiceServer", reflect.TypeOf((*MockConsumerServiceServer)(nil).mustEmbedUnimplementedConsumerServiceServer))
+}
diff --git a/eventmesh-server-go/runtime/proto/pb/mocks/heartbeat_service.go b/eventmesh-server-go/runtime/proto/pb/mocks/heartbeat_service.go
new file mode 100644
index 000000000..0fcdcf0f9
--- /dev/null
+++ b/eventmesh-server-go/runtime/proto/pb/mocks/heartbeat_service.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb (interfaces: HeartbeatServiceServer)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+	context "context"
+	reflect "reflect"
+
+	pb "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	gomock "github.com/golang/mock/gomock"
+)
+
+// MockHeartbeatServiceServer is a mock of HeartbeatServiceServer interface.
+type MockHeartbeatServiceServer struct {
+	ctrl     *gomock.Controller
+	recorder *MockHeartbeatServiceServerMockRecorder
+}
+
+// MockHeartbeatServiceServerMockRecorder is the mock recorder for MockHeartbeatServiceServer.
+type MockHeartbeatServiceServerMockRecorder struct {
+	mock *MockHeartbeatServiceServer
+}
+
+// NewMockHeartbeatServiceServer creates a new mock instance.
+func NewMockHeartbeatServiceServer(ctrl *gomock.Controller) *MockHeartbeatServiceServer {
+	mock := &MockHeartbeatServiceServer{ctrl: ctrl}
+	mock.recorder = &MockHeartbeatServiceServerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockHeartbeatServiceServer) EXPECT() *MockHeartbeatServiceServerMockRecorder {
+	return m.recorder
+}
+
+// Heartbeat mocks base method.
+func (m *MockHeartbeatServiceServer) Heartbeat(arg0 context.Context, arg1 *pb.Heartbeat) (*pb.Response, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Heartbeat", arg0, arg1)
+	ret0, _ := ret[0].(*pb.Response)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Heartbeat indicates an expected call of Heartbeat.
+func (mr *MockHeartbeatServiceServerMockRecorder) Heartbeat(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockHeartbeatServiceServer)(nil).Heartbeat), arg0, arg1)
+}
+
+// mustEmbedUnimplementedHeartbeatServiceServer mocks base method.
+func (m *MockHeartbeatServiceServer) mustEmbedUnimplementedHeartbeatServiceServer() {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "mustEmbedUnimplementedHeartbeatServiceServer")
+}
+
+// mustEmbedUnimplementedHeartbeatServiceServer indicates an expected call of mustEmbedUnimplementedHeartbeatServiceServer.
+func (mr *MockHeartbeatServiceServerMockRecorder) mustEmbedUnimplementedHeartbeatServiceServer() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedHeartbeatServiceServer", reflect.TypeOf((*MockHeartbeatServiceServer)(nil).mustEmbedUnimplementedHeartbeatServiceServer))
+}
diff --git a/eventmesh-server-go/runtime/proto/pb/mocks/producer_service.go b/eventmesh-server-go/runtime/proto/pb/mocks/producer_service.go
new file mode 100644
index 000000000..8dfecd6e9
--- /dev/null
+++ b/eventmesh-server-go/runtime/proto/pb/mocks/producer_service.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//     http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb (interfaces: PublisherServiceServer)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+	context "context"
+	reflect "reflect"
+
+	pb "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+	gomock "github.com/golang/mock/gomock"
+)
+
+// MockPublisherServiceServer is a mock of PublisherServiceServer interface.
+type MockPublisherServiceServer struct {
+	ctrl     *gomock.Controller
+	recorder *MockPublisherServiceServerMockRecorder
+}
+
+// MockPublisherServiceServerMockRecorder is the mock recorder for MockPublisherServiceServer.
+type MockPublisherServiceServerMockRecorder struct {
+	mock *MockPublisherServiceServer
+}
+
+// NewMockPublisherServiceServer creates a new mock instance.
+func NewMockPublisherServiceServer(ctrl *gomock.Controller) *MockPublisherServiceServer {
+	mock := &MockPublisherServiceServer{ctrl: ctrl}
+	mock.recorder = &MockPublisherServiceServerMockRecorder{mock}
+	return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockPublisherServiceServer) EXPECT() *MockPublisherServiceServerMockRecorder {
+	return m.recorder
+}
+
+// BatchPublish mocks base method.
+func (m *MockPublisherServiceServer) BatchPublish(arg0 context.Context, arg1 *pb.BatchMessage) (*pb.Response, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "BatchPublish", arg0, arg1)
+	ret0, _ := ret[0].(*pb.Response)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// BatchPublish indicates an expected call of BatchPublish.
+func (mr *MockPublisherServiceServerMockRecorder) BatchPublish(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchPublish", reflect.TypeOf((*MockPublisherServiceServer)(nil).BatchPublish), arg0, arg1)
+}
+
+// Publish mocks base method.
+func (m *MockPublisherServiceServer) Publish(arg0 context.Context, arg1 *pb.SimpleMessage) (*pb.Response, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "Publish", arg0, arg1)
+	ret0, _ := ret[0].(*pb.Response)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// Publish indicates an expected call of Publish.
+func (mr *MockPublisherServiceServerMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockPublisherServiceServer)(nil).Publish), arg0, arg1)
+}
+
+// RequestReply mocks base method.
+func (m *MockPublisherServiceServer) RequestReply(arg0 context.Context, arg1 *pb.SimpleMessage) (*pb.SimpleMessage, error) {
+	m.ctrl.T.Helper()
+	ret := m.ctrl.Call(m, "RequestReply", arg0, arg1)
+	ret0, _ := ret[0].(*pb.SimpleMessage)
+	ret1, _ := ret[1].(error)
+	return ret0, ret1
+}
+
+// RequestReply indicates an expected call of RequestReply.
+func (mr *MockPublisherServiceServerMockRecorder) RequestReply(arg0, arg1 interface{}) *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestReply", reflect.TypeOf((*MockPublisherServiceServer)(nil).RequestReply), arg0, arg1)
+}
+
+// mustEmbedUnimplementedPublisherServiceServer mocks base method.
+func (m *MockPublisherServiceServer) mustEmbedUnimplementedPublisherServiceServer() {
+	m.ctrl.T.Helper()
+	m.ctrl.Call(m, "mustEmbedUnimplementedPublisherServiceServer")
+}
+
+// mustEmbedUnimplementedPublisherServiceServer indicates an expected call of mustEmbedUnimplementedPublisherServiceServer.
+func (mr *MockPublisherServiceServerMockRecorder) mustEmbedUnimplementedPublisherServiceServer() *gomock.Call {
+	mr.mock.ctrl.T.Helper()
+	return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedPublisherServiceServer", reflect.TypeOf((*MockPublisherServiceServer)(nil).mustEmbedUnimplementedPublisherServiceServer))
+}
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/config.go b/eventmesh-server-go/runtime/registry/registry.go
similarity index 65%
rename from eventmesh-server-go/plugin/naming/nacos/registry/config.go
rename to eventmesh-server-go/runtime/registry/registry.go
index 11fd86c23..c9288b927 100644
--- a/eventmesh-server-go/plugin/naming/nacos/registry/config.go
+++ b/eventmesh-server-go/runtime/registry/registry.go
@@ -15,18 +15,9 @@
 
 package registry
 
-// Config registry config
-type Config struct {
-	ServiceName string
-	Weight      int
-	Address     string
-	Metadata    map[string]string
-}
-
-// PluginConfig define registry plugin config
-type PluginConfig struct {
-	ServiceName string `yaml:"service_name"`
-	CacheDir    string `yaml:"cache-dir"`
-	Port        string `yaml:"port"`         // nacos server port
-	AddressList string `yaml:"address_list"` // nacos server address list
+type Registry interface {
+	Init() error
+	findEventMeshInfoByCluster(clusterName string)
+	Start() error
+	Shutdown() error
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org