You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eventmesh.apache.org by jo...@apache.org on 2023/02/02 08:59:44 UTC
[incubator-eventmesh] branch master updated: [ISSUE #2941] Refactor registry nacos (#2960)
This is an automated email from the ASF dual-hosted git repository.
jonyang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-eventmesh.git
The following commit(s) were added to refs/heads/master by this push:
new fcfa45a55 [ISSUE #2941] Refactor registry nacos (#2960)
fcfa45a55 is described below
commit fcfa45a550bf50c93abe6bdd4f6630541ccaf10d
Author: walleliu <li...@163.com>
AuthorDate: Thu Feb 2 16:59:36 2023 +0800
[ISSUE #2941] Refactor registry nacos (#2960)
* update registry api
* add registry plugin
* add license for mock go files
* change nacos registry default connection timeout to duration
---
.../nacos/selector/config.go => config/common.go} | 20 +-
eventmesh-server-go/config/config.go | 15 +-
eventmesh-server-go/config/config_test.go | 10 +-
eventmesh-server-go/config/grpc.go | 12 -
.../config/testdata/test_config.yaml | 9 +-
eventmesh-server-go/eventmesh.go | 2 +-
.../plugin/naming/nacos/registry/registry.go | 170 -----------
.../plugin/naming/nacos/selector/selector.go | 120 --------
.../nacos/registry/config.go => registry/model.go} | 34 ++-
.../nacos/selector => registry/nacos}/config.go | 18 +-
.../nacos/mocks/mock_naming_client_interface.go | 211 ++++++++++++++
eventmesh-server-go/plugin/registry/nacos/nacos.go | 241 ++++++++++++++++
.../plugin/registry/nacos/nacos_test.go | 314 +++++++++++++++++++++
.../registry/config.go => registry/registry.go} | 24 +-
.../grpc/consumer/consumer_group_client.go | 8 +-
.../core/protocol/grpc/consumer/consumer_mesh.go | 8 +-
.../grpc/consumer/consumer_processor_test.go | 186 ++++++++++++
.../core/protocol/grpc/consumer/message_request.go | 8 +-
.../core/protocol/grpc/producer/producer_mesh.go | 4 +-
.../runtime/proto/pb/mocks/consumer_service.go | 107 +++++++
.../runtime/proto/pb/mocks/heartbeat_service.go | 78 +++++
.../runtime/proto/pb/mocks/producer_service.go | 108 +++++++
.../config.go => runtime/registry/registry.go} | 19 +-
23 files changed, 1330 insertions(+), 396 deletions(-)
diff --git a/eventmesh-server-go/plugin/naming/nacos/selector/config.go b/eventmesh-server-go/config/common.go
similarity index 73%
copy from eventmesh-server-go/plugin/naming/nacos/selector/config.go
copy to eventmesh-server-go/config/common.go
index 95cbb8280..400c221de 100644
--- a/eventmesh-server-go/plugin/naming/nacos/selector/config.go
+++ b/eventmesh-server-go/config/common.go
@@ -13,18 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package selector
+package config
-// Config selector config
-type Config struct {
- ServiceName string
- Weight int
- ClusterName string
- GroupName string
- Metadata map[string]string
-}
-
-// PluginConfig define selector plugin config
-type PluginConfig struct {
- AddressList string `yaml:"address_list"`
+type Common struct {
+ Name string `yaml:"name" toml:"name"`
+ RegistryName string `yaml:"registry-name" toml:"registry-name"`
+ Cluster string `yaml:"cluster" toml:"cluster"`
+ Env string `yaml:"env" toml:"env"`
+ IDC string `yaml:"idc" toml:"idc"`
}
diff --git a/eventmesh-server-go/config/config.go b/eventmesh-server-go/config/config.go
index 626e7674b..77d11f0a6 100644
--- a/eventmesh-server-go/config/config.go
+++ b/eventmesh-server-go/config/config.go
@@ -32,7 +32,7 @@ const (
)
type Config struct {
- Name string `yaml:"name" toml:"name"`
+ Common *Common `yaml:"common" toml:"common"`
Server struct {
*HTTPOption `yaml:"http" toml:"http"`
*GRPCOption `yaml:"grpc" toml:"grpc"`
@@ -50,8 +50,13 @@ func init() {
}
func defaultConfig() *Config {
- cfg := &Config{
- Name: "eventmesh-server",
+ cfg := &Config{}
+ cfg.Common = &Common{
+ Name: "eventmesh-server",
+ RegistryName: "eventmesh-go",
+ Cluster: "1",
+ Env: "{}",
+ IDC: "idc1",
}
cfg.Server.GRPCOption = &GRPCOption{
Port: "10010",
@@ -67,10 +72,6 @@ func defaultConfig() *Config {
PushMessagePoolSize: 10,
ReplyPoolSize: 10,
MsgReqNumPerSecond: 5,
- RegistryName: "eventmesh-go",
- Cluster: "1",
- Env: "{}",
- IDC: "idc1",
SessionExpiredInMills: 5 * time.Second,
SendMessageTimeout: 5 * time.Second,
}
diff --git a/eventmesh-server-go/config/config_test.go b/eventmesh-server-go/config/config_test.go
index a7294be22..0e91a1acb 100644
--- a/eventmesh-server-go/config/config_test.go
+++ b/eventmesh-server-go/config/config_test.go
@@ -28,6 +28,12 @@ func TestConfig_Load(t *testing.T) {
assert := testifyassert.New(t)
config := &Config{}
+ config.Common = &Common{
+ RegistryName: "test",
+ Cluster: "test",
+ Env: "env",
+ IDC: "idc1",
+ }
config.Server.GRPCOption = &GRPCOption{
Port: "10010",
TLSOption: &TLSOption{
@@ -42,10 +48,6 @@ func TestConfig_Load(t *testing.T) {
PushMessagePoolSize: 10,
ReplyPoolSize: 10,
MsgReqNumPerSecond: 5,
- RegistryName: "test",
- Cluster: "test",
- Env: "env",
- IDC: "idc1",
SessionExpiredInMills: 5 * time.Second,
SendMessageTimeout: 5 * time.Second,
}
diff --git a/eventmesh-server-go/config/grpc.go b/eventmesh-server-go/config/grpc.go
index c3acd3c82..12073eb10 100644
--- a/eventmesh-server-go/config/grpc.go
+++ b/eventmesh-server-go/config/grpc.go
@@ -43,18 +43,6 @@ type GRPCOption struct {
//MsgReqNumPerSecond
MsgReqNumPerSecond float64 `yaml:"msg-req-num-per-second" toml:"msg-req-num-per-second"`
- // RegistryName name for registry plugin support nacos or etcd
- RegistryName string `yaml:"registry-name" toml:"registry-name"`
-
- // Cluster cluster for grpc server
- Cluster string `yaml:"cluster" toml:"cluster"`
-
- // Env for env variable
- Env string `yaml:"env" toml:"env"`
-
- // IDC idc for grpc server
- IDC string `yaml:"idc" toml:"idc"`
-
// SessionExpiredInMills internal to clean the not work session consumer
SessionExpiredInMills time.Duration `yaml:"session-expired-in-mills"`
// SendMessageTimeout timeout in send message
diff --git a/eventmesh-server-go/config/testdata/test_config.yaml b/eventmesh-server-go/config/testdata/test_config.yaml
index ec10468b1..6527c7123 100644
--- a/eventmesh-server-go/config/testdata/test_config.yaml
+++ b/eventmesh-server-go/config/testdata/test_config.yaml
@@ -13,6 +13,11 @@
# See the License for the specific language governing permissions and
# limitations under the License.
+common:
+ cluster: "test"
+ registry-name: "test"
+ env: "env"
+ idc: "idc1"
server:
grpc:
port: 10010
@@ -27,10 +32,6 @@ server:
push-message-pool-size: 10
reply-pool-size: 10
msg-req-num-per-second: 5
- cluster: "test"
- registry-name: "test"
- env: "env"
- idc: "idc1"
session-expired-in-mills: 5s
send-message-timeout: 5s
http:
diff --git a/eventmesh-server-go/eventmesh.go b/eventmesh-server-go/eventmesh.go
index d13dd59d5..046b0ceb1 100644
--- a/eventmesh-server-go/eventmesh.go
+++ b/eventmesh-server-go/eventmesh.go
@@ -27,8 +27,8 @@ import (
_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/connector/standalone"
_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/database/mysql"
_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/metrics/prometheus"
- _ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/naming/nacos/registry"
_ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/protocol/cloudevents"
+ _ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry/nacos"
)
var confPath string
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/registry.go b/eventmesh-server-go/plugin/naming/nacos/registry/registry.go
deleted file mode 100644
index 9237c151f..000000000
--- a/eventmesh-server-go/plugin/naming/nacos/registry/registry.go
+++ /dev/null
@@ -1,170 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package registry
-
-import (
- "fmt"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/registry"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
- "github.com/gogf/gf/util/gconv"
- "github.com/nacos-group/nacos-sdk-go/v2/clients"
- "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
- "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
- "github.com/nacos-group/nacos-sdk-go/v2/vo"
- "github.com/pkg/errors"
- "net"
- "strconv"
- "strings"
-)
-
-const (
- DefaultClusterName = "DEFAULT"
- DefaultGroupName = "DEFAULT_GROUP"
-)
-
-const (
- defaultConnectTimeout = 5000
- defaultWeight = 100
-)
-
-func init() {
- plugin.Register("nacos", &Registry{})
-}
-
-// Registry register service
-type Registry struct {
- Provider naming_client.INamingClient
- cfg *Config
- host string
- port int
-}
-
-// newRegistry 新建实例
-func newRegistry(provider naming_client.INamingClient, cfg *Config) *Registry {
- return &Registry{
- Provider: provider,
- cfg: cfg,
- }
-}
-
-// Type return registry type
-func (r *Registry) Type() string {
- return "registry"
-}
-
-// Setup setup config
-func (r *Registry) Setup(name string, configDec plugin.Decoder) error {
- if configDec == nil {
- return errors.New("registry config decoder empty")
- }
- conf := &PluginConfig{}
- if err := configDec.Decode(conf); err != nil {
- return err
- }
- return r.register(conf)
-}
-
-// Register registry service, application can invoke this method to register service to remote registry-server
-func (r *Registry) Register(_ string) error {
- host, portRaw, err := net.SplitHostPort(r.cfg.Address)
- if err != nil {
- return err
- }
- port, err := strconv.ParseInt(portRaw, 10, 64)
- if err != nil {
- return err
- }
- r.host = host
- r.port = gconv.Int(port)
- if r.cfg.Weight == 0 {
- r.cfg.Weight = defaultWeight
- }
- var req = vo.RegisterInstanceParam{
- Ip: r.host,
- Port: uint64(r.port),
- ServiceName: r.cfg.ServiceName,
- GroupName: DefaultGroupName,
- Healthy: true,
- Enable: true,
- Weight: gconv.Float64(r.cfg.Weight),
- }
- result, err := r.Provider.RegisterInstance(req)
- if err != nil {
- return errors.Wrap(err, "fail to Register instance")
- }
- if !result {
- return errors.New("fail to Register instance")
- }
- return nil
-}
-
-// Deregister de-registry service, application can invoke this method to de-register service to remote registry-server
-func (r *Registry) Deregister(_ string) error {
- var req = vo.DeregisterInstanceParam{
- Ip: r.host,
- Port: uint64(r.port),
- ServiceName: r.cfg.ServiceName,
- GroupName: DefaultGroupName,
- }
- result, err := r.Provider.DeregisterInstance(req)
- if err != nil {
- return errors.Wrap(err, "fail to Deregister instance")
- }
- if !result {
- return errors.New("fail to Deregister instance")
- }
- return nil
-}
-
-func (r *Registry) register(conf *PluginConfig) error {
- provider, err := r.newProvider(conf)
- if err != nil {
- return err
- }
- ip := util.GetIP()
- serverName := conf.ServiceName
- cfg := &Config{
- ServiceName: serverName,
- Address: fmt.Sprintf("%s:%v", ip, conf.Port),
- }
- registry.Register(serverName, newRegistry(provider, cfg))
- return nil
-}
-
-func (r *Registry) newProvider(cfg *PluginConfig) (naming_client.INamingClient, error) {
- var p vo.NacosClientParam
- var addresses []string
- if len(cfg.AddressList) > 0 {
- addresses = strings.Split(cfg.AddressList, ",")
- }
- for _, address := range addresses {
- ip, port, err := net.SplitHostPort(address)
- if err != nil {
- return nil, err
- }
- p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
- }
- p.ClientConfig = &constant.ClientConfig{
- TimeoutMs: defaultConnectTimeout,
- CacheDir: cfg.CacheDir,
- }
- provider, err := clients.NewNamingClient(p)
- if err != nil {
- return nil, err
- }
- return provider, nil
-}
diff --git a/eventmesh-server-go/plugin/naming/nacos/selector/selector.go b/eventmesh-server-go/plugin/naming/nacos/selector/selector.go
deleted file mode 100644
index 297ebc5b2..000000000
--- a/eventmesh-server-go/plugin/naming/nacos/selector/selector.go
+++ /dev/null
@@ -1,120 +0,0 @@
-// Licensed to the Apache Software Foundation (ASF) under one or more
-// contributor license agreements. See the NOTICE file distributed with
-// this work for additional information regarding copyright ownership.
-// The ASF licenses this file to You under the Apache License, Version 2.0
-// (the "License"); you may not use this file except in compliance with
-// the License. You may obtain a copy of the License at
-//
-// http://www.apache.org/licenses/LICENSE-2.0
-//
-// Unless required by applicable law or agreed to in writing, software
-// distributed under the License is distributed on an "AS IS" BASIS,
-// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-// See the License for the specific language governing permissions and
-// limitations under the License.
-
-package selector
-
-import (
- "errors"
- "fmt"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/registry"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/naming/selector"
- "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
- nacos_registry "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/naming/nacos/registry"
- "github.com/gogf/gf/util/gconv"
- "github.com/nacos-group/nacos-sdk-go/v2/clients"
- "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
- "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
- "github.com/nacos-group/nacos-sdk-go/v2/vo"
- "net"
- "strconv"
- "strings"
-)
-
-const (
- defaultConnectTimeout = 5000
-)
-
-func init() {
- plugin.Register("nacos", &Selector{})
-}
-
-// Selector 路由选择器
-type Selector struct {
- Client naming_client.INamingClient
-}
-
-// newRegistry 新建实例
-func newSelector(client naming_client.INamingClient) *Selector {
- return &Selector{
- Client: client,
- }
-}
-
-// Type return registry type
-func (s *Selector) Type() string {
- return "selector"
-}
-
-// Setup setup config
-func (s *Selector) Setup(name string, configDec plugin.Decoder) error {
- if configDec == nil {
- return errors.New("selector config decoder empty")
- }
- conf := &PluginConfig{}
- if err := configDec.Decode(conf); err != nil {
- return err
- }
- client, err := s.newClient(conf)
- if err != nil {
- return err
- }
- selector.Register(config.GlobalConfig().Name, newSelector(client))
- return nil
-}
-
-// Select 选择服务节点
-func (s *Selector) Select(serviceName string) (*registry.Instance, error) {
- instanceReq := vo.SelectOneHealthInstanceParam{
- ServiceName: serviceName,
- GroupName: nacos_registry.DefaultGroupName,
- Clusters: []string{nacos_registry.DefaultClusterName},
- }
- instance, err := s.Client.SelectOneHealthyInstance(instanceReq)
- if err != nil {
- return nil, fmt.Errorf("get one instance err: %s", err.Error())
- }
- if instance == nil {
- return nil, fmt.Errorf("get one instance return empty")
- }
- return &registry.Instance{
- Address: net.JoinHostPort(instance.Ip, strconv.Itoa(int(instance.Port))),
- ServiceName: serviceName,
- Weight: gconv.Int(instance.Weight),
- Clusters: instance.ClusterName,
- Metadata: instance.Metadata,
- }, nil
-}
-
-func (s *Selector) newClient(cfg *PluginConfig) (naming_client.INamingClient, error) {
- var p vo.NacosClientParam
- var addresses []string
- if len(cfg.AddressList) > 0 {
- addresses = strings.Split(cfg.AddressList, ",")
- }
- for _, address := range addresses {
- ip, port, err := net.SplitHostPort(address)
- if err != nil {
- return nil, err
- }
- p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
- }
- p.ClientConfig = &constant.ClientConfig{TimeoutMs: defaultConnectTimeout}
- provider, err := clients.NewNamingClient(p)
- if err != nil {
- return nil, err
- }
- return provider, nil
-}
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/config.go b/eventmesh-server-go/plugin/registry/model.go
similarity index 56%
copy from eventmesh-server-go/plugin/naming/nacos/registry/config.go
copy to eventmesh-server-go/plugin/registry/model.go
index 11fd86c23..495331209 100644
--- a/eventmesh-server-go/plugin/naming/nacos/registry/config.go
+++ b/eventmesh-server-go/plugin/registry/model.go
@@ -15,18 +15,28 @@
package registry
-// Config registry config
-type Config struct {
- ServiceName string
- Weight int
- Address string
- Metadata map[string]string
+import "time"
+
+type EventMeshDataInfo struct {
+ EventMeshClusterName string
+ EventMeshName string
+ Endpoint string
+ LastUpdateTimestamp time.Time
+ Metadata map[string]string
+}
+
+type EventMeshRegisterInfo struct {
+ EventMeshClusterName string
+ EventMeshName string
+ EndPoint string
+ EventMeshInstanceNumMap map[string]map[string]int
+ Metadata map[string]string
+ ProtocolType string
}
-// PluginConfig define registry plugin config
-type PluginConfig struct {
- ServiceName string `yaml:"service_name"`
- CacheDir string `yaml:"cache-dir"`
- Port string `yaml:"port"` // nacos server port
- AddressList string `yaml:"address_list"` // nacos server address list
+type EventMeshUnRegisterInfo struct {
+ EventMeshClusterName string
+ EventMeshName string
+ EndPoint string
+ ProtocolType string
}
diff --git a/eventmesh-server-go/plugin/naming/nacos/selector/config.go b/eventmesh-server-go/plugin/registry/nacos/config.go
similarity index 74%
rename from eventmesh-server-go/plugin/naming/nacos/selector/config.go
rename to eventmesh-server-go/plugin/registry/nacos/config.go
index 95cbb8280..51a95744f 100644
--- a/eventmesh-server-go/plugin/naming/nacos/selector/config.go
+++ b/eventmesh-server-go/plugin/registry/nacos/config.go
@@ -13,18 +13,12 @@
// See the License for the specific language governing permissions and
// limitations under the License.
-package selector
+package nacos
-// Config selector config
+// Config define registry nacos plugin config
type Config struct {
- ServiceName string
- Weight int
- ClusterName string
- GroupName string
- Metadata map[string]string
-}
-
-// PluginConfig define selector plugin config
-type PluginConfig struct {
- AddressList string `yaml:"address_list"`
+ ServiceName string `yaml:"service-name"`
+ CacheDir string `yaml:"cache-dir"`
+ Port string `yaml:"port"` // nacos server port
+ AddressList string `yaml:"address-list"` // nacos server address list
}
diff --git a/eventmesh-server-go/plugin/registry/nacos/mocks/mock_naming_client_interface.go b/eventmesh-server-go/plugin/registry/nacos/mocks/mock_naming_client_interface.go
new file mode 100644
index 000000000..84082c4d5
--- /dev/null
+++ b/eventmesh-server-go/plugin/registry/nacos/mocks/mock_naming_client_interface.go
@@ -0,0 +1,211 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: ./naming_client_interface.go
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+ reflect "reflect"
+
+ gomock "github.com/golang/mock/gomock"
+ model "github.com/nacos-group/nacos-sdk-go/v2/model"
+ vo "github.com/nacos-group/nacos-sdk-go/v2/vo"
+)
+
+// MockINamingClient is a mock of INamingClient interface.
+type MockINamingClient struct {
+ ctrl *gomock.Controller
+ recorder *MockINamingClientMockRecorder
+}
+
+// MockINamingClientMockRecorder is the mock recorder for MockINamingClient.
+type MockINamingClientMockRecorder struct {
+ mock *MockINamingClient
+}
+
+// NewMockINamingClient creates a new mock instance.
+func NewMockINamingClient(ctrl *gomock.Controller) *MockINamingClient {
+ mock := &MockINamingClient{ctrl: ctrl}
+ mock.recorder = &MockINamingClientMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockINamingClient) EXPECT() *MockINamingClientMockRecorder {
+ return m.recorder
+}
+
+// CloseClient mocks base method.
+func (m *MockINamingClient) CloseClient() {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "CloseClient")
+}
+
+// CloseClient indicates an expected call of CloseClient.
+func (mr *MockINamingClientMockRecorder) CloseClient() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "CloseClient", reflect.TypeOf((*MockINamingClient)(nil).CloseClient))
+}
+
+// DeregisterInstance mocks base method.
+func (m *MockINamingClient) DeregisterInstance(param vo.DeregisterInstanceParam) (bool, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "DeregisterInstance", param)
+ ret0, _ := ret[0].(bool)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// DeregisterInstance indicates an expected call of DeregisterInstance.
+func (mr *MockINamingClientMockRecorder) DeregisterInstance(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "DeregisterInstance", reflect.TypeOf((*MockINamingClient)(nil).DeregisterInstance), param)
+}
+
+// GetAllServicesInfo mocks base method.
+func (m *MockINamingClient) GetAllServicesInfo(param vo.GetAllServiceInfoParam) (model.ServiceList, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GetAllServicesInfo", param)
+ ret0, _ := ret[0].(model.ServiceList)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// GetAllServicesInfo indicates an expected call of GetAllServicesInfo.
+func (mr *MockINamingClientMockRecorder) GetAllServicesInfo(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetAllServicesInfo", reflect.TypeOf((*MockINamingClient)(nil).GetAllServicesInfo), param)
+}
+
+// GetService mocks base method.
+func (m *MockINamingClient) GetService(param vo.GetServiceParam) (model.Service, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "GetService", param)
+ ret0, _ := ret[0].(model.Service)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// GetService indicates an expected call of GetService.
+func (mr *MockINamingClientMockRecorder) GetService(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetService", reflect.TypeOf((*MockINamingClient)(nil).GetService), param)
+}
+
+// RegisterInstance mocks base method.
+func (m *MockINamingClient) RegisterInstance(param vo.RegisterInstanceParam) (bool, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "RegisterInstance", param)
+ ret0, _ := ret[0].(bool)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// RegisterInstance indicates an expected call of RegisterInstance.
+func (mr *MockINamingClientMockRecorder) RegisterInstance(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RegisterInstance", reflect.TypeOf((*MockINamingClient)(nil).RegisterInstance), param)
+}
+
+// SelectAllInstances mocks base method.
+func (m *MockINamingClient) SelectAllInstances(param vo.SelectAllInstancesParam) ([]model.Instance, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SelectAllInstances", param)
+ ret0, _ := ret[0].([]model.Instance)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// SelectAllInstances indicates an expected call of SelectAllInstances.
+func (mr *MockINamingClientMockRecorder) SelectAllInstances(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectAllInstances", reflect.TypeOf((*MockINamingClient)(nil).SelectAllInstances), param)
+}
+
+// SelectInstances mocks base method.
+func (m *MockINamingClient) SelectInstances(param vo.SelectInstancesParam) ([]model.Instance, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SelectInstances", param)
+ ret0, _ := ret[0].([]model.Instance)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// SelectInstances indicates an expected call of SelectInstances.
+func (mr *MockINamingClientMockRecorder) SelectInstances(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectInstances", reflect.TypeOf((*MockINamingClient)(nil).SelectInstances), param)
+}
+
+// SelectOneHealthyInstance mocks base method.
+func (m *MockINamingClient) SelectOneHealthyInstance(param vo.SelectOneHealthInstanceParam) (*model.Instance, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SelectOneHealthyInstance", param)
+ ret0, _ := ret[0].(*model.Instance)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// SelectOneHealthyInstance indicates an expected call of SelectOneHealthyInstance.
+func (mr *MockINamingClientMockRecorder) SelectOneHealthyInstance(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SelectOneHealthyInstance", reflect.TypeOf((*MockINamingClient)(nil).SelectOneHealthyInstance), param)
+}
+
+// Subscribe mocks base method.
+func (m *MockINamingClient) Subscribe(param *vo.SubscribeParam) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Subscribe", param)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Subscribe indicates an expected call of Subscribe.
+func (mr *MockINamingClientMockRecorder) Subscribe(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockINamingClient)(nil).Subscribe), param)
+}
+
+// Unsubscribe mocks base method.
+func (m *MockINamingClient) Unsubscribe(param *vo.SubscribeParam) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Unsubscribe", param)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// Unsubscribe indicates an expected call of Unsubscribe.
+func (mr *MockINamingClientMockRecorder) Unsubscribe(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockINamingClient)(nil).Unsubscribe), param)
+}
+
+// UpdateInstance mocks base method.
+func (m *MockINamingClient) UpdateInstance(param vo.UpdateInstanceParam) (bool, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "UpdateInstance", param)
+ ret0, _ := ret[0].(bool)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// UpdateInstance indicates an expected call of UpdateInstance.
+func (mr *MockINamingClientMockRecorder) UpdateInstance(param interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "UpdateInstance", reflect.TypeOf((*MockINamingClient)(nil).UpdateInstance), param)
+}
diff --git a/eventmesh-server-go/plugin/registry/nacos/nacos.go b/eventmesh-server-go/plugin/registry/nacos/nacos.go
new file mode 100644
index 000000000..052fc195d
--- /dev/null
+++ b/eventmesh-server-go/plugin/registry/nacos/nacos.go
@@ -0,0 +1,241 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package nacos
+
+import (
+ "fmt"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry"
+ "github.com/gogf/gf/util/gconv"
+ "github.com/nacos-group/nacos-sdk-go/v2/clients"
+ "github.com/nacos-group/nacos-sdk-go/v2/clients/naming_client"
+ "github.com/nacos-group/nacos-sdk-go/v2/common/constant"
+ "github.com/nacos-group/nacos-sdk-go/v2/vo"
+ "go.uber.org/atomic"
+ "net"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+var (
+ defaultConnectTimeout = 5000 * time.Millisecond
+ DefaultClusterName = "DEFAULT"
+ DefaultGroupName = "DEFAULT_GROUP"
+ DefaultWeight float64 = 10
+ protoList = []string{"TCP", "HTTP", "GRPC"}
+)
+
+func init() {
+ plugin.Register("nacos", &Registry{})
+}
+
+type Registry struct {
+ initStatus *atomic.Bool
+ startStatus *atomic.Bool
+ cfg *Config
+ client naming_client.INamingClient
+ registryInfos *sync.Map // map[string]*registry.EventMeshRegisterInfo
+}
+
+func (r *Registry) Type() string {
+ return registry.Type
+}
+
+func (r *Registry) Setup(name string, dec plugin.Decoder) error {
+ conf := &Config{}
+ if err := dec.Decode(conf); err != nil {
+ return err
+ }
+ r.cfg = conf
+ return nil
+}
+
+func (r *Registry) Init() error {
+ r.initStatus = atomic.NewBool(true)
+ r.startStatus = atomic.NewBool(false)
+ r.registryInfos = new(sync.Map)
+ return nil
+}
+
+func (r *Registry) Start() error {
+ var p vo.NacosClientParam
+ var addresses []string
+
+ if len(r.cfg.AddressList) > 0 {
+ addresses = strings.Split(r.cfg.AddressList, ",")
+ }
+
+ for _, address := range addresses {
+ ip, port, err := net.SplitHostPort(address)
+ if err != nil {
+ return err
+ }
+ p.ServerConfigs = append(p.ServerConfigs, constant.ServerConfig{IpAddr: ip, Port: gconv.Uint64(port)})
+ }
+
+ p.ClientConfig = &constant.ClientConfig{
+ TimeoutMs: uint64(defaultConnectTimeout.Milliseconds()),
+ CacheDir: r.cfg.CacheDir,
+ }
+ cli, err := clients.NewNamingClient(p)
+ if err != nil {
+ return err
+ }
+
+ r.startStatus.Store(true)
+ r.client = cli
+ return nil
+}
+
+func (r *Registry) Shutdown() error {
+ r.startStatus.Store(false)
+ r.initStatus.Store(false)
+ r.client.CloseClient()
+ return nil
+}
+
+func (r *Registry) FindEventMeshInfoByCluster(clusterName string) ([]*registry.EventMeshDataInfo, error) {
+ var infos []*registry.EventMeshDataInfo
+ meshServerName := config.GlobalConfig().Common.Name
+ cluster := config.GlobalConfig().Common.Cluster
+ for _, proto := range protoList {
+ ins, err := r.client.SelectInstances(vo.SelectInstancesParam{
+ Clusters: []string{clusterName},
+ ServiceName: fmt.Sprintf("%v-%v", meshServerName, proto),
+ GroupName: cluster,
+ HealthyOnly: true,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if len(ins) == 0 {
+ continue
+ }
+ for _, in := range ins {
+ infos = append(infos, &registry.EventMeshDataInfo{
+ EventMeshClusterName: in.ClusterName,
+ EventMeshName: in.ServiceName,
+ Endpoint: fmt.Sprintf("%v:%v", in.Ip, in.Port),
+ LastUpdateTimestamp: time.Time{},
+ Metadata: in.Metadata,
+ })
+ }
+ }
+
+ return infos, nil
+}
+
+func (r *Registry) FindAllEventMeshInfo() ([]*registry.EventMeshDataInfo, error) {
+ var infos []*registry.EventMeshDataInfo
+ meshServerName := config.GlobalConfig().Common.Name
+
+ for _, proto := range protoList {
+ ins, err := r.client.SelectInstances(vo.SelectInstancesParam{
+ Clusters: []string{},
+ ServiceName: fmt.Sprintf("%v-%v", meshServerName, proto),
+ GroupName: "GROUP",
+ HealthyOnly: true,
+ })
+ if err != nil {
+ return nil, err
+ }
+ if len(ins) == 0 {
+ continue
+ }
+ for _, in := range ins {
+ infos = append(infos, &registry.EventMeshDataInfo{
+ EventMeshClusterName: in.ClusterName,
+ EventMeshName: in.ServiceName,
+ Endpoint: fmt.Sprintf("%v:%v", in.Ip, in.Port),
+ LastUpdateTimestamp: time.Time{},
+ Metadata: in.Metadata,
+ })
+ }
+ }
+
+ return infos, nil
+}
+
+// FindEventMeshClientDistributionData not used
+// deprecate
+func (r *Registry) FindEventMeshClientDistributionData(clusterName, group, purpose string) (map[string]map[string]int, error) {
+ return nil, nil
+}
+
+func (r *Registry) RegisterMetadata(map[string]string) {
+ r.registryInfos.Range(func(key, value any) bool {
+ r.Register(value.(*registry.EventMeshRegisterInfo))
+ return true
+ })
+}
+
+func (r *Registry) Register(info *registry.EventMeshRegisterInfo) error {
+ ipPort := strings.Split(info.EndPoint, ":")
+ if len(ipPort) != 2 {
+ return fmt.Errorf("endpoint format err")
+ }
+ ip := ipPort[0]
+ port, err := strconv.ParseInt(ipPort[1], 10, 64)
+ if err != nil {
+ return err
+ }
+
+ _, err = r.client.RegisterInstance(vo.RegisterInstanceParam{
+ Ip: ip,
+ Port: uint64(port),
+ ServiceName: info.EventMeshName,
+ GroupName: uniqGroupName(info.ProtocolType),
+ Healthy: true,
+ Enable: true,
+ Weight: DefaultWeight,
+ })
+ if err != nil {
+ return err
+ }
+ r.registryInfos.Store(info.EventMeshName, info)
+ return nil
+}
+
+func (r *Registry) UnRegister(info *registry.EventMeshUnRegisterInfo) error {
+ ipPort := strings.Split(info.EndPoint, ":")
+ if len(ipPort) != 2 {
+ return fmt.Errorf("endpoint format err")
+ }
+ ip := ipPort[0]
+ port, err := strconv.ParseInt(ipPort[1], 10, 64)
+ if err != nil {
+ return err
+ }
+
+ _, err = r.client.DeregisterInstance(vo.DeregisterInstanceParam{
+ Ip: ip,
+ Port: uint64(port),
+ ServiceName: info.EventMeshName,
+ GroupName: uniqGroupName(info.ProtocolType),
+ })
+ if err != nil {
+ return err
+ }
+ r.registryInfos.Delete(info.EventMeshName)
+ return nil
+}
+
+func uniqGroupName(protoType string) string {
+ return fmt.Sprintf("%s-GROUP", protoType)
+}
diff --git a/eventmesh-server-go/plugin/registry/nacos/nacos_test.go b/eventmesh-server-go/plugin/registry/nacos/nacos_test.go
new file mode 100644
index 000000000..fa4aff3a4
--- /dev/null
+++ b/eventmesh-server-go/plugin/registry/nacos/nacos_test.go
@@ -0,0 +1,314 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package nacos
+
+import (
+ "fmt"
+ "github.com/nacos-group/nacos-sdk-go/v2/model"
+ "os"
+ "path/filepath"
+ "testing"
+
+ "github.com/golang/mock/gomock"
+ "github.com/nacos-group/nacos-sdk-go/v2/vo"
+ "github.com/stretchr/testify/assert"
+
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin/registry/nacos/mocks"
+)
+
+// Test_Init verifies that Init succeeds on a zero-value Registry and leaves it
+// flagged as initialised but not started.
+func Test_Init(t *testing.T) {
+ r := &Registry{}
+ assert.NoError(t, r.Init())
+ assert.True(t, r.initStatus.Load())
+ assert.False(t, r.startStatus.Load())
+}
+
+// Test_Start runs a Registry through Init, Start and Shutdown and checks that
+// both status flags are cleared once Shutdown has returned.
+func Test_Start(t *testing.T) {
+	t.Run("create nameing client", func(t *testing.T) {
+		// Scratch directory for the registry cache; removed when the subtest
+		// ends.
+		cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+		assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+		defer os.RemoveAll(cacheDir)
+
+		// CacheDir points at the scratch directory instead of a hard-coded
+		// "/tmp/", so the directory created above is the one actually used.
+		r := &Registry{
+			cfg: &Config{
+				ServiceName: "test",
+				CacheDir:    cacheDir,
+				Port:        "8088",
+				AddressList: "127.0.0.1:8081",
+			},
+		}
+		assert.NoError(t, r.Init())
+		// No mock is created here: this test never injects one, so building
+		// and discarding a mock client had no effect.
+		assert.NoError(t, r.Start())
+		assert.NoError(t, r.Shutdown())
+		assert.False(t, r.initStatus.Load())
+		assert.False(t, r.startStatus.Load())
+	})
+}
+
+func Test_Register(t *testing.T) {
+ cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+ assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+ defer os.RemoveAll(cacheDir)
+
+ r := &Registry{
+ cfg: &Config{
+ ServiceName: "test",
+ CacheDir: "/tmp/",
+ Port: "8088",
+ AddressList: "127.0.0.1:8081",
+ },
+ }
+ assert.NoError(t, r.Init())
+ assert.True(t, r.initStatus.Load())
+ assert.False(t, r.startStatus.Load())
+ assert.NoError(t, r.Start())
+ ctrl := gomock.NewController(t)
+ nameclient := mocks.NewMockINamingClient(ctrl)
+ meshInfo := ®istry.EventMeshRegisterInfo{
+ EventMeshClusterName: "test",
+ EventMeshName: "test",
+ EndPoint: "127.0.0.1:3333",
+ EventMeshInstanceNumMap: map[string]map[string]int{},
+ Metadata: map[string]string{},
+ ProtocolType: "GRPC",
+ }
+ nameclient.EXPECT().RegisterInstance(vo.RegisterInstanceParam{
+ Ip: "127.0.0.1",
+ Port: 3333,
+ ServiceName: "test",
+ GroupName: uniqGroupName("GRPC"),
+ Healthy: true,
+ Enable: true,
+ Weight: DefaultWeight,
+ }).Return(true, nil).Times(1)
+ r.client = nameclient
+ err := r.Register(meshInfo)
+ assert.NoError(t, err)
+ val, ok := r.registryInfos.Load(meshInfo.EventMeshName)
+ assert.True(t, ok)
+ assert.NotNil(t, val)
+}
+
+func Test_DeRegister(t *testing.T) {
+ cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+ assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+ defer os.RemoveAll(cacheDir)
+
+ r := &Registry{
+ cfg: &Config{
+ ServiceName: "test",
+ CacheDir: "/tmp/",
+ Port: "8088",
+ AddressList: "127.0.0.1:8081",
+ },
+ }
+ assert.NoError(t, r.Init())
+ assert.True(t, r.initStatus.Load())
+ assert.False(t, r.startStatus.Load())
+ assert.NoError(t, r.Start())
+ ctrl := gomock.NewController(t)
+ nameclient := mocks.NewMockINamingClient(ctrl)
+ meshInfo := ®istry.EventMeshRegisterInfo{
+ EventMeshClusterName: "test",
+ EventMeshName: "test",
+ EndPoint: "127.0.0.1:3333",
+ EventMeshInstanceNumMap: map[string]map[string]int{},
+ Metadata: map[string]string{},
+ ProtocolType: "GRPC",
+ }
+ nameclient.EXPECT().RegisterInstance(vo.RegisterInstanceParam{
+ Ip: "127.0.0.1",
+ Port: 3333,
+ ServiceName: "test",
+ GroupName: uniqGroupName("GRPC"),
+ Healthy: true,
+ Enable: true,
+ Weight: DefaultWeight,
+ }).Return(true, nil).Times(1)
+ r.client = nameclient
+ err := r.Register(meshInfo)
+ assert.NoError(t, err)
+ val, ok := r.registryInfos.Load(meshInfo.EventMeshName)
+ assert.True(t, ok)
+ assert.NotNil(t, val)
+
+ unmeshInfo := ®istry.EventMeshUnRegisterInfo{
+ EventMeshClusterName: "test",
+ EventMeshName: "test",
+ EndPoint: "127.0.0.1:3333",
+ ProtocolType: "GRPC",
+ }
+ nameclient.EXPECT().DeregisterInstance(vo.DeregisterInstanceParam{
+ Ip: "127.0.0.1",
+ Port: 3333,
+ ServiceName: meshInfo.EventMeshName,
+ GroupName: uniqGroupName("GRPC"),
+ }).Return(true, nil).Times(1)
+ err = r.UnRegister(unmeshInfo)
+ assert.NoError(t, err)
+ val, ok = r.registryInfos.Load(meshInfo.EventMeshName)
+ assert.False(t, ok)
+ assert.Nil(t, val)
+}
+
+// Test_FindEventMeshInfoByCluster drives FindEventMeshInfoByCluster("test")
+// against a mocked naming client that returns no instances, one instance, or
+// an error, and checks the result in each case.
+func Test_FindEventMeshInfoByCluster(t *testing.T) {
+	// Scratch directory for the registry cache; removed when the test ends.
+	cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+	assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+	defer os.RemoveAll(cacheDir)
+
+	// CacheDir points at the scratch directory instead of a hard-coded
+	// "/tmp/", so the directory created above is the one actually used.
+	r := &Registry{
+		cfg: &Config{
+			ServiceName: "test",
+			CacheDir:    cacheDir,
+			Port:        "8088",
+			AddressList: "127.0.0.1:8081",
+		},
+	}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+	assert.NoError(t, r.Start())
+
+	// protoList is package-level state; put it back so the override made by
+	// the subtests does not leak into other tests.
+	savedProtoList := protoList
+	defer func() { protoList = savedProtoList }()
+
+	// Every case expects the same lookup; only the mocked answer differs.
+	wantParam := vo.SelectInstancesParam{
+		Clusters:    []string{"test"},
+		ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+		GroupName:   "1",
+		HealthyOnly: true,
+	}
+	mockErr := fmt.Errorf("mock err")
+	tests := []struct {
+		name      string
+		instances []model.Instance // instances the mocked client returns
+		err       error            // error the mocked client returns
+		wantLen   int              // expected result length when err is nil
+	}{
+		{name: "return empty", instances: []model.Instance{}, wantLen: 0},
+		{name: "return 1 instance", instances: []model.Instance{{InstanceId: "1"}}, wantLen: 1},
+		{name: "return err", err: mockErr},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			nameclient := mocks.NewMockINamingClient(ctrl)
+			r.client = nameclient
+			protoList = []string{"GRPC"}
+			nameclient.EXPECT().SelectInstances(wantParam).Return(tc.instances, tc.err).AnyTimes()
+
+			val, err := r.FindEventMeshInfoByCluster("test")
+			if tc.err != nil {
+				// The client's error must come back unchanged, with no data.
+				assert.Error(t, err)
+				assert.Equal(t, tc.err, err)
+				assert.Nil(t, val)
+				return
+			}
+			assert.NoError(t, err)
+			assert.Len(t, val, tc.wantLen)
+		})
+	}
+}
+
+// Test_FindAllEventMeshInfo drives FindAllEventMeshInfo against a mocked
+// naming client that returns no instances, one instance, or an error, and
+// checks the result in each case.
+func Test_FindAllEventMeshInfo(t *testing.T) {
+	// Scratch directory for the registry cache; removed when the test ends.
+	cacheDir := filepath.Join(os.TempDir(), "nacos-test-cache-dir")
+	assert.NoError(t, os.MkdirAll(cacheDir, os.ModePerm))
+	defer os.RemoveAll(cacheDir)
+
+	// CacheDir points at the scratch directory instead of a hard-coded
+	// "/tmp/", so the directory created above is the one actually used.
+	r := &Registry{
+		cfg: &Config{
+			ServiceName: "test",
+			CacheDir:    cacheDir,
+			Port:        "8088",
+			AddressList: "127.0.0.1:8081",
+		},
+	}
+	assert.NoError(t, r.Init())
+	assert.True(t, r.initStatus.Load())
+	assert.False(t, r.startStatus.Load())
+	assert.NoError(t, r.Start())
+
+	// protoList is package-level state; put it back so the override made by
+	// the subtests does not leak into other tests.
+	savedProtoList := protoList
+	defer func() { protoList = savedProtoList }()
+
+	// Every case expects the same lookup; only the mocked answer differs.
+	wantParam := vo.SelectInstancesParam{
+		Clusters:    []string{},
+		ServiceName: fmt.Sprintf("%v-%v", "eventmesh-server", "GRPC"),
+		GroupName:   "GROUP",
+		HealthyOnly: true,
+	}
+	mockErr := fmt.Errorf("mock err")
+	tests := []struct {
+		name      string
+		instances []model.Instance // instances the mocked client returns
+		err       error            // error the mocked client returns
+		wantLen   int              // expected result length when err is nil
+	}{
+		{name: "return empty", instances: []model.Instance{}, wantLen: 0},
+		{name: "return 1 instance", instances: []model.Instance{{InstanceId: "1"}}, wantLen: 1},
+		{name: "return err", err: mockErr},
+	}
+
+	for _, tc := range tests {
+		t.Run(tc.name, func(t *testing.T) {
+			ctrl := gomock.NewController(t)
+			nameclient := mocks.NewMockINamingClient(ctrl)
+			r.client = nameclient
+			protoList = []string{"GRPC"}
+			nameclient.EXPECT().SelectInstances(wantParam).Return(tc.instances, tc.err).AnyTimes()
+
+			val, err := r.FindAllEventMeshInfo()
+			if tc.err != nil {
+				// The client's error must come back unchanged, with no data.
+				assert.Error(t, err)
+				assert.Equal(t, tc.err, err)
+				assert.Nil(t, val)
+				return
+			}
+			assert.NoError(t, err)
+			assert.Len(t, val, tc.wantLen)
+		})
+	}
+}
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/config.go b/eventmesh-server-go/plugin/registry/registry.go
similarity index 63%
copy from eventmesh-server-go/plugin/naming/nacos/registry/config.go
copy to eventmesh-server-go/plugin/registry/registry.go
index 11fd86c23..4b6f5d5d2 100644
--- a/eventmesh-server-go/plugin/naming/nacos/registry/config.go
+++ b/eventmesh-server-go/plugin/registry/registry.go
@@ -15,18 +15,16 @@
package registry
-// Config registry config
-type Config struct {
- ServiceName string
- Weight int
- Address string
- Metadata map[string]string
-}
+// Type is the plugin type name of this package, "registry".
+// NOTE(review): presumably the key under which registry plugins are looked up
+// by the plugin framework — confirm against the plugin package.
+var Type = "registry"
-// PluginConfig define registry plugin config
-type PluginConfig struct {
- ServiceName string `yaml:"service_name"`
- CacheDir string `yaml:"cache-dir"`
- Port string `yaml:"port"` // nacos server port
- AddressList string `yaml:"address_list"` // nacos server address list
+// Interface is the set of operations a registry plugin provides: lifecycle
+// management, lookup of EventMesh instances, and registration of the local
+// instance.
+type Interface interface {
+ // Init, Start and Shutdown are the lifecycle hooks of the plugin.
+ Init() error
+ Start() error
+ Shutdown() error
+ // FindEventMeshInfoByCluster returns the EventMesh instances found for
+ // clusterName.
+ FindEventMeshInfoByCluster(clusterName string) ([]*EventMeshDataInfo, error)
+ // FindAllEventMeshInfo returns EventMesh instances without taking a
+ // cluster name.
+ FindAllEventMeshInfo() ([]*EventMeshDataInfo, error)
+ // FindEventMeshClientDistributionData returns counts in a two-level map.
+ // NOTE(review): the meaning of the map keys is not visible here — confirm
+ // with the implementations.
+ FindEventMeshClientDistributionData(clusterName, group, purpose string) (map[string]map[string]int, error)
+ // RegisterMetadata takes a metadata map and returns nothing, so failures
+ // cannot be reported to the caller.
+ RegisterMetadata(map[string]string)
+ // Register adds the instance described by info to the registry.
+ Register(info *EventMeshRegisterInfo) error
+ // UnRegister removes the instance described by info from the registry.
+ UnRegister(info *EventMeshUnRegisterInfo) error
 }
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go
index dac62caa0..27935c8bd 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_group_client.go
@@ -46,8 +46,8 @@ type GroupClient struct {
func DefaultStreamGroupClient() *GroupClient {
hostname, _ := os.Hostname()
return &GroupClient{
- ENV: config.GlobalConfig().Server.GRPCOption.Env,
- IDC: config.GlobalConfig().Server.GRPCOption.IDC,
+ ENV: config.GlobalConfig().Common.Env,
+ IDC: config.GlobalConfig().Common.IDC,
ConsumerGroup: "ConsumerGroup",
Topic: "Topic",
GRPCType: consts.STREAM,
@@ -66,8 +66,8 @@ func DefaultStreamGroupClient() *GroupClient {
func DefaultWebhookGroupClient() *GroupClient {
hostname, _ := os.Hostname()
return &GroupClient{
- ENV: config.GlobalConfig().Server.GRPCOption.Env,
- IDC: config.GlobalConfig().Server.GRPCOption.IDC,
+ ENV: config.GlobalConfig().Common.Env,
+ IDC: config.GlobalConfig().Common.IDC,
ConsumerGroup: "ConsumerGroup",
Topic: "Topic",
GRPCType: consts.WEBHOOK,
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go
index 49fcb729a..ac766af17 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_mesh.go
@@ -92,9 +92,9 @@ func (e *eventMeshConsumer) Init() error {
persistProps := make(map[string]string)
persistProps["isBroadcast"] = "false"
persistProps["consumerGroup"] = e.ConsumerGroup
- persistProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+ persistProps["eventMeshIDC"] = config.GlobalConfig().Common.IDC
persistProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
- config.GlobalConfig().Server.GRPCOption.Cluster)
+ config.GlobalConfig().Common.Cluster)
if err := e.persistentConsumer.Init(persistProps); err != nil {
return err
}
@@ -104,9 +104,9 @@ func (e *eventMeshConsumer) Init() error {
broadcastProps := make(map[string]string)
broadcastProps["isBroadcast"] = "false"
broadcastProps["consumerGroup"] = e.ConsumerGroup
- broadcastProps["eventMeshIDC"] = config.GlobalConfig().Server.GRPCOption.IDC
+ broadcastProps["eventMeshIDC"] = config.GlobalConfig().Common.IDC
broadcastProps["instanceName"] = util.BuildMeshClientID(e.ConsumerGroup,
- config.GlobalConfig().Server.GRPCOption.Cluster)
+ config.GlobalConfig().Common.Cluster)
if err := e.broadcastConsumer.Init(broadcastProps); err != nil {
return err
}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_processor_test.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_processor_test.go
new file mode 100644
index 000000000..528290010
--- /dev/null
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/consumer_processor_test.go
@@ -0,0 +1,186 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+package consumer
+
+import (
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/config"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/pkg/util"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/plugin"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/core/protocol/grpc/validator"
+ "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ "github.com/stretchr/testify/assert"
+ "testing"
+)
+
+// Test_processor_Subscribe exercises processor.Subscribe: first a series of
+// requests that must each be rejected with a specific validator error, then
+// one complete request that must be accepted.
+func Test_processor_Subscribe(t *testing.T) {
+	// newHeader returns a fully populated request header; each case blanks
+	// out the single field it wants to see rejected.
+	newHeader := func() *pb.RequestHeader {
+		return &pb.RequestHeader{
+			Env:             "env",
+			Region:          "sh",
+			Idc:             "Idc",
+			Ip:              util.GetIP(),
+			Pid:             util.GetIP(),
+			Sys:             "Sys",
+			Username:        "em",
+			Password:        "pw",
+			Language:        "go",
+			ProtocolType:    "cloudeevents",
+			ProtocolVersion: "v1",
+			ProtocolDesc:    "for mock",
+		}
+	}
+
+	// Plugins have to be set up and activated before any subtest runs.
+	setupErr := config.GlobalConfig().Plugins.Setup()
+	assert.NoError(t, setupErr)
+	plugin.SetActivePlugin(config.GlobalConfig().ActivePlugins)
+
+	// Each entry starts from a subscription carrying only a valid header,
+	// applies mutate, and expects Subscribe to fail with wantErr.
+	rejected := []struct {
+		name    string
+		mutate  func(sub *pb.Subscription)
+		wantErr error
+	}{
+		{
+			name:    "header no idc err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Idc = "" },
+			wantErr: validator.ErrHeaderNoIDC,
+		},
+		{
+			name:    "header no ip err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Ip = "" },
+			wantErr: validator.ErrHeaderNoIP,
+		},
+		{
+			name:    "header no env err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Env = "" },
+			wantErr: validator.ErrHeaderNoENV,
+		},
+		{
+			name:    "header no pid err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Pid = "" },
+			wantErr: validator.ErrHeaderNoPID,
+		},
+		{
+			name:    "header no sys err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Sys = "" },
+			wantErr: validator.ErrHeaderNoSYS,
+		},
+		{
+			name:    "header no username err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Username = "" },
+			wantErr: validator.ErrHeaderNoUser,
+		},
+		{
+			name:    "header no passwd err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Password = "" },
+			wantErr: validator.ErrHeaderNoPASSWD,
+		},
+		{
+			name:    "header no language err",
+			mutate:  func(sub *pb.Subscription) { sub.Header.Language = "" },
+			wantErr: validator.ErrHeaderNoLANG,
+		},
+		{
+			// Header is valid; the URL is simply left empty.
+			name:    "webhook no url",
+			mutate:  func(sub *pb.Subscription) {},
+			wantErr: validator.ErrSubscriptionNoURL,
+		},
+		{
+			// Header and URL are valid; no subscription items are given.
+			name:    "stream no topic",
+			mutate:  func(sub *pb.Subscription) { sub.Url = "http://mock.com" },
+			wantErr: validator.ErrSubscriptionNoItem,
+		},
+	}
+	for _, tc := range rejected {
+		t.Run(tc.name, func(t *testing.T) {
+			sub := &pb.Subscription{Header: newHeader()}
+			tc.mutate(sub)
+			p := &processor{}
+			_, err := p.Subscribe(nil, sub)
+			assert.Equal(t, err, tc.wantErr)
+		})
+	}
+
+	// A complete subscription handed to a real consumer manager is accepted.
+	t.Run("register client err", func(t *testing.T) {
+		mockMgr, _ := NewConsumerManager()
+		sub := &pb.Subscription{
+			Header: newHeader(),
+			Url:    "http://mock.com",
+			SubscriptionItems: []*pb.Subscription_SubscriptionItem{
+				{
+					Topic: "test-topic",
+					Mode:  pb.Subscription_SubscriptionItem_CLUSTERING,
+					Type:  pb.Subscription_SubscriptionItem_ASYNC,
+				},
+			},
+		}
+		p := &processor{}
+		resp, err := p.Subscribe(mockMgr, sub)
+		assert.NoError(t, err)
+		t.Log(resp.String())
+	})
+}
+
+// Test_processor_Unsubscribe is an empty placeholder: it makes no calls and no
+// assertions, so it always passes.
+// TODO: add coverage for processor.Unsubscribe.
+func Test_processor_Unsubscribe(t *testing.T) {
+
+}
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go
index e32d4dbf6..19fb90445 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/consumer/message_request.go
@@ -156,10 +156,10 @@ func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
httpHdr.Set(grpc.REQUEST_CODE, grpc.HTTP_PUSH_CLIENT_ASYNC)
httpHdr.Set(grpc.LANGUAGE, "Go")
httpHdr.Set(grpc.Version, "1.0")
- httpHdr.Set(grpc.EVENTMESHCLUSTER, config.GlobalConfig().Server.Cluster)
- httpHdr.Set(grpc.EVENTMESHENV, config.GlobalConfig().Server.Env)
+ httpHdr.Set(grpc.EVENTMESHCLUSTER, config.GlobalConfig().Common.Cluster)
+ httpHdr.Set(grpc.EVENTMESHENV, config.GlobalConfig().Common.Env)
httpHdr.Set(grpc.EVENTMESHIP, util.GetIP())
- httpHdr.Set(grpc.EVENTMESHIDC, config.GlobalConfig().Server.IDC)
+ httpHdr.Set(grpc.EVENTMESHIDC, config.GlobalConfig().Common.IDC)
httpHdr.Set(grpc.PROTOCOL_TYPE, hr.SimpleMessage.Header.ProtocolType)
httpHdr.Set(grpc.PROTOCOL_DESC, hr.SimpleMessage.Header.ProtocolDesc)
httpHdr.Set(grpc.PROTOCOL_VERSION, hr.SimpleMessage.Header.ProtocolVersion)
@@ -212,7 +212,7 @@ func NewWebhookRequest(mctx *MessageContext) (*WebhookRequest, error) {
func (w *WebhookRequest) getURLs() []string {
var (
urls []string
- currentIDC = config.GlobalConfig().Server.GRPCOption.IDC
+ currentIDC = config.GlobalConfig().Common.IDC
)
w.IDCWebhookURLs.Range(func(key, value any) bool {
diff --git a/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go
index b9f21890f..414d7f092 100644
--- a/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go
+++ b/eventmesh-server-go/runtime/core/protocol/grpc/producer/producer_mesh.go
@@ -48,8 +48,8 @@ func NewEventMeshProducer(cfg *ProducerGroupConfig) (EventMeshProducer, error) {
return nil, err
}
- cluster := config2.GlobalConfig().Server.GRPCOption.Cluster
- idc := config2.GlobalConfig().Server.GRPCOption.IDC
+ cluster := config2.GlobalConfig().Common.Cluster
+ idc := config2.GlobalConfig().Common.IDC
mm := make(map[string]string)
mm["producerGroup"] = cfg.GroupName
mm["instanceName"] = util.BuildMeshClientID(cfg.GroupName, cluster)
diff --git a/eventmesh-server-go/runtime/proto/pb/mocks/consumer_service.go b/eventmesh-server-go/runtime/proto/pb/mocks/consumer_service.go
new file mode 100644
index 000000000..2771fb7c7
--- /dev/null
+++ b/eventmesh-server-go/runtime/proto/pb/mocks/consumer_service.go
@@ -0,0 +1,107 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb (interfaces: ConsumerServiceServer)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+ context "context"
+ reflect "reflect"
+
+ pb "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockConsumerServiceServer is a mock of ConsumerServiceServer interface.
+// This file is MockGen output (see the "DO NOT EDIT" header); regenerate it
+// rather than changing the code by hand.
+type MockConsumerServiceServer struct {
+ ctrl *gomock.Controller
+ recorder *MockConsumerServiceServerMockRecorder
+}
+
+// MockConsumerServiceServerMockRecorder is the mock recorder for MockConsumerServiceServer.
+type MockConsumerServiceServerMockRecorder struct {
+ mock *MockConsumerServiceServer
+}
+
+// NewMockConsumerServiceServer creates a new mock instance.
+func NewMockConsumerServiceServer(ctrl *gomock.Controller) *MockConsumerServiceServer {
+ mock := &MockConsumerServiceServer{ctrl: ctrl}
+ mock.recorder = &MockConsumerServiceServerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockConsumerServiceServer) EXPECT() *MockConsumerServiceServerMockRecorder {
+ return m.recorder
+}
+
+// Subscribe mocks base method.
+func (m *MockConsumerServiceServer) Subscribe(arg0 context.Context, arg1 *pb.Subscription) (*pb.Response, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Subscribe", arg0, arg1)
+ // Comma-ok assertions: a nil recorded return value yields the zero value
+ // instead of panicking.
+ ret0, _ := ret[0].(*pb.Response)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// Subscribe indicates an expected call of Subscribe.
+func (mr *MockConsumerServiceServerMockRecorder) Subscribe(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Subscribe", reflect.TypeOf((*MockConsumerServiceServer)(nil).Subscribe), arg0, arg1)
+}
+
+// SubscribeStream mocks base method.
+func (m *MockConsumerServiceServer) SubscribeStream(arg0 pb.ConsumerService_SubscribeStreamServer) error {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "SubscribeStream", arg0)
+ ret0, _ := ret[0].(error)
+ return ret0
+}
+
+// SubscribeStream indicates an expected call of SubscribeStream.
+func (mr *MockConsumerServiceServerMockRecorder) SubscribeStream(arg0 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "SubscribeStream", reflect.TypeOf((*MockConsumerServiceServer)(nil).SubscribeStream), arg0)
+}
+
+// Unsubscribe mocks base method.
+func (m *MockConsumerServiceServer) Unsubscribe(arg0 context.Context, arg1 *pb.Subscription) (*pb.Response, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Unsubscribe", arg0, arg1)
+ ret0, _ := ret[0].(*pb.Response)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// Unsubscribe indicates an expected call of Unsubscribe.
+func (mr *MockConsumerServiceServerMockRecorder) Unsubscribe(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Unsubscribe", reflect.TypeOf((*MockConsumerServiceServer)(nil).Unsubscribe), arg0, arg1)
+}
+
+// mustEmbedUnimplementedConsumerServiceServer mocks base method.
+// NOTE(review): presumably present only so the mock satisfies the unexported
+// method of pb.ConsumerServiceServer — confirm against the generated pb code.
+func (m *MockConsumerServiceServer) mustEmbedUnimplementedConsumerServiceServer() {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "mustEmbedUnimplementedConsumerServiceServer")
+}
+
+// mustEmbedUnimplementedConsumerServiceServer indicates an expected call of mustEmbedUnimplementedConsumerServiceServer.
+func (mr *MockConsumerServiceServerMockRecorder) mustEmbedUnimplementedConsumerServiceServer() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedConsumerServiceServer", reflect.TypeOf((*MockConsumerServiceServer)(nil).mustEmbedUnimplementedConsumerServiceServer))
+}
diff --git a/eventmesh-server-go/runtime/proto/pb/mocks/heartbeat_service.go b/eventmesh-server-go/runtime/proto/pb/mocks/heartbeat_service.go
new file mode 100644
index 000000000..0fcdcf0f9
--- /dev/null
+++ b/eventmesh-server-go/runtime/proto/pb/mocks/heartbeat_service.go
@@ -0,0 +1,78 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb (interfaces: HeartbeatServiceServer)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+ context "context"
+ reflect "reflect"
+
+ pb "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockHeartbeatServiceServer is a mock of HeartbeatServiceServer interface.
+// This file is MockGen output (see the "DO NOT EDIT" header); regenerate it
+// rather than changing the code by hand.
+type MockHeartbeatServiceServer struct {
+ ctrl *gomock.Controller
+ recorder *MockHeartbeatServiceServerMockRecorder
+}
+
+// MockHeartbeatServiceServerMockRecorder is the mock recorder for MockHeartbeatServiceServer.
+type MockHeartbeatServiceServerMockRecorder struct {
+ mock *MockHeartbeatServiceServer
+}
+
+// NewMockHeartbeatServiceServer creates a new mock instance.
+func NewMockHeartbeatServiceServer(ctrl *gomock.Controller) *MockHeartbeatServiceServer {
+ mock := &MockHeartbeatServiceServer{ctrl: ctrl}
+ mock.recorder = &MockHeartbeatServiceServerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockHeartbeatServiceServer) EXPECT() *MockHeartbeatServiceServerMockRecorder {
+ return m.recorder
+}
+
+// Heartbeat mocks base method.
+func (m *MockHeartbeatServiceServer) Heartbeat(arg0 context.Context, arg1 *pb.Heartbeat) (*pb.Response, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Heartbeat", arg0, arg1)
+ // Comma-ok assertions: a nil recorded return value yields the zero value
+ // instead of panicking.
+ ret0, _ := ret[0].(*pb.Response)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// Heartbeat indicates an expected call of Heartbeat.
+func (mr *MockHeartbeatServiceServerMockRecorder) Heartbeat(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Heartbeat", reflect.TypeOf((*MockHeartbeatServiceServer)(nil).Heartbeat), arg0, arg1)
+}
+
+// mustEmbedUnimplementedHeartbeatServiceServer mocks base method.
+// NOTE(review): presumably present only so the mock satisfies the unexported
+// method of pb.HeartbeatServiceServer — confirm against the generated pb code.
+func (m *MockHeartbeatServiceServer) mustEmbedUnimplementedHeartbeatServiceServer() {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "mustEmbedUnimplementedHeartbeatServiceServer")
+}
+
+// mustEmbedUnimplementedHeartbeatServiceServer indicates an expected call of mustEmbedUnimplementedHeartbeatServiceServer.
+func (mr *MockHeartbeatServiceServerMockRecorder) mustEmbedUnimplementedHeartbeatServiceServer() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedHeartbeatServiceServer", reflect.TypeOf((*MockHeartbeatServiceServer)(nil).mustEmbedUnimplementedHeartbeatServiceServer))
+}
diff --git a/eventmesh-server-go/runtime/proto/pb/mocks/producer_service.go b/eventmesh-server-go/runtime/proto/pb/mocks/producer_service.go
new file mode 100644
index 000000000..8dfecd6e9
--- /dev/null
+++ b/eventmesh-server-go/runtime/proto/pb/mocks/producer_service.go
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements. See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+// Code generated by MockGen. DO NOT EDIT.
+// Source: github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb (interfaces: PublisherServiceServer)
+
+// Package mocks is a generated GoMock package.
+package mocks
+
+import (
+ context "context"
+ reflect "reflect"
+
+ pb "github.com/apache/incubator-eventmesh/eventmesh-server-go/runtime/proto/pb"
+ gomock "github.com/golang/mock/gomock"
+)
+
+// MockPublisherServiceServer is a mock of PublisherServiceServer interface.
+type MockPublisherServiceServer struct {
+ ctrl *gomock.Controller
+ recorder *MockPublisherServiceServerMockRecorder
+}
+
+// MockPublisherServiceServerMockRecorder is the mock recorder for MockPublisherServiceServer.
+type MockPublisherServiceServerMockRecorder struct {
+ mock *MockPublisherServiceServer
+}
+
+// NewMockPublisherServiceServer creates a new mock instance.
+func NewMockPublisherServiceServer(ctrl *gomock.Controller) *MockPublisherServiceServer {
+ mock := &MockPublisherServiceServer{ctrl: ctrl}
+ mock.recorder = &MockPublisherServiceServerMockRecorder{mock}
+ return mock
+}
+
+// EXPECT returns an object that allows the caller to indicate expected use.
+func (m *MockPublisherServiceServer) EXPECT() *MockPublisherServiceServerMockRecorder {
+ return m.recorder
+}
+
+// BatchPublish mocks base method.
+func (m *MockPublisherServiceServer) BatchPublish(arg0 context.Context, arg1 *pb.BatchMessage) (*pb.Response, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "BatchPublish", arg0, arg1)
+ ret0, _ := ret[0].(*pb.Response)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// BatchPublish indicates an expected call of BatchPublish.
+func (mr *MockPublisherServiceServerMockRecorder) BatchPublish(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "BatchPublish", reflect.TypeOf((*MockPublisherServiceServer)(nil).BatchPublish), arg0, arg1)
+}
+
+// Publish mocks base method.
+func (m *MockPublisherServiceServer) Publish(arg0 context.Context, arg1 *pb.SimpleMessage) (*pb.Response, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "Publish", arg0, arg1)
+ ret0, _ := ret[0].(*pb.Response)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// Publish indicates an expected call of Publish.
+func (mr *MockPublisherServiceServerMockRecorder) Publish(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Publish", reflect.TypeOf((*MockPublisherServiceServer)(nil).Publish), arg0, arg1)
+}
+
+// RequestReply mocks base method.
+func (m *MockPublisherServiceServer) RequestReply(arg0 context.Context, arg1 *pb.SimpleMessage) (*pb.SimpleMessage, error) {
+ m.ctrl.T.Helper()
+ ret := m.ctrl.Call(m, "RequestReply", arg0, arg1)
+ ret0, _ := ret[0].(*pb.SimpleMessage)
+ ret1, _ := ret[1].(error)
+ return ret0, ret1
+}
+
+// RequestReply indicates an expected call of RequestReply.
+func (mr *MockPublisherServiceServerMockRecorder) RequestReply(arg0, arg1 interface{}) *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "RequestReply", reflect.TypeOf((*MockPublisherServiceServer)(nil).RequestReply), arg0, arg1)
+}
+
+// mustEmbedUnimplementedPublisherServiceServer mocks base method.
+func (m *MockPublisherServiceServer) mustEmbedUnimplementedPublisherServiceServer() {
+ m.ctrl.T.Helper()
+ m.ctrl.Call(m, "mustEmbedUnimplementedPublisherServiceServer")
+}
+
+// mustEmbedUnimplementedPublisherServiceServer indicates an expected call of mustEmbedUnimplementedPublisherServiceServer.
+func (mr *MockPublisherServiceServerMockRecorder) mustEmbedUnimplementedPublisherServiceServer() *gomock.Call {
+ mr.mock.ctrl.T.Helper()
+ return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "mustEmbedUnimplementedPublisherServiceServer", reflect.TypeOf((*MockPublisherServiceServer)(nil).mustEmbedUnimplementedPublisherServiceServer))
+}
diff --git a/eventmesh-server-go/plugin/naming/nacos/registry/config.go b/eventmesh-server-go/runtime/registry/registry.go
similarity index 65%
rename from eventmesh-server-go/plugin/naming/nacos/registry/config.go
rename to eventmesh-server-go/runtime/registry/registry.go
index 11fd86c23..c9288b927 100644
--- a/eventmesh-server-go/plugin/naming/nacos/registry/config.go
+++ b/eventmesh-server-go/runtime/registry/registry.go
@@ -15,18 +15,9 @@
package registry
-// Config registry config
-type Config struct {
- ServiceName string
- Weight int
- Address string
- Metadata map[string]string
-}
-
-// PluginConfig define registry plugin config
-type PluginConfig struct {
- ServiceName string `yaml:"service_name"`
- CacheDir string `yaml:"cache-dir"`
- Port string `yaml:"port"` // nacos server port
- AddressList string `yaml:"address_list"` // nacos server address list
+type Registry interface {
+ Init() error
+ findEventMeshInfoByCluster(clusterName string)
+ Start() error
+ Shutdown() error
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@eventmesh.apache.org
For additional commands, e-mail: commits-help@eventmesh.apache.org