You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/14 07:35:39 UTC
[dubbo-go] branch 3.0 updated: Nacos client (#1255)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 75fc696 Nacos client (#1255)
75fc696 is described below
commit 75fc69650a261b9573fe61be387bb3c92d084fce
Author: 赵云兴 <23...@qq.com>
AuthorDate: Mon Jun 14 15:35:31 2021 +0800
Nacos client (#1255)
* build(deps): bump actions/cache from v2.1.4 to v2.1.5
Bumps [actions/cache](https://github.com/actions/cache) from v2.1.4 to v2.1.5.
- [Release notes](https://github.com/actions/cache/releases)
- [Commits](https://github.com/actions/cache/compare/v2.1.4...1a9e2138d905efd099035b49d8b7a3888c653ca8)
Signed-off-by: dependabot[bot] <su...@github.com>
* improve etcd version and change create to put (#1203)
* up:remoting nacos
* add:nacos service discovery
* up:设置默认值
* up:nacos registroy client
* up:nacon config client
* up:go fmt
* up:nacos config client
* up:test
* up:修改初æ测试方法
* up:fmt
* up:triple version
* up:修改配置操作
Co-authored-by: dependabot[bot] <49...@users.noreply.github.com>
Co-authored-by: Xin.Zh <dr...@foxmail.com>
Co-authored-by: AlexStocks <al...@foxmail.com>
Co-authored-by: randy <zt...@gmail.com>
---
config_center/nacos/client.go | 112 +++++++------------------------
config_center/nacos/client_test.go | 49 ++++++--------
config_center/nacos/facade.go | 54 ++-------------
config_center/nacos/impl.go | 31 ++++-----
config_center/nacos/impl_test.go | 112 +++++++------------------------
config_center/nacos/listener.go | 2 +-
metadata/report/nacos/report.go | 12 ++--
registry/nacos/listener.go | 17 +++--
registry/nacos/registry.go | 73 ++------------------
registry/nacos/registry_test.go | 11 +--
registry/nacos/service_discovery.go | 33 +++++----
registry/nacos/service_discovery_test.go | 5 +-
remoting/nacos/builder.go | 104 ++++++++++++++++------------
remoting/nacos/builder_test.go | 46 +++++++++++++
14 files changed, 243 insertions(+), 418 deletions(-)
diff --git a/config_center/nacos/client.go b/config_center/nacos/client.go
index c26717f..f53c629 100644
--- a/config_center/nacos/client.go
+++ b/config_center/nacos/client.go
@@ -18,46 +18,43 @@
package nacos
import (
- "strconv"
"strings"
"sync"
"time"
)
import (
- "github.com/nacos-group/nacos-sdk-go/clients"
- "github.com/nacos-group/nacos-sdk-go/clients/config_client"
- nacosconst "github.com/nacos-group/nacos-sdk-go/common/constant"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
perrors "github.com/pkg/errors"
)
import (
- "dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/remoting/nacos"
)
-// NacosClient Nacos client
+// NacosClient Nacos configClient
type NacosClient struct {
- name string
- NacosAddrs []string
- sync.Mutex // for Client
- client *config_client.IConfigClient
- exit chan struct{}
- Timeout time.Duration
- once sync.Once
- onceClose func()
+ name string
+ NacosAddrs []string
+ sync.Mutex // for Client
+ configClient *nacosClient.NacosConfigClient
+ exit chan struct{}
+ Timeout time.Duration
+ once sync.Once
+ onceClose func()
}
// Client Get Client
-func (n *NacosClient) Client() *config_client.IConfigClient {
- return n.client
+func (n *NacosClient) Client() *nacosClient.NacosConfigClient {
+ return n.configClient
}
-// SetClient Set client
-func (n *NacosClient) SetClient(client *config_client.IConfigClient) {
+// SetClient Set configClient
+func (n *NacosClient) SetClient(configClient *nacosClient.NacosConfigClient) {
n.Lock()
- n.client = client
+ n.configClient = configClient
n.Unlock()
}
@@ -65,7 +62,7 @@ type option func(*options)
type options struct {
nacosName string
- // client *NacosClient
+ // configClient *NacosClient
}
// WithNacosName Set nacos name
@@ -75,7 +72,7 @@ func WithNacosName(name string) option {
}
}
-// ValidateNacosClient Validate nacos client , if null then create it
+// ValidateNacosClient Validate nacos configClient , if null then create it
func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
if container == nil {
return perrors.Errorf("container can not be null")
@@ -95,7 +92,7 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
nacosAddresses := strings.Split(url.Location, ",")
if container.NacosClient() == nil {
// in dubbo ,every registry only connect one node ,so this is []string{r.Address}
- newClient, err := newNacosClient(os.nacosName, nacosAddresses, timeout, url)
+ newClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("newNacosClient(name{%s}, nacos address{%v}, timeout{%d}) = error{%v}",
os.nacosName, url.Location, timeout.String(), err)
@@ -105,79 +102,18 @@ func ValidateNacosClient(container nacosClientFacade, opts ...option) error {
}
if container.NacosClient().Client() == nil {
- configClient, err := initNacosConfigClient(nacosAddresses, timeout, url)
+ configClient, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
nacosAddresses, timeout.String(), url, err)
return perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
}
- container.NacosClient().SetClient(&configClient)
-
+ container.NacosClient().SetClient(configClient.Client())
}
-
return perrors.WithMessagef(nil, "newNacosClient(address:%+v)", url.PrimitiveURL)
}
-func newNacosClient(name string, nacosAddrs []string, timeout time.Duration, url *common.URL) (*NacosClient, error) {
- var (
- err error
- n *NacosClient
- )
-
- n = &NacosClient{
- name: name,
- NacosAddrs: nacosAddrs,
- Timeout: timeout,
- exit: make(chan struct{}),
- onceClose: func() {
- close(n.exit)
- },
- }
-
- configClient, err := initNacosConfigClient(nacosAddrs, timeout, url)
- if err != nil {
- logger.Errorf("initNacosConfigClient(addr:%+v,timeout:%v,url:%v) = err %+v",
- nacosAddrs, timeout.String(), url, err)
- return n, perrors.WithMessagef(err, "newNacosClient(address:%+v)", url.Location)
- }
- n.SetClient(&configClient)
-
- return n, nil
-}
-
-func initNacosConfigClient(nacosAddrs []string, timeout time.Duration, url *common.URL) (config_client.IConfigClient, error) {
- var svrConfList []nacosconst.ServerConfig
- for _, nacosAddr := range nacosAddrs {
- split := strings.Split(nacosAddr, ":")
- port, err := strconv.ParseUint(split[1], 10, 64)
- if err != nil {
- logger.Errorf("strconv.ParseUint(nacos addr port:%+v) = error %+v", split[1], err)
- continue
- }
- svrconf := nacosconst.ServerConfig{
- IpAddr: split[0],
- Port: port,
- }
- svrConfList = append(svrConfList, svrconf)
- }
-
- return clients.CreateConfigClient(map[string]interface{}{
- "serverConfigs": svrConfList,
- "clientConfig": nacosconst.ClientConfig{
- TimeoutMs: uint64(int32(timeout / time.Millisecond)),
- ListenInterval: uint64(int32(timeout / time.Millisecond)),
- NotLoadCacheAtStart: true,
- LogDir: url.GetParam(constant.NACOS_LOG_DIR_KEY, ""),
- CacheDir: url.GetParam(constant.NACOS_CACHE_DIR_KEY, ""),
- Endpoint: url.GetParam(constant.NACOS_ENDPOINT, ""),
- Username: url.GetParam(constant.NACOS_USERNAME, ""),
- Password: url.GetParam(constant.NACOS_PASSWORD, ""),
- NamespaceId: url.GetParam(constant.NACOS_NAMESPACE_ID, ""),
- },
- })
-}
-
-// Done Get nacos client exit signal
+// Done Get nacos configClient exit signal
func (n *NacosClient) Done() <-chan struct{} {
return n.exit
}
@@ -193,7 +129,7 @@ func (n *NacosClient) stop() bool {
return false
}
-// NacosClientValid Get nacos client valid status
+// NacosClientValid Get nacos configClient valid status
func (n *NacosClient) NacosClientValid() bool {
select {
case <-n.exit:
@@ -211,7 +147,7 @@ func (n *NacosClient) NacosClientValid() bool {
return valid
}
-// Close Close nacos client , then set null
+// Close Close nacos configClient , then set null
func (n *NacosClient) Close() {
if n == nil {
return
diff --git a/config_center/nacos/client_test.go b/config_center/nacos/client_test.go
index 3ae19f6..458f2d9 100644
--- a/config_center/nacos/client_test.go
+++ b/config_center/nacos/client_test.go
@@ -18,7 +18,6 @@
package nacos
import (
- "strings"
"testing"
"time"
)
@@ -32,9 +31,10 @@ import (
)
func TestNewNacosClient(t *testing.T) {
- server := mockCommonNacosServer()
- nacosURL := strings.ReplaceAll(server.URL, "http", "registry")
+
+ nacosURL := "registry://127.0.0.1:8848"
registryUrl, _ := common.NewURL(nacosURL)
+
c := &nacosDynamicConfiguration{
url: registryUrl,
done: make(chan struct{}),
@@ -44,52 +44,41 @@ func TestNewNacosClient(t *testing.T) {
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
- // c.client.Close() and <-c.client.Done() have order requirements.
- // If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
- // sets c.client to nil before calling c.client.Done().
+ // c.configClient.Close() and <-c.configClient.Done() have order requirements.
+ // If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
+ // sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
- <-c.client.Done()
+ //<-c.client.Done()
c.Destroy()
}
func TestSetNacosClient(t *testing.T) {
- server := mockCommonNacosServer()
- nacosURL := "registry://" + server.Listener.Addr().String()
+ nacosURL := "registry://127.0.0.1:8848"
registryUrl, _ := common.NewURL(nacosURL)
+
c := &nacosDynamicConfiguration{
url: registryUrl,
done: make(chan struct{}),
}
- var client *NacosClient
- client = &NacosClient{
- name: nacosClientName,
- NacosAddrs: []string{nacosURL},
- Timeout: 15 * time.Second,
- exit: make(chan struct{}),
- onceClose: func() {
- close(client.exit)
- },
- }
- c.SetNacosClient(client)
+
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
assert.NoError(t, err)
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
- // c.client.Close() and <-c.client.Done() have order requirements.
- // If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
- // sets c.client to nil before calling c.client.Done().
+ // c.configClient.Close() and <-c.configClient.Done() have order requirements.
+ // If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
+ // sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
- <-c.client.Done()
c.Destroy()
}
func TestNewNacosClient_connectError(t *testing.T) {
- nacosURL := "registry://127.0.0.1:8888"
+ nacosURL := "registry://127.0.0.1:8848"
registryUrl, err := common.NewURL(nacosURL)
assert.NoError(t, err)
c := &nacosDynamicConfiguration{
@@ -101,14 +90,14 @@ func TestNewNacosClient_connectError(t *testing.T) {
c.wg.Add(1)
go HandleClientRestart(c)
go func() {
- // c.client.Close() and <-c.client.Done() have order requirements.
- // If c.client.Close() is called first.It is possible that "go HandleClientRestart(c)"
- // sets c.client to nil before calling c.client.Done().
+ // c.configClient.Close() and <-c.configClient.Done() have order requirements.
+ // If c.configClient.Close() is called first.It is possible that "go HandleClientRestart(c)"
+ // sets c.configClient to nil before calling c.configClient.Done().
time.Sleep(time.Second)
c.client.Close()
}()
- <-c.client.Done()
- // let client do retry
+ // <-c.client.Done()
+ // let configClient do retry
time.Sleep(5 * time.Second)
c.Destroy()
}
diff --git a/config_center/nacos/facade.go b/config_center/nacos/facade.go
index 298bef9..6141e1e 100644
--- a/config_center/nacos/facade.go
+++ b/config_center/nacos/facade.go
@@ -19,12 +19,10 @@ package nacos
import (
"sync"
- "time"
)
import (
- "github.com/apache/dubbo-getty"
- perrors "github.com/pkg/errors"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
)
import (
@@ -32,62 +30,24 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/logger"
)
-const (
- connDelay = 3
- maxFailTimes = 15
-)
-
type nacosClientFacade interface {
- NacosClient() *NacosClient
- SetNacosClient(*NacosClient)
- // WaitGroup for wait group control, zk client listener & zk client container
+ NacosClient() *nacosClient.NacosConfigClient
+ SetNacosClient(*nacosClient.NacosConfigClient)
+ // WaitGroup for wait group control, zk configClient listener & zk configClient container
WaitGroup() *sync.WaitGroup
- // GetDone For nacos client control RestartCallBack() bool
+ // GetDone For nacos configClient control RestartCallBack() bool
GetDone() chan struct{}
common.Node
}
-// HandleClientRestart Restart client handler
+// HandleClientRestart Restart configClient handler
func HandleClientRestart(r nacosClientFacade) {
- var (
- err error
- failTimes int
- )
-
defer r.WaitGroup().Done()
-LOOP:
for {
select {
case <-r.GetDone():
logger.Warnf("(NacosProviderRegistry)reconnectNacosRegistry goroutine exit now...")
- break LOOP
- // re-register all services
- case <-r.NacosClient().Done():
- r.NacosClient().Close()
- nacosName := r.NacosClient().name
- nacosAddress := r.NacosClient().NacosAddrs
- r.SetNacosClient(nil)
-
- // Connect nacos until success.
- failTimes = 0
- for {
- select {
- case <-r.GetDone():
- logger.Warnf("(NacosProviderRegistry)reconnectZkRegistry goroutine exit now...")
- break LOOP
- case <-getty.GetTimeWheel().After(time.Duration(failTimes*connDelay) * time.Second): // Prevent crazy reconnection nacos.
- }
- err = ValidateNacosClient(r, WithNacosName(nacosName))
- logger.Infof("NacosProviderRegistry.validateNacosClient(nacosAddr{%s}) = error{%#v}",
- nacosAddress, perrors.WithStack(err))
- if err == nil {
- break
- }
- failTimes++
- if maxFailTimes <= failTimes {
- failTimes = maxFailTimes
- }
- }
+ return
}
}
}
diff --git a/config_center/nacos/impl.go b/config_center/nacos/impl.go
index 6085ed0..50e1094 100644
--- a/config_center/nacos/impl.go
+++ b/config_center/nacos/impl.go
@@ -24,6 +24,7 @@ import (
import (
gxset "github.com/dubbogo/gost/container/set"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
@@ -53,7 +54,7 @@ type nacosDynamicConfiguration struct {
wg sync.WaitGroup
cltLock sync.Mutex
done chan struct{}
- client *NacosClient
+ client *nacosClient.NacosConfigClient
keyListeners sync.Map
parser parser.ConfigurationParser
}
@@ -66,7 +67,7 @@ func newNacosDynamicConfiguration(url *common.URL) (*nacosDynamicConfiguration,
}
err := ValidateNacosClient(c, WithNacosName(nacosClientName))
if err != nil {
- logger.Errorf("nacos client start error ,error message is %v", err)
+ logger.Errorf("nacos configClient start error ,error message is %v", err)
return nil, err
}
c.wg.Add(1)
@@ -98,7 +99,7 @@ func (n *nacosDynamicConfiguration) GetInternalProperty(key string, opts ...conf
func (n *nacosDynamicConfiguration) PublishConfig(key string, group string, value string) error {
group = n.resolvedGroup(group)
- ok, err := (*n.client.Client()).PublishConfig(vo.ConfigParam{
+ ok, err := n.client.Client().PublishConfig(vo.ConfigParam{
DataId: key,
Group: group,
Content: value,
@@ -115,7 +116,7 @@ func (n *nacosDynamicConfiguration) PublishConfig(key string, group string, valu
// GetConfigKeysByGroup will return all keys with the group
func (n *nacosDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.HashSet, error) {
group = n.resolvedGroup(group)
- page, err := (*n.client.Client()).SearchConfig(vo.SearchConfigParam{
+ page, err := n.client.Client().SearchConfig(vo.SearchConfigParam{
Search: "accurate",
Group: group,
PageNo: 1,
@@ -125,7 +126,7 @@ func (n *nacosDynamicConfiguration) GetConfigKeysByGroup(group string) (*gxset.H
result := gxset.NewSet()
if err != nil {
- return result, perrors.WithMessage(err, "can not find the client config")
+ return result, perrors.WithMessage(err, "can not find the configClient config")
}
for _, itm := range page.PageItems {
result.Add(itm.DataId)
@@ -139,7 +140,7 @@ func (n *nacosDynamicConfiguration) GetRule(key string, opts ...config_center.Op
for _, opt := range opts {
opt(tmpOpts)
}
- content, err := (*n.client.Client()).GetConfig(vo.ConfigParam{
+ content, err := n.client.Client().GetConfig(vo.ConfigParam{
DataId: key,
Group: n.resolvedGroup(tmpOpts.Group),
})
@@ -161,23 +162,23 @@ func (n *nacosDynamicConfiguration) SetParser(p parser.ConfigurationParser) {
}
// NacosClient Get Nacos Client
-func (n *nacosDynamicConfiguration) NacosClient() *NacosClient {
+func (n *nacosDynamicConfiguration) NacosClient() *nacosClient.NacosConfigClient {
return n.client
}
// SetNacosClient Set Nacos Client
-func (n *nacosDynamicConfiguration) SetNacosClient(client *NacosClient) {
+func (n *nacosDynamicConfiguration) SetNacosClient(client *nacosClient.NacosConfigClient) {
n.cltLock.Lock()
n.client = client
n.cltLock.Unlock()
}
-// WaitGroup for wait group control, zk client listener & zk client container
+// WaitGroup for wait group control, zk configClient listener & zk configClient container
func (n *nacosDynamicConfiguration) WaitGroup() *sync.WaitGroup {
return &n.wg
}
-// GetDone For nacos client control RestartCallBack() bool
+// GetDone For nacos configClient control RestartCallBack() bool
func (n *nacosDynamicConfiguration) GetDone() chan struct{} {
return n.done
}
@@ -214,11 +215,7 @@ func (n *nacosDynamicConfiguration) IsAvailable() bool {
}
func (n *nacosDynamicConfiguration) closeConfigs() {
- n.cltLock.Lock()
- client := n.client
- n.client = nil
- n.cltLock.Unlock()
- // Close the old client first to close the tmp node
- client.Close()
- logger.Infof("begin to close provider n client")
+ // Close the old configClient first to close the tmp node
+ n.client.Close()
+ logger.Infof("begin to close provider n configClient")
}
diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go
index f32920b..eb699e6 100644
--- a/config_center/nacos/impl_test.go
+++ b/config_center/nacos/impl_test.go
@@ -18,10 +18,6 @@
package nacos
import (
- "fmt"
- "net/http"
- "net/http/httptest"
- "strings"
"sync"
"testing"
"time"
@@ -37,104 +33,46 @@ import (
"dubbo.apache.org/dubbo-go/v3/config_center/parser"
)
-// run mock config server
-func runMockConfigServer(configHandler func(http.ResponseWriter, *http.Request),
- configListenHandler func(http.ResponseWriter, *http.Request)) *httptest.Server {
- uriHandlerMap := make(map[string]func(http.ResponseWriter, *http.Request))
-
- uriHandlerMap["/nacos/v1/cs/configs"] = configHandler
- uriHandlerMap["/nacos/v1/cs/configs/listener"] = configListenHandler
-
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- uri := r.RequestURI
- for path, handler := range uriHandlerMap {
- if uri == path {
- handler(w, r)
- break
- }
- }
- }))
-
- return ts
-}
-
-func mockCommonNacosServer() *httptest.Server {
- return runMockConfigServer(func(writer http.ResponseWriter, _ *http.Request) {
- data := "true"
- fmt.Fprintf(writer, "%s", data)
- }, func(writer http.ResponseWriter, _ *http.Request) {
- data := `dubbo.properties%02dubbo%02dubbo.service.com.ikurento.user.UserProvider.cluster=failback`
- fmt.Fprintf(writer, "%s", data)
- })
+func getNacosConfig(t *testing.T) config_center.DynamicConfiguration {
+ registryUrl, err := common.NewURL("registry://console.nacos.io:80")
+ assert.Nil(t, err)
+ nacosConfig, err := newNacosDynamicConfiguration(registryUrl)
+ assert.Nil(t, err)
+ return nacosConfig
}
-func initNacosData(t *testing.T) (*nacosDynamicConfiguration, error) {
- server := mockCommonNacosServer()
- nacosURL := strings.ReplaceAll(server.URL, "http", "registry")
- regurl, _ := common.NewURL(nacosURL)
- factory := &nacosDynamicConfigurationFactory{}
- nacosConfiguration, err := factory.GetDynamicConfiguration(regurl)
- assert.NoError(t, err)
-
- nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{})
-
- return nacosConfiguration.(*nacosDynamicConfiguration), err
+func TestPublishConfig(t *testing.T) {
+ nacosConfig := getNacosConfig(t)
+ data := `dubbo.protocol.name=dubbo`
+ err := nacosConfig.PublishConfig("dubbo.properties", "dubbo-go", data)
+ assert.Nil(t, err)
}
func TestGetConfig(t *testing.T) {
- nacos, err := initNacosData(t)
- assert.NoError(t, err)
- configs, err := nacos.GetProperties("dubbo.properties", config_center.WithGroup("dubbo"))
- assert.Empty(t, configs)
- assert.NoError(t, err)
- _, err = nacos.Parser().Parse(configs)
- assert.NoError(t, err)
-}
+ nacosConfig := getNacosConfig(t)
+ nacosConfig.SetParser(&parser.DefaultConfigurationParser{})
-func TestNacosDynamicConfiguration_GetConfigKeysByGroup(t *testing.T) {
- data := `
-{
- "PageItems": [
- {
- "dataId": "application"
- }
- ]
-}
-`
- ts := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
- _, err := w.Write([]byte(data))
- assert.Nil(t, err)
- }))
-
- nacosURL := strings.ReplaceAll(ts.URL, "http", "registry")
- regurl, _ := common.NewURL(nacosURL)
- nacosConfiguration, err := newNacosDynamicConfiguration(regurl)
+ config, err := nacosConfig.GetProperties("dubbo.properties", config_center.WithGroup("dubbo-go"))
+ assert.NotEmpty(t, config)
assert.NoError(t, err)
- nacosConfiguration.SetParser(&parser.DefaultConfigurationParser{})
-
- configs, err := nacosConfiguration.GetConfigKeysByGroup("dubbo")
- assert.Nil(t, err)
- assert.Equal(t, 1, configs.Size())
- assert.True(t, configs.Contains("application"))
+ parse, err := nacosConfig.Parser().Parse(config)
+ assert.NoError(t, err)
+ assert.Equal(t, parse["dubbo.protocol.name"], "dubbo")
}
-func TestNacosDynamicConfigurationPublishConfig(t *testing.T) {
- nacos, err := initNacosData(t)
- assert.Nil(t, err)
- key := "myKey"
- group := "/custom/a/b"
- value := "MyValue"
- err = nacos.PublishConfig(key, group, value)
- assert.Nil(t, err)
+func TestGetConfigKeysByGroup(t *testing.T) {
+ nacosConfig := getNacosConfig(t)
+ config, err := nacosConfig.GetConfigKeysByGroup("dubbo-go")
+ assert.NoError(t, err)
+ assert.True(t, config.Contains("dubbo.properties"))
}
func TestAddListener(t *testing.T) {
- nacos, err := initNacosData(t)
- assert.NoError(t, err)
+ nacosConfig := getNacosConfig(t)
listener := &mockDataListener{}
time.Sleep(time.Second * 2)
- nacos.AddListener("dubbo.properties", listener)
+ nacosConfig.AddListener("dubbo.properties", listener)
}
func TestRemoveListener(_ *testing.T) {
diff --git a/config_center/nacos/listener.go b/config_center/nacos/listener.go
index 2e5eca8..258e730 100644
--- a/config_center/nacos/listener.go
+++ b/config_center/nacos/listener.go
@@ -38,7 +38,7 @@ func callback(listener config_center.ConfigurationListener, _, _, dataId, data s
func (n *nacosDynamicConfiguration) addListener(key string, listener config_center.ConfigurationListener) {
_, loaded := n.keyListeners.Load(key)
if !loaded {
- err := (*n.client.Client()).ListenConfig(vo.ConfigParam{
+ err := n.client.Client().ListenConfig(vo.ConfigParam{
DataId: key,
Group: "dubbo",
OnChange: func(namespace, group, dataId, data string) {
diff --git a/metadata/report/nacos/report.go b/metadata/report/nacos/report.go
index d881771..a351726 100644
--- a/metadata/report/nacos/report.go
+++ b/metadata/report/nacos/report.go
@@ -23,7 +23,7 @@ import (
)
import (
- "github.com/nacos-group/nacos-sdk-go/clients/config_client"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
@@ -48,7 +48,7 @@ func init() {
// nacosMetadataReport is the implementation
// of MetadataReport based on nacos.
type nacosMetadataReport struct {
- client config_client.IConfigClient
+ client *nacosClient.NacosConfigClient
}
// GetAppMetadata get metadata info from nacos
@@ -152,7 +152,7 @@ func (n *nacosMetadataReport) GetServiceDefinition(metadataIdentifier *identifie
// storeMetadata will publish the metadata to Nacos
// if failed or error is not nil, error will be returned
func (n *nacosMetadataReport) storeMetadata(param vo.ConfigParam) error {
- res, err := n.client.PublishConfig(param)
+ res, err := n.client.Client().PublishConfig(param)
if err != nil {
return perrors.WithMessage(err, "Could not publish the metadata")
}
@@ -164,7 +164,7 @@ func (n *nacosMetadataReport) storeMetadata(param vo.ConfigParam) error {
// deleteMetadata will delete the metadata
func (n *nacosMetadataReport) deleteMetadata(param vo.ConfigParam) error {
- res, err := n.client.DeleteConfig(param)
+ res, err := n.client.Client().DeleteConfig(param)
if err != nil {
return perrors.WithMessage(err, "Could not delete the metadata")
}
@@ -196,7 +196,7 @@ func (n *nacosMetadataReport) getConfigAsArray(param vo.ConfigParam) ([]string,
// getConfig will read the config
func (n *nacosMetadataReport) getConfig(param vo.ConfigParam) (string, error) {
- cfg, err := n.client.GetConfig(param)
+ cfg, err := n.client.Client().GetConfig(param)
if err != nil {
logger.Errorf("Finding the configuration failed: %v", param)
return "", err
@@ -208,7 +208,7 @@ type nacosMetadataReportFactory struct{}
// nolint
func (n *nacosMetadataReportFactory) CreateMetadataReport(url *common.URL) report.MetadataReport {
- client, err := nacos.NewNacosConfigClient(url)
+ client, err := nacos.NewNacosConfigClientByUrl(url)
if err != nil {
logger.Errorf("Could not create nacos metadata report. URL: %s", url.String())
return nil
diff --git a/registry/nacos/listener.go b/registry/nacos/listener.go
index 19beaa5..3a538cc 100644
--- a/registry/nacos/listener.go
+++ b/registry/nacos/listener.go
@@ -26,7 +26,7 @@ import (
)
import (
- "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
@@ -42,7 +42,7 @@ import (
)
type nacosListener struct {
- namingClient naming_client.INamingClient
+ namingClient *nacosClient.NacosNamingClient
listenUrl *common.URL
events chan *config_center.ConfigChangeEvent
instanceMap map[string]model.Instance
@@ -51,8 +51,8 @@ type nacosListener struct {
subscribeParam *vo.SubscribeParam
}
-// NewRegistryDataListener creates a data listener for nacos
-func NewNacosListener(url *common.URL, namingClient naming_client.INamingClient) (*nacosListener, error) {
+// NewNacosListener creates a data listener for nacos
+func NewNacosListener(url *common.URL, namingClient *nacosClient.NacosNamingClient) (*nacosListener, error) {
listener := &nacosListener{
namingClient: namingClient,
listenUrl: url, events: make(chan *config_center.ConfigChangeEvent, 32),
@@ -150,7 +150,6 @@ func (nl *nacosListener) Callback(services []model.SubscribeService, err error)
}
nl.instanceMap = newInstanceMap
-
for i := range addInstances {
newUrl := generateUrl(addInstances[i])
if newUrl != nil {
@@ -184,18 +183,18 @@ func getSubscribeName(url *common.URL) string {
func (nl *nacosListener) startListen() error {
if nl.namingClient == nil {
- return perrors.New("nacos naming client stopped")
+ return perrors.New("nacos naming namingClient stopped")
}
serviceName := getSubscribeName(nl.listenUrl)
nl.subscribeParam = &vo.SubscribeParam{ServiceName: serviceName, SubscribeCallback: nl.Callback}
go func() {
- _ = nl.namingClient.Subscribe(nl.subscribeParam)
+ _ = nl.namingClient.Client().Subscribe(nl.subscribeParam)
}()
return nil
}
func (nl *nacosListener) stopListen() error {
- return nl.namingClient.Unsubscribe(nl.subscribeParam)
+ return nl.namingClient.Client().Unsubscribe(nl.subscribeParam)
}
func (nl *nacosListener) process(configType *config_center.ConfigChangeEvent) {
@@ -219,6 +218,6 @@ func (nl *nacosListener) Next() (*registry.ServiceEvent, error) {
// nolint
func (nl *nacosListener) Close() {
- nl.stopListen()
+ _ = nl.stopListen()
close(nl.done)
}
diff --git a/registry/nacos/registry.go b/registry/nacos/registry.go
index 479c5d6..b29f97f 100644
--- a/registry/nacos/registry.go
+++ b/registry/nacos/registry.go
@@ -19,16 +19,14 @@ package nacos
import (
"bytes"
- "net"
"strconv"
"strings"
"time"
)
import (
- "github.com/nacos-group/nacos-sdk-go/clients"
- "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
- nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/remoting/nacos"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
)
@@ -55,7 +53,7 @@ func init() {
type nacosRegistry struct {
*common.URL
- namingClient naming_client.INamingClient
+ namingClient *nacosClient.NacosNamingClient
registryUrls []*common.URL
}
@@ -119,7 +117,7 @@ func createRegisterParam(url *common.URL, serviceName string) vo.RegisterInstanc
func (nr *nacosRegistry) Register(url *common.URL) error {
serviceName := getServiceName(url)
param := createRegisterParam(url, serviceName)
- isRegistry, err := nr.namingClient.RegisterInstance(param)
+ isRegistry, err := nr.namingClient.Client().RegisterInstance(param)
if err != nil {
return err
}
@@ -149,7 +147,7 @@ func createDeregisterParam(url *common.URL, serviceName string) vo.DeregisterIns
func (nr *nacosRegistry) DeRegister(url *common.URL) error {
serviceName := getServiceName(url)
param := createDeregisterParam(url, serviceName)
- isDeRegistry, err := nr.namingClient.DeregisterInstance(param)
+ isDeRegistry, err := nr.namingClient.Client().DeregisterInstance(param)
if err != nil {
return err
}
@@ -199,11 +197,9 @@ func (nr *nacosRegistry) Subscribe(url *common.URL, notifyListener registry.Noti
listener.Close()
return err
}
-
logger.Infof("update begin, service event: %v", serviceEvent.String())
notifyListener.Notify(serviceEvent)
}
-
}
}
@@ -237,69 +233,14 @@ func (nr *nacosRegistry) Destroy() {
// newNacosRegistry will create new instance
func newNacosRegistry(url *common.URL) (registry.Registry, error) {
- nacosConfig, err := getNacosConfig(url)
- if err != nil {
- return &nacosRegistry{}, err
- }
- client, err := clients.CreateNamingClient(nacosConfig)
+ namingClient, err := nacos.NewNacosClientByUrl(url)
if err != nil {
return &nacosRegistry{}, err
}
tmpRegistry := &nacosRegistry{
URL: url,
- namingClient: client,
+ namingClient: namingClient,
registryUrls: []*common.URL{},
}
return tmpRegistry, nil
}
-
-// getNacosConfig will return the nacos config
-// TODO support RemoteRef
-func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
- if url == nil {
- return nil, perrors.New("url is empty!")
- }
- if len(url.Location) == 0 {
- return nil, perrors.New("url.location is empty!")
- }
- configMap := make(map[string]interface{}, 2)
-
- addresses := strings.Split(url.Location, ",")
- serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
- for _, addr := range addresses {
- ip, portStr, err := net.SplitHostPort(addr)
- if err != nil {
- return nil, perrors.WithMessagef(err, "split [%s] ", addr)
- }
- port, _ := strconv.Atoi(portStr)
- serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
- IpAddr: ip,
- Port: uint64(port),
- })
- }
- configMap[nacosConstant.KEY_SERVER_CONFIGS] = serverConfigs
-
- var clientConfig nacosConstant.ClientConfig
- timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
- if err != nil {
- return nil, err
- }
- clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
- clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
- clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
- clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
- clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
- clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "")
-
- // enable local cache when nacos can not connect.
- notLoadCache, err := strconv.ParseBool(url.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "false"))
- if err != nil {
- logger.Errorf("ParseBool - error: %v", err)
- notLoadCache = false
- }
- clientConfig.NotLoadCacheAtStart = notLoadCache
-
- configMap[nacosConstant.KEY_CLIENT_CONFIG] = clientConfig
-
- return configMap, nil
-}
diff --git a/registry/nacos/registry_test.go b/registry/nacos/registry_test.go
index b95d3d5..17829d6 100644
--- a/registry/nacos/registry_test.go
+++ b/registry/nacos/registry_test.go
@@ -37,7 +37,7 @@ import (
)
func TestNacosRegistry_Register(t *testing.T) {
- t.Skip()
+ //t.Skip()
if !checkNacosServerAlive() {
return
}
@@ -66,8 +66,9 @@ func TestNacosRegistry_Register(t *testing.T) {
t.Errorf("register error:%s \n", err.Error())
return
}
+ time.Sleep(5 * time.Second)
nacosReg := reg.(*nacosRegistry)
- service, _ := nacosReg.namingClient.GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"})
+ service, _ := nacosReg.namingClient.Client().GetService(vo.GetServiceParam{ServiceName: "providers:com.ikurento.user.UserProvider:1.0.0:guangzhou-idc"})
data, _ := json.Marshal(service)
t.Logf(string(data))
assert.Equal(t, 1, len(service.Hosts))
@@ -85,11 +86,11 @@ func TestNacosRegistry_Subscribe(t *testing.T) {
urlMap := url.Values{}
urlMap.Set(constant.GROUP_KEY, "guangzhou-idc")
urlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
- urlMap.Set(constant.INTERFACE_KEY, "com.ikurento.user.UserProvider")
+ urlMap.Set(constant.INTERFACE_KEY, "com.dubbo.user.UserProvider")
urlMap.Set(constant.VERSION_KEY, "1.0.0")
urlMap.Set(constant.CLUSTER_KEY, "mock")
urlMap.Set(constant.NACOS_PATH_KEY, "")
- testUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
+ testUrl, _ := common.NewURL("dubbo://127.0.0.1:20000/com.dubbo.user.UserProvider", common.WithParams(urlMap), common.WithMethods([]string{"GetUser", "AddUser"}))
reg, _ := newNacosRegistry(regurl)
err := reg.Register(testUrl)
@@ -179,7 +180,7 @@ func TestNacosRegistry_Subscribe_del(t *testing.T) {
nacosReg := reg.(*nacosRegistry)
// deregister instance to mock instance offline
- _, err = nacosReg.namingClient.DeregisterInstance(vo.DeregisterInstanceParam{
+ _, err = nacosReg.namingClient.Client().DeregisterInstance(vo.DeregisterInstanceParam{
Ip: "127.0.0.2", Port: 20000,
ServiceName: "providers:com.ikurento.user.UserProvider:2.0.0:guangzhou-idc",
})
diff --git a/registry/nacos/service_discovery.go b/registry/nacos/service_discovery.go
index 93cf2cb..def1996 100644
--- a/registry/nacos/service_discovery.go
+++ b/registry/nacos/service_discovery.go
@@ -24,8 +24,8 @@ import (
import (
"github.com/dubbogo/gost/container/set"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
"github.com/dubbogo/gost/hash/page"
- "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
"github.com/nacos-group/nacos-sdk-go/model"
"github.com/nacos-group/nacos-sdk-go/vo"
perrors "github.com/pkg/errors"
@@ -51,21 +51,21 @@ func init() {
}
// nacosServiceDiscovery is the implementation of service discovery based on nacos.
-// There is a problem, the go client for nacos does not support the id field.
+// There is a problem, the go namingClient for nacos does not support the id field.
// we will use the metadata to store the id of ServiceInstance
type nacosServiceDiscovery struct {
group string
// descriptor is a short string about the basic information of this instance
descriptor string
- // namingClient is the Nacos' client
- namingClient naming_client.INamingClient
+ // namingClient is the Nacos' namingClient
+ namingClient *nacosClient.NacosNamingClient
// cache registry instances
registryInstances []registry.ServiceInstance
}
// Destroy will close the service discovery.
-// Actually, it only marks the naming client as null and then return
+// Actually, it only marks the naming namingClient as null and then return
func (n *nacosServiceDiscovery) Destroy() error {
for _, inst := range n.registryInstances {
err := n.Unregister(inst)
@@ -74,14 +74,14 @@ func (n *nacosServiceDiscovery) Destroy() error {
logger.Errorf("Unregister nacos instance:%+v, err:%+v", inst, err)
}
}
- n.namingClient = nil
+ n.namingClient.Close()
return nil
}
// Register will register the service to nacos
func (n *nacosServiceDiscovery) Register(instance registry.ServiceInstance) error {
ins := n.toRegisterInstance(instance)
- ok, err := n.namingClient.RegisterInstance(ins)
+ ok, err := n.namingClient.Client().RegisterInstance(ins)
if err != nil || !ok {
return perrors.WithMessage(err, "Could not register the instance. "+instance.GetServiceName())
}
@@ -104,7 +104,7 @@ func (n *nacosServiceDiscovery) Update(instance registry.ServiceInstance) error
// Unregister will unregister the instance
func (n *nacosServiceDiscovery) Unregister(instance registry.ServiceInstance) error {
- ok, err := n.namingClient.DeregisterInstance(n.toDeregisterInstance(instance))
+ ok, err := n.namingClient.Client().DeregisterInstance(n.toDeregisterInstance(instance))
if err != nil || !ok {
return perrors.WithMessage(err, "Could not unregister the instance. "+instance.GetServiceName())
}
@@ -118,7 +118,7 @@ func (n *nacosServiceDiscovery) GetDefaultPageSize() int {
// GetServices will return the all services
func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {
- services, err := n.namingClient.GetAllServicesInfo(vo.GetAllServiceInfoParam{
+ services, err := n.namingClient.Client().GetAllServicesInfo(vo.GetAllServiceInfoParam{
GroupName: n.group,
})
@@ -136,7 +136,7 @@ func (n *nacosServiceDiscovery) GetServices() *gxset.HashSet {
// GetInstances will return the instances of serviceName and the group
func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.ServiceInstance {
- instances, err := n.namingClient.SelectAllInstances(vo.SelectAllInstancesParam{
+ instances, err := n.namingClient.Client().SelectAllInstances(vo.SelectAllInstancesParam{
ServiceName: serviceName,
GroupName: n.group,
})
@@ -162,12 +162,11 @@ func (n *nacosServiceDiscovery) GetInstances(serviceName string) []registry.Serv
Metadata: metadata,
})
}
-
return res
}
// GetInstancesByPage will return the instances
-// Due to nacos client does not support pagination, so we have to query all instances and then return part of them
+// Due to nacos namingClient does not support pagination, so we have to query all instances and then return part of them
func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset int, pageSize int) gxpage.Pager {
all := n.GetInstances(serviceName)
res := make([]interface{}, 0, pageSize)
@@ -179,7 +178,7 @@ func (n *nacosServiceDiscovery) GetInstancesByPage(serviceName string, offset in
}
// GetHealthyInstancesByPage will return the instance
-// The nacos client has an API SelectInstances, which has a parameter call HealthyOnly.
+// The nacos namingClient has an API SelectInstances, which has a parameter call HealthyOnly.
// However, the healthy parameter in this method maybe false. So we can not use that API.
// Thus, we must query all instances and then do filter
func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, offset int, pageSize int, healthy bool) gxpage.Pager {
@@ -202,7 +201,7 @@ func (n *nacosServiceDiscovery) GetHealthyInstancesByPage(serviceName string, of
}
// GetRequestInstances will return the instances
-// The nacos client doesn't have batch API, so we should query those serviceNames one by one.
+// The nacos namingClient doesn't have batch API, so we should query those serviceNames one by one.
func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offset int, requestedSize int) map[string]gxpage.Pager {
res := make(map[string]gxpage.Pager, len(serviceNames))
for _, name := range serviceNames {
@@ -215,7 +214,7 @@ func (n *nacosServiceDiscovery) GetRequestInstances(serviceNames []string, offse
func (n *nacosServiceDiscovery) AddListener(listener registry.ServiceInstancesChangedListener) error {
for _, t := range listener.GetServiceNames().Values() {
serviceName := t.(string)
- err := n.namingClient.Subscribe(&vo.SubscribeParam{
+ err := n.namingClient.Client().Subscribe(&vo.SubscribeParam{
ServiceName: serviceName,
SubscribeCallback: func(services []model.SubscribeService, err error) {
if err != nil {
@@ -283,7 +282,7 @@ func (n *nacosServiceDiscovery) toRegisterInstance(instance registry.ServiceInst
Ip: instance.GetHost(),
Port: uint64(instance.GetPort()),
Metadata: metadata,
- // We must specify the weight since Java nacos client will ignore the instance whose weight is 0
+ // We must specify the weight since Java nacos namingClient will ignore the instance whose weight is 0
Weight: 1,
Enable: instance.IsEnable(),
Healthy: instance.IsHealthy(),
@@ -338,7 +337,7 @@ func newNacosServiceDiscovery(name string) (registry.ServiceDiscovery, error) {
client, err := nacos.NewNacosClient(remoteConfig)
if err != nil {
- return nil, perrors.WithMessage(err, "create nacos client failed.")
+ return nil, perrors.WithMessage(err, "create nacos namingClient failed.")
}
descriptor := fmt.Sprintf("nacos-service-discovery[%s]", remoteConfig.Address)
diff --git a/registry/nacos/service_discovery_test.go b/registry/nacos/service_discovery_test.go
index 0e60499..03b1b9c 100644
--- a/registry/nacos/service_discovery_test.go
+++ b/registry/nacos/service_discovery_test.go
@@ -113,11 +113,12 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
assert.Nil(t, err)
err = serviceDiscovery.Register(instance)
+
assert.Nil(t, err)
// sometimes nacos may be failed to push update of instance,
// so it need 10s to pull, we sleep 10 second to make sure instance has been update
- time.Sleep(11 * time.Second)
+ time.Sleep(5 * time.Second)
page := serviceDiscovery.GetHealthyInstancesByPage(serviceName, 0, 10, true)
assert.NotNil(t, page)
assert.Equal(t, 0, page.GetOffset())
@@ -125,6 +126,7 @@ func TestNacosServiceDiscovery_CRUD(t *testing.T) {
assert.Equal(t, 1, page.GetDataSize())
instance = page.GetData()[0].(*registry.DefaultServiceInstance)
+ instance.ServiceName = serviceName
assert.NotNil(t, instance)
assert.Equal(t, id, instance.GetID())
assert.Equal(t, host, instance.GetHost())
@@ -173,7 +175,6 @@ func TestNacosServiceDiscovery_Destroy(t *testing.T) {
assert.NotNil(t, serviceDiscovery)
err = serviceDiscovery.Destroy()
assert.Nil(t, err)
- assert.Nil(t, serviceDiscovery.(*nacosServiceDiscovery).namingClient)
}
func prepareData() {
diff --git a/remoting/nacos/builder.go b/remoting/nacos/builder.go
index 4367bab..4624061 100644
--- a/remoting/nacos/builder.go
+++ b/remoting/nacos/builder.go
@@ -25,9 +25,7 @@ import (
)
import (
- "github.com/nacos-group/nacos-sdk-go/clients"
- "github.com/nacos-group/nacos-sdk-go/clients/config_client"
- "github.com/nacos-group/nacos-sdk-go/clients/naming_client"
+ nacosClient "github.com/dubbogo/gost/database/kv/nacos"
nacosConstant "github.com/nacos-group/nacos-sdk-go/common/constant"
perrors "github.com/pkg/errors"
)
@@ -38,94 +36,114 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
)
-// NewNacosConfigClient read the config from url and build an instance
-func NewNacosConfigClient(url *common.URL) (config_client.IConfigClient, error) {
- nacosConfig, err := getNacosConfig(url)
+// NewNacosConfigClientByUrl read the config from url and build an instance
+func NewNacosConfigClientByUrl(url *common.URL) (*nacosClient.NacosConfigClient, error) {
+ sc, cc, err := GetNacosConfig(url)
if err != nil {
return nil, err
}
- return clients.CreateConfigClient(nacosConfig)
+ return nacosClient.NewNacosConfigClient(getNacosClientName(), true, sc, cc)
}
-// getNacosConfig will return the nacos config
-func getNacosConfig(url *common.URL) (map[string]interface{}, error) {
+// GetNacosConfig will return the nacos config
+func GetNacosConfig(url *common.URL) ([]nacosConstant.ServerConfig, nacosConstant.ClientConfig, error) {
if url == nil {
- return nil, perrors.New("url is empty!")
+ return []nacosConstant.ServerConfig{}, nacosConstant.ClientConfig{}, perrors.New("url is empty!")
}
+
if len(url.Location) == 0 {
- return nil, perrors.New("url.location is empty!")
+ return []nacosConstant.ServerConfig{}, nacosConstant.ClientConfig{},
+ perrors.New("url.location is empty!")
}
- configMap := make(map[string]interface{}, 2)
addresses := strings.Split(url.Location, ",")
serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
- return nil, perrors.WithMessagef(err, "split [%s] ", addr)
+ return []nacosConstant.ServerConfig{}, nacosConstant.ClientConfig{},
+ perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
- serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
- IpAddr: ip,
- Port: uint64(port),
- })
+ serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{IpAddr: ip, Port: uint64(port)})
}
- configMap["serverConfigs"] = serverConfigs
var clientConfig nacosConstant.ClientConfig
timeout, err := time.ParseDuration(url.GetParam(constant.REGISTRY_TIMEOUT_KEY, constant.DEFAULT_REG_TIMEOUT))
if err != nil {
- return nil, err
+ return []nacosConstant.ServerConfig{}, nacosConstant.ClientConfig{}, err
+ }
+ //enable local cache when nacos can not connect.
+ notLoadCache, err := strconv.ParseBool(url.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "true"))
+ if err != nil {
+ notLoadCache = false
}
clientConfig.TimeoutMs = uint64(timeout.Seconds() * 1000)
- clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
+ // clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
clientConfig.CacheDir = url.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
clientConfig.LogDir = url.GetParam(constant.NACOS_LOG_DIR_KEY, "")
clientConfig.Endpoint = url.GetParam(constant.NACOS_ENDPOINT, "")
clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "")
clientConfig.Username = url.GetParam(constant.NACOS_USERNAME, "")
clientConfig.Password = url.GetParam(constant.NACOS_PASSWORD, "")
- clientConfig.NamespaceId = url.GetParam(constant.NACOS_NAMESPACE_ID, "")
- clientConfig.NotLoadCacheAtStart = true
- configMap["clientConfig"] = clientConfig
+ clientConfig.NotLoadCacheAtStart = notLoadCache
- return configMap, nil
+ return serverConfigs, clientConfig, nil
}
-// NewNacosClient creates an instance with the config
-func NewNacosClient(rc *config.RemoteConfig) (naming_client.INamingClient, error) {
+// NewNacosClient create an instance with the config
+func NewNacosClient(rc *config.RemoteConfig) (*nacosClient.NacosNamingClient, error) {
if len(rc.Address) == 0 {
return nil, perrors.New("nacos address is empty!")
}
- configMap := make(map[string]interface{}, 2)
-
addresses := strings.Split(rc.Address, ",")
- serverConfigs := make([]nacosConstant.ServerConfig, 0, len(addresses))
+ scs := make([]nacosConstant.ServerConfig, 0, len(addresses))
for _, addr := range addresses {
ip, portStr, err := net.SplitHostPort(addr)
if err != nil {
return nil, perrors.WithMessagef(err, "split [%s] ", addr)
}
port, _ := strconv.Atoi(portStr)
- serverConfigs = append(serverConfigs, nacosConstant.ServerConfig{
+ scs = append(scs, nacosConstant.ServerConfig{
IpAddr: ip,
Port: uint64(port),
})
}
- configMap["serverConfigs"] = serverConfigs
- var clientConfig nacosConstant.ClientConfig
+ var cc nacosConstant.ClientConfig
timeout := rc.Timeout()
- clientConfig.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate)
- clientConfig.ListenInterval = 2 * clientConfig.TimeoutMs
- clientConfig.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
- clientConfig.LogDir = rc.GetParam(constant.NACOS_LOG_DIR_KEY, "")
- clientConfig.Endpoint = rc.Address
- clientConfig.Username = rc.Username
- clientConfig.Password = rc.Password
- clientConfig.NotLoadCacheAtStart = true
- clientConfig.NamespaceId = rc.GetParam(constant.NACOS_NAMESPACE_ID, "")
- configMap["clientConfig"] = clientConfig
+ //enable local cache when nacos can not connect.
+ notLoadCache, err := strconv.ParseBool(rc.GetParam(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "true"))
+ if err != nil {
+ notLoadCache = false
+ }
+ cc.TimeoutMs = uint64(timeout.Nanoseconds() / constant.MsToNanoRate)
+ // cc.ListenInterval = 2 * cc.TimeoutMs
+ cc.CacheDir = rc.GetParam(constant.NACOS_CACHE_DIR_KEY, "")
+ cc.LogDir = rc.GetParam(constant.NACOS_LOG_DIR_KEY, "")
+ cc.Endpoint = rc.GetParam(constant.NACOS_ENDPOINT, "")
+ cc.NamespaceId = rc.GetParam(constant.NACOS_NAMESPACE_ID, "")
+ cc.Username = rc.Username
+ cc.Password = rc.Password
+ cc.NotLoadCacheAtStart = notLoadCache
- return clients.CreateNamingClient(configMap)
+ return nacosClient.NewNacosNamingClient(getNacosClientName(), true, scs, cc)
+}
+
+// NewNacosClientByUrl created
+func NewNacosClientByUrl(url *common.URL) (*nacosClient.NacosNamingClient, error) {
+ scs, cc, err := GetNacosConfig(url)
+ if err != nil {
+ return nil, err
+ }
+ return nacosClient.NewNacosNamingClient(getNacosClientName(), true, scs, cc)
+}
+
+// getNacosClientName get nacos client name
+func getNacosClientName() string {
+ name := config.GetApplicationConfig().Name
+ if len(name) > 0 {
+ return name
+ }
+ return "nacos-client"
}
diff --git a/remoting/nacos/builder_test.go b/remoting/nacos/builder_test.go
index 1f7fa81..0d43ed0 100644
--- a/remoting/nacos/builder_test.go
+++ b/remoting/nacos/builder_test.go
@@ -18,6 +18,8 @@
package nacos
import (
+ "net/url"
+ "strconv"
"testing"
)
@@ -26,6 +28,8 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/config"
)
@@ -49,3 +53,45 @@ func TestNewNacosClient(t *testing.T) {
assert.NotNil(t, client)
assert.Nil(t, err)
}
+
+func TestGetNacosConfig(t *testing.T) {
+ regurl := getRegUrl()
+ sc, cc, err := GetNacosConfig(regurl)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, sc)
+ assert.NotNil(t, cc)
+ assert.Equal(t, cc.TimeoutMs, uint64(5000))
+}
+
+func TestNewNacosConfigClient(t *testing.T) {
+
+ regurl := getRegUrl()
+ client, err := NewNacosConfigClientByUrl(regurl)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, client)
+}
+
+func TestNewNacosClientByUrl(t *testing.T) {
+ regurl := getRegUrl()
+ client, err := NewNacosClientByUrl(regurl)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, client)
+}
+
+func getRegUrl() *common.URL {
+
+ regurlMap := url.Values{}
+ regurlMap.Set(constant.ROLE_KEY, strconv.Itoa(common.PROVIDER))
+ regurlMap.Set(constant.NACOS_NOT_LOAD_LOCAL_CACHE, "true")
+ // regurlMap.Set(constant.NACOS_USERNAME, "nacos")
+ // regurlMap.Set(constant.NACOS_PASSWORD, "nacos")
+ regurlMap.Set(constant.NACOS_NAMESPACE_ID, "nacos")
+ regurlMap.Set(constant.REGISTRY_TIMEOUT_KEY, "5s")
+
+ regurl, _ := common.NewURL("registry://console.nacos.io:80", common.WithParams(regurlMap))
+
+ return regurl
+}