You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by fa...@apache.org on 2020/02/09 12:03:01 UTC
[dubbo-go] branch 1.3 updated: add zk register code
This is an automated email from the ASF dual-hosted git repository.
fangyc pushed a commit to branch 1.3
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.3 by this push:
new 7fcb34e add zk register code
new 5f82281 Merge pull request #355 from pantianying/fix_zkproblemto1.3
7fcb34e is described below
commit 7fcb34eb5612a5ad47c24ca4bc7aecc194f801e4
Author: pantianying <60...@qq.com>
AuthorDate: Sat Feb 8 12:34:14 2020 +0800
add zk register code
---
config_center/zookeeper/impl.go | 63 +++---
config_center/zookeeper/impl_test.go | 2 +-
go.mod | 2 +-
go.sum | 4 +-
registry/base_register.go | 375 +++++++++++++++++++++++++++++++++++
registry/etcdv3/listener.go | 10 +-
registry/etcdv3/registry.go | 264 +++---------------------
registry/etcdv3/registry_test.go | 14 +-
registry/registry.go | 12 +-
registry/zookeeper/listener.go | 18 +-
registry/zookeeper/registry.go | 354 ++++-----------------------------
registry/zookeeper/registry_test.go | 4 +-
remoting/etcdv3/client.go | 32 ++-
remoting/etcdv3/facade.go | 7 +-
remoting/etcdv3/listener.go | 9 +-
remoting/listener.go | 6 +
remoting/zookeeper/client.go | 128 ++++++++----
remoting/zookeeper/client_test.go | 11 +-
remoting/zookeeper/facade.go | 7 +-
remoting/zookeeper/facade_test.go | 4 +-
remoting/zookeeper/listener.go | 75 +++----
remoting/zookeeper/listener_test.go | 5 +-
22 files changed, 689 insertions(+), 717 deletions(-)
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index 504d491..70fb196 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -24,8 +24,8 @@ import (
)
import (
+ "github.com/dubbogo/go-zookeeper/zk"
perrors "github.com/pkg/errors"
- "github.com/samuel/go-zookeeper/zk"
)
import (
@@ -37,7 +37,11 @@ import (
"github.com/apache/dubbo-go/remoting/zookeeper"
)
-const ZkClient = "zk config_center"
+const (
+ // ZkClient
+ //zookeeper client name
+ ZkClient = "zk config_center"
+)
type zookeeperDynamicConfiguration struct {
url *common.URL
@@ -134,10 +138,9 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
content, _, err := c.client.GetContent(c.rootPath + "/" + key)
if err != nil {
return "", perrors.WithStack(err)
- } else {
- return string(content), nil
}
+ return string(content), nil
}
//For zookeeper, getConfig and getConfigs have the same meaning.
@@ -156,57 +159,57 @@ func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser)
c.parser = p
}
-func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
- return r.client
+func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
+ return c.client
}
-func (r *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
- r.client = client
+func (c *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
+ c.client = client
}
-func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
- return &r.cltLock
+func (c *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
+ return &c.cltLock
}
-func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
- return &r.wg
+func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
+ return &c.wg
}
-func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} {
- return r.done
+func (c *zookeeperDynamicConfiguration) Done() chan struct{} {
+ return c.done
}
-func (r *zookeeperDynamicConfiguration) GetUrl() common.URL {
- return *r.url
+func (c *zookeeperDynamicConfiguration) GetUrl() common.URL {
+ return *c.url
}
-func (r *zookeeperDynamicConfiguration) Destroy() {
- if r.listener != nil {
- r.listener.Close()
+func (c *zookeeperDynamicConfiguration) Destroy() {
+ if c.listener != nil {
+ c.listener.Close()
}
- close(r.done)
- r.wg.Wait()
- r.closeConfigs()
+ close(c.done)
+ c.wg.Wait()
+ c.closeConfigs()
}
-func (r *zookeeperDynamicConfiguration) IsAvailable() bool {
+func (c *zookeeperDynamicConfiguration) IsAvailable() bool {
select {
- case <-r.done:
+ case <-c.done:
return false
default:
return true
}
}
-func (r *zookeeperDynamicConfiguration) closeConfigs() {
- r.cltLock.Lock()
- defer r.cltLock.Unlock()
+func (c *zookeeperDynamicConfiguration) closeConfigs() {
+ c.cltLock.Lock()
+ defer c.cltLock.Unlock()
logger.Infof("begin to close provider zk client")
// Close the old client first to close the tmp node
- r.client.Close()
- r.client = nil
+ c.client.Close()
+ c.client = nil
}
-func (r *zookeeperDynamicConfiguration) RestartCallBack() bool {
+func (c *zookeeperDynamicConfiguration) RestartCallBack() bool {
return true
}
diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go
index e614009..cca4427 100644
--- a/config_center/zookeeper/impl_test.go
+++ b/config_center/zookeeper/impl_test.go
@@ -24,7 +24,7 @@ import (
)
import (
- "github.com/samuel/go-zookeeper/zk"
+ "github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
diff --git a/go.mod b/go.mod
index c89b397..f5ac8e5 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
github.com/creasty/defaults v1.3.0
github.com/dubbogo/getty v1.3.2
+ github.com/dubbogo/go-zookeeper v1.0.0
github.com/dubbogo/gost v1.5.2
github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
github.com/go-errors/errors v1.0.1 // indirect
@@ -37,7 +38,6 @@ require (
github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
github.com/pkg/errors v0.8.1
github.com/prometheus/client_golang v1.1.0 // indirect
- github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
github.com/satori/go.uuid v1.2.0
github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
github.com/soheilhy/cmux v0.1.4 // indirect
diff --git a/go.sum b/go.sum
index b23cb24..0f42e86 100644
--- a/go.sum
+++ b/go.sum
@@ -104,6 +104,8 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk
github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
github.com/dubbogo/getty v1.3.2 h1:l1KVSs/1CtTKbIPTrkTtBT6S9ddvmswDGoAnnl2CDpM=
github.com/dubbogo/getty v1.3.2/go.mod h1:ANbVQ9tbpZ2b0xdR8nRrgS/oXIsZAeRxzvPSOn/7mbk=
+github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM=
+github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
github.com/dubbogo/gost v1.5.1 h1:oG5dzaWf1KYynBaBoUIOkgT+YD0niHV6xxI0Odq7hDg=
github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
@@ -410,8 +412,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 h1:7YvPJVmEeFHR1Tj9sZEYsmarJEQfMVYpd/Vyy/A8dqE=
github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
-github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
-github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
diff --git a/registry/base_register.go b/registry/base_register.go
new file mode 100644
index 0000000..5b9aef8
--- /dev/null
+++ b/registry/base_register.go
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+ "context"
+ "fmt"
+ "net/url"
+ "os"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
+)
+
+import (
+ gxnet "github.com/dubbogo/gost/net"
+ perrors "github.com/pkg/errors"
+)
+
+import (
+ "github.com/apache/dubbo-go/common"
+ "github.com/apache/dubbo-go/common/constant"
+ "github.com/apache/dubbo-go/common/logger"
+)
+
+const (
+ // RegistryConnDelay is the pause, in seconds, before Subscribe retries after DoSubscribe fails.
+ RegistryConnDelay = 3
+ // MaxWaitInterval is the upper bound on the back-off sleep computed by sleepWait.
+ MaxWaitInterval = 3 * time.Second
+)
+
+var (
+ // processID is this process's PID as a decimal string; register adds it as the "pid" parameter.
+ processID = ""
+ // localIP is the address returned by gxnet.GetLocalIP; register adds it as the "ip" parameter,
+ // and it is used as the host for consumer URLs and for provider URLs whose Ip is empty.
+ localIP = ""
+)
+
+// init fills processID and localIP once at package load.
+func init() {
+ processID = fmt.Sprintf("%d", os.Getpid())
+ // NOTE(review): the error from GetLocalIP is discarded, so localIP holds whatever the call
+ // returned on failure (presumably empty) — confirm this best-effort behaviour is intended.
+ localIP, _ = gxnet.GetLocalIP()
+}
+
+/*
+ * -----------------------------------NOTICE---------------------------------------------
+ * If there is no special case, you'd better inherit BaseRegistry and implement the
+ * FacadeBasedRegistry interface instead of directly implementing the Registry interface.
+ * --------------------------------------------------------------------------------------
+ */
+
+/*
+ * FacadeBasedRegistry extends Registry with the backend-specific hooks that BaseRegistry calls.
+ * A registry that embeds BaseRegistry has to implement this interface and hand itself to InitBaseRegistry.
+ */
+type FacadeBasedRegistry interface {
+ Registry
+ // CreatePath is called with a "/dubbo/<service>/<node>" path before a URL is registered under it.
+ CreatePath(string) error
+ // DoRegister is called with the parent path and the query-escaped URL to store beneath it.
+ DoRegister(string, string) error
+ // DoSubscribe returns the Listener that Subscribe reads service events from.
+ DoSubscribe(conf *common.URL) (Listener, error)
+ // CloseAndNilClient is called as the last step of Destroy, while BaseRegistry holds its lock.
+ CloseAndNilClient()
+ // CloseListener is called as the first step of Destroy.
+ CloseListener()
+ // InitListeners is called at the end of RestartCallBack.
+ InitListeners()
+}
+
+// BaseRegistry holds the registration logic shared by registries. It implements the Registry
+// interface and delegates backend-specific work to a FacadeBasedRegistry.
+type BaseRegistry struct {
+ context context.Context // not set by InitBaseRegistry
+ facadeBasedRegistry FacadeBasedRegistry // backend hooks supplied to InitBaseRegistry
+ *common.URL
+ birth int64 // creation time, set by InitBaseRegistry from time.Now().UnixNano() (nanoseconds, not seconds)
+ wg sync.WaitGroup // wg+done for zk restart; Destroy waits on wg after closing done
+ done chan struct{} // closed by Destroy; IsAvailable reports false once it is closed
+ cltLock sync.Mutex // guards the services map; also held around CreatePath calls
+ services map[string]common.URL // conf.Key() -> registered service URL, stored by Register
+}
+
+// InitBaseRegistry stores the registry URL and the concrete facade implementation, records the
+// creation time in nanoseconds, and allocates the done channel and the services map.
+// It returns r itself as a Registry.
+func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry {
+ r.URL = url
+ r.birth = time.Now().UnixNano()
+ r.done = make(chan struct{})
+ r.services = make(map[string]common.URL)
+ r.facadeBasedRegistry = facadeRegistry
+ return r
+}
+
+// GetUrl returns the registry's URL by value (a copy of the struct r.URL points to).
+func (r *BaseRegistry) GetUrl() common.URL {
+ return *r.URL
+}
+
+// Destroy shuts the registry down gracefully: it closes the listeners, closes done, waits on the
+// WaitGroup and finally closes the client.
+// NOTE(review): done is closed unconditionally, so a second call would panic on the closed channel.
+func (r *BaseRegistry) Destroy() {
+ // first close all of the registry's listeners
+ r.facadeBasedRegistry.CloseListener()
+ // then close r.done to notify everyone selecting on it
+ close(r.done)
+ // wait for the WaitGroup to drain (listeners added from outside via WaitGroup())
+ r.wg.Wait()
+ // finally close the registry client and drop the services map
+ r.closeRegisters()
+}
+
+// Register registers conf with the backend and, on success, records it in r.services under conf.Key().
+// It returns an error if conf.Key() is already recorded or if the registration itself fails.
+func (r *BaseRegistry) Register(conf common.URL) error {
+ var (
+ ok bool
+ err error
+ )
+ // role is only used for the debug log below; the Atoi error is ignored, leaving role at 0 on failure.
+ role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
+ // Check if the service has been registered
+ r.cltLock.Lock()
+ _, ok = r.services[conf.Key()]
+ r.cltLock.Unlock()
+ if ok {
+ return perrors.Errorf("Path{%s} has been registered", conf.Key())
+ }
+
+ // cltLock is not held here: the helpers behind register lock it themselves around CreatePath.
+ // NOTE(review): the check above and the insert below are therefore not one atomic step.
+ err = r.register(conf)
+ if err != nil {
+ return perrors.WithMessagef(err, "register(conf:%+v)", conf)
+ }
+
+ r.cltLock.Lock()
+ r.services[conf.Key()] = conf
+ r.cltLock.Unlock()
+ logger.Debugf("(%sRegistry)Register(conf{%#v})", common.DubboRole[role], conf)
+
+ return nil
+}
+
+// service returns the query-escaped c.Service(); it is used as the service segment of the
+// "/dubbo/<service>/<node>" registry paths.
+func (r *BaseRegistry) service(c common.URL) string {
+ return url.QueryEscape(c.Service())
+}
+
+// RestartCallBack re-registers every service recorded in r.services (used when the client
+// reconnects) and then re-initialises the listeners through the facade.
+// It returns false if a re-registration fails; the remaining services are skipped in that case,
+// but the listeners are still re-initialised.
+func (r *BaseRegistry) RestartCallBack() bool {
+	// Snapshot r.services under cltLock: Register and closeRegisters modify the map under the
+	// same lock, so reading it unguarded is a data race. The lock is released before calling
+	// register because register's helpers acquire cltLock themselves.
+	r.cltLock.Lock()
+	services := make([]common.URL, 0, len(r.services))
+	for _, confIf := range r.services {
+		services = append(services, confIf)
+	}
+	r.cltLock.Unlock()
+
+	flag := true
+	for _, confIf := range services {
+		err := r.register(confIf)
+		if err != nil {
+			logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
+				confIf, perrors.WithStack(err))
+			flag = false
+			break
+		}
+		logger.Infof("success to re-register service :%v", confIf.Key())
+	}
+	r.facadeBasedRegistry.InitListeners()
+
+	return flag
+}
+
+// register builds the registry node for c and stores it through the facade.
+// It copies c's parameters, adds "pid" and "ip", lets the role-specific helper (chosen by the
+// registry URL's role parameter) create the parent paths and build the raw URL, and then passes
+// the parent path and the query-escaped URL to DoRegister.
+// It returns an error when the role is neither provider nor consumer, when the helper fails,
+// or when DoRegister fails.
+func (r *BaseRegistry) register(c common.URL) error {
+	var (
+		err       error
+		rawURL    string
+		dubboPath string
+	)
+	params := url.Values{}
+
+	c.RangeParams(func(key, value string) bool {
+		params.Add(key, value)
+		return true
+	})
+
+	params.Add("pid", processID)
+	params.Add("ip", localIP)
+
+	// The Atoi error is ignored: an unparsable role leaves role at 0.
+	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
+	switch role {
+	case common.PROVIDER:
+		dubboPath, rawURL, err = r.providerRegistry(c, params)
+	case common.CONSUMER:
+		dubboPath, rawURL, err = r.consumerRegistry(c, params)
+	default:
+		return perrors.Errorf("@c{%v} type is not referencer or provider", c)
+	}
+	// Stop if the helper failed. Previously this error was overwritten by the DoRegister
+	// result below, and DoRegister was invoked with an empty path and URL.
+	if err != nil {
+		return err
+	}
+
+	encodedURL := url.QueryEscape(rawURL)
+	// Percent-encode "$" in the parent path before handing it to the backend.
+	dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
+	if err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL); err != nil {
+		return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL)
+	}
+	return nil
+}
+
+// providerRegistry prepares a provider registration for c.
+// It rejects a URL with an empty Path or no Methods, creates the providers node path through
+// the facade, adds the provider-specific parameters to params, and returns the parent path
+// together with the raw (unescaped) provider URL.
+func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) {
+ var (
+ dubboPath string
+ rawURL string
+ err error
+ )
+ if c.Path == "" || len(c.Methods) == 0 {
+ return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
+ }
+ dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
+ // cltLock is held only for the duration of the CreatePath call.
+ r.cltLock.Lock()
+ err = r.facadeBasedRegistry.CreatePath(dubboPath)
+ r.cltLock.Unlock()
+ if err != nil {
+ logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
+ return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
+ }
+ params.Add("anyhost", "true")
+
+ // Dubbo java consumer to start looking for the provider url,because the category does not match,
+ // the provider will not find, causing the consumer can not start, so we use consumers.
+ // DubboRole = [...]string{"consumer", "", "", "provider"}
+ // params.Add("category", (RoleType(PROVIDER)).Role())
+ params.Add("category", (common.RoleType(common.PROVIDER)).String())
+ params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
+
+ params.Add("side", (common.RoleType(common.PROVIDER)).Role())
+
+ // NOTE(review): this branch is unreachable — the guard at the top already returned when
+ // len(c.Methods) == 0. The condition looks inverted; before changing it, confirm whether
+ // "methods" is already among the parameters copied from c, to avoid adding it twice.
+ if len(c.Methods) == 0 {
+ params.Add("methods", strings.Join(c.Methods, ","))
+ }
+ logger.Debugf("provider url params:%#v", params)
+ // Fall back to the local IP when the URL carries no explicit Ip.
+ var host string
+ if c.Ip == "" {
+ host = localIP + ":" + c.Port
+ } else {
+ host = c.Ip + ":" + c.Port
+ }
+
+ rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
+ // Print your own registration service providers.
+ dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
+ logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
+ return dubboPath, rawURL, nil
+}
+
+// consumerRegistry prepares a consumer registration for c.
+// It creates both the consumers and the providers node paths through the facade, adds the
+// consumer-specific parameters to params, and returns the parent path together with the raw
+// (unescaped) "consumer://" URL built from the local IP and c.Path.
+func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) {
+ var (
+ dubboPath string
+ rawURL string
+ err error
+ )
+ // Create the consumers node; cltLock is held only around the CreatePath call.
+ dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
+ r.cltLock.Lock()
+ err = r.facadeBasedRegistry.CreatePath(dubboPath)
+ r.cltLock.Unlock()
+ if err != nil {
+ logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
+ return "", "", perrors.WithStack(err)
+ }
+ // Create the providers node for the same service as well.
+ dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
+ r.cltLock.Lock()
+ err = r.facadeBasedRegistry.CreatePath(dubboPath)
+ r.cltLock.Unlock()
+ if err != nil {
+ logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
+ return "", "", perrors.WithStack(err)
+ }
+
+ params.Add("protocol", c.Protocol)
+ params.Add("category", (common.RoleType(common.CONSUMER)).String())
+ params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
+
+ rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
+ dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String())
+
+ logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
+ return dubboPath, rawURL, nil
+}
+
+// sleepWait blocks for (n+1)*200ms, capped at MaxWaitInterval.
+// The product is computed as a time.Duration (int64) rather than in the platform int, so it
+// cannot wrap around on 32-bit builds, where the old int product overflowed once n+1 reached 11.
+func sleepWait(n int) {
+	wait := time.Duration(n+1) * 200 * time.Millisecond
+	if wait > MaxWaitInterval {
+		wait = MaxWaitInterval
+	}
+	time.Sleep(wait)
+}
+
+// Subscribe obtains a listener for url from the facade and forwards every service event it
+// yields to notifyListener. It blocks until the registry is no longer available (done closed):
+// a failed DoSubscribe is retried after RegistryConnDelay seconds, and when a listener reports
+// an error it is closed and a new one is requested after a back-off sleep.
+func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
+ // n counts loop iterations and never resets, so the back-off passed to sleepWait only grows.
+ n := 0
+ for {
+ n++
+ if !r.IsAvailable() {
+ logger.Warnf("event listener game over.")
+ return
+ }
+
+ listener, err := r.facadeBasedRegistry.DoSubscribe(url)
+ if err != nil {
+ // Do not retry once the registry has been destroyed.
+ if !r.IsAvailable() {
+ logger.Warnf("event listener game over.")
+ return
+ }
+ logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
+ time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
+ continue
+ }
+
+ // Drain events until the listener fails, then fall through to re-subscribe.
+ for {
+ if serviceEvent, err := listener.Next(); err != nil {
+ logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
+ listener.Close()
+ break
+ } else {
+ logger.Infof("update begin, service event: %v", serviceEvent.String())
+ notifyListener.Notify(serviceEvent)
+ }
+
+ }
+ sleepWait(n)
+ }
+}
+
+// closeRegisters closes the registry client through the facade and drops the services map,
+// all while holding cltLock.
+// NOTE(review): services is set to nil, so a Register call that reaches its map write after
+// this point would panic on the nil map.
+func (r *BaseRegistry) closeRegisters() {
+ r.cltLock.Lock()
+ defer r.cltLock.Unlock()
+ logger.Infof("begin to close provider client")
+ // Close and remove(set to nil) the registry client
+ r.facadeBasedRegistry.CloseAndNilClient()
+ // reset the services map
+ r.services = nil
+}
+
+// IsAvailable reports whether the registry is still open: it returns false once r.done has
+// been closed and true otherwise. The check never blocks.
+func (r *BaseRegistry) IsAvailable() bool {
+ select {
+ case <-r.done:
+ return false
+ default:
+ return true
+ }
+}
+
+// WaitGroup exposes the registry's WaitGroup so that callers outside can register work that
+// Destroy must wait for before it closes the client (graceful shutdown).
+func (r *BaseRegistry) WaitGroup() *sync.WaitGroup {
+ return &r.wg
+}
+
+// Done returns the channel that Destroy closes, so that callers outside can detect that the
+// registry has been destroyed.
+func (r *BaseRegistry) Done() chan struct{} {
+ return r.done
+}
diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go
index 31d62fa..5ed56f6 100644
--- a/registry/etcdv3/listener.go
+++ b/registry/etcdv3/listener.go
@@ -39,6 +39,7 @@ type dataListener struct {
listener config_center.ConfigurationListener
}
+// NewRegistryDataListener ...
func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
return &dataListener{listener: listener, interestedURL: []*common.URL{}}
}
@@ -77,9 +78,10 @@ type configurationListener struct {
events chan *config_center.ConfigChangeEvent
}
+// NewConfigurationListener for listening the event of etcdv3.
func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
// add a new waiter
- reg.wg.Add(1)
+ reg.WaitGroup().Add(1)
return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
}
func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
@@ -89,7 +91,7 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv
func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
- case <-l.registry.done:
+ case <-l.registry.Done():
logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
return nil, perrors.New("listener stopped")
@@ -97,7 +99,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
logger.Infof("got etcd event %#v", e)
if e.ConfigType == remoting.EventTypeDel {
select {
- case <-l.registry.done:
+ case <-l.registry.Done():
logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
default:
}
@@ -108,5 +110,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
}
}
func (l *configurationListener) Close() {
- l.registry.wg.Done()
+ l.registry.WaitGroup().Done()
}
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index b058113..e1c2576 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -19,17 +19,13 @@ package etcdv3
import (
"fmt"
- "net/url"
- "os"
"path"
- "strconv"
"strings"
"sync"
"time"
)
import (
- gxnet "github.com/dubbogo/gost/net"
perrors "github.com/pkg/errors"
)
@@ -42,74 +38,39 @@ import (
"github.com/apache/dubbo-go/remoting/etcdv3"
)
-var (
- processID = ""
- localIP = ""
-)
-
const (
- Name = "etcdv3"
- RegistryConnDelay = 3
+ // Name module name
+ Name = "etcdv3"
)
func init() {
- processID = fmt.Sprintf("%d", os.Getpid())
- localIP, _ = gxnet.GetLocalIP()
extension.SetRegistry(Name, newETCDV3Registry)
}
type etcdV3Registry struct {
- *common.URL
- birth int64 // time of file birth, seconds since Epoch; 0 if unknown
-
- cltLock sync.Mutex
- client *etcdv3.Client
- services map[string]common.URL // service name + protocol -> service config
-
+ registry.BaseRegistry
+ cltLock sync.Mutex
+ client *etcdv3.Client
listenerLock sync.Mutex
listener *etcdv3.EventListener
dataListener *dataListener
configListener *configurationListener
-
- wg sync.WaitGroup // wg+done for etcd client restart
- done chan struct{}
}
+// Client get the etcdv3 client
func (r *etcdV3Registry) Client() *etcdv3.Client {
return r.client
}
+
+//SetClient set the etcdv3 client
func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
r.client = client
}
+
+//
func (r *etcdV3Registry) ClientLock() *sync.Mutex {
return &r.cltLock
}
-func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup {
- return &r.wg
-}
-func (r *etcdV3Registry) GetDone() chan struct{} {
- return r.done
-}
-func (r *etcdV3Registry) RestartCallBack() bool {
-
- services := []common.URL{}
- for _, confIf := range r.services {
- services = append(services, confIf)
- }
-
- flag := true
- for _, confIf := range services {
- err := r.Register(confIf)
- if err != nil {
- logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}",
- confIf, perrors.WithStack(err))
- flag = false
- break
- }
- logger.Infof("success to re-register service :%v", confIf.Key())
- }
- return flag
-}
func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
@@ -122,12 +83,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String())
- r := &etcdV3Registry{
- URL: url,
- birth: time.Now().UnixNano(),
- done: make(chan struct{}),
- services: make(map[string]common.URL),
- }
+ r := &etcdV3Registry{}
+
+ r.InitBaseRegistry(url, r)
if err := etcdv3.ValidateClient(
r,
@@ -137,89 +95,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
); err != nil {
return nil, err
}
+ r.WaitGroup().Add(1) //etcdv3 client start successful, then wg +1
- r.wg.Add(1)
go etcdv3.HandleClientRestart(r)
- r.listener = etcdv3.NewEventListener(r.client)
- r.configListener = NewConfigurationListener(r)
- r.dataListener = NewRegistryDataListener(r.configListener)
+ r.InitListeners()
return r, nil
}
-func (r *etcdV3Registry) GetUrl() common.URL {
- return *r.URL
-}
-
-func (r *etcdV3Registry) IsAvailable() bool {
-
- select {
- case <-r.done:
- return false
- default:
- return true
- }
+func (r *etcdV3Registry) InitListeners() {
+ r.listener = etcdv3.NewEventListener(r.client)
+ r.configListener = NewConfigurationListener(r)
+ r.dataListener = NewRegistryDataListener(r.configListener)
}
-func (r *etcdV3Registry) Destroy() {
-
- if r.configListener != nil {
- r.configListener.Close()
- }
- r.stop()
+func (r *etcdV3Registry) DoRegister(root string, node string) error {
+ return r.client.Create(path.Join(root, node), "")
}
-func (r *etcdV3Registry) stop() {
-
- close(r.done)
-
- // close current client
+func (r *etcdV3Registry) CloseAndNilClient() {
r.client.Close()
-
- r.cltLock.Lock()
r.client = nil
- r.services = nil
- r.cltLock.Unlock()
}
-func (r *etcdV3Registry) Register(svc common.URL) error {
-
- role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
- if err != nil {
- return perrors.WithMessage(err, "get registry role")
- }
-
- r.cltLock.Lock()
- if _, ok := r.services[svc.Key()]; ok {
- r.cltLock.Unlock()
- return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
- }
- r.cltLock.Unlock()
-
- switch role {
- case common.PROVIDER:
- logger.Debugf("(provider register )Register(conf{%#v})", svc)
- if err := r.registerProvider(svc); err != nil {
- return perrors.WithMessage(err, "register provider")
- }
- case common.CONSUMER:
- logger.Debugf("(consumer register )Register(conf{%#v})", svc)
- if err := r.registerConsumer(svc); err != nil {
- return perrors.WithMessage(err, "register consumer")
- }
- default:
- return perrors.New(fmt.Sprintf("unknown role %d", role))
+func (r *etcdV3Registry) CloseListener() {
+ if r.configListener != nil {
+ r.configListener.Close()
}
-
- r.cltLock.Lock()
- r.services[svc.Key()] = svc
- r.cltLock.Unlock()
- return nil
}
-func (r *etcdV3Registry) createDirIfNotExist(k string) error {
-
+func (r *etcdV3Registry) CreatePath(k string) error {
var tmpPath string
for _, str := range strings.Split(k, "/")[1:] {
tmpPath = path.Join(tmpPath, "/", str)
@@ -231,89 +137,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error {
return nil
}
-func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
-
- consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
- if err := r.createDirIfNotExist(consumersNode); err != nil {
- logger.Errorf("etcd client create path %s: %v", consumersNode, err)
- return perrors.WithMessage(err, "etcd create consumer nodes")
- }
- providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
- if err := r.createDirIfNotExist(providersNode); err != nil {
- return perrors.WithMessage(err, "create provider node")
- }
-
- params := url.Values{}
-
- params.Add("protocol", svc.Protocol)
-
- params.Add("category", (common.RoleType(common.CONSUMER)).String())
- params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
-
- encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
- dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
- if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
- return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
- }
-
- return nil
-}
-
-func (r *etcdV3Registry) registerProvider(svc common.URL) error {
-
- if len(svc.Path) == 0 || len(svc.Methods) == 0 {
- return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
- }
-
- var (
- urlPath string
- encodedURL string
- dubboPath string
- )
-
- providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
- if err := r.createDirIfNotExist(providersNode); err != nil {
- return perrors.WithMessage(err, "create provider node")
- }
-
- params := url.Values{}
-
- svc.RangeParams(func(key, value string) bool {
- params[key] = []string{value}
- return true
- })
- params.Add("pid", processID)
- params.Add("ip", localIP)
- params.Add("anyhost", "true")
- params.Add("category", (common.RoleType(common.PROVIDER)).String())
- params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
- params.Add("side", (common.RoleType(common.PROVIDER)).Role())
-
- if len(svc.Methods) == 0 {
- params.Add("methods", strings.Join(svc.Methods, ","))
- }
-
- logger.Debugf("provider url params:%#v", params)
- var host string
- if len(svc.Ip) == 0 {
- host = localIP + ":" + svc.Port
- } else {
- host = svc.Ip + ":" + svc.Port
- }
-
- urlPath = svc.Path
-
- encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
- dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
-
- if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
- return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
- }
-
- return nil
-}
-
-func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) {
+func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
var (
configListener *configurationListener
@@ -346,37 +170,3 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) {
return configListener, nil
}
-
-//subscribe from registry
-func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
- for {
- if !r.IsAvailable() {
- logger.Warnf("event listener game over.")
- return
- }
-
- listener, err := r.subscribe(url)
- if err != nil {
- if !r.IsAvailable() {
- logger.Warnf("event listener game over.")
- return
- }
- logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
- time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
- continue
- }
-
- for {
- if serviceEvent, err := listener.Next(); err != nil {
- logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
- listener.Close()
- return
- } else {
- logger.Infof("update begin, service event: %v", serviceEvent.String())
- notifyListener.Notify(serviceEvent)
- }
-
- }
-
- }
-}
diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go
index 6c05a8a..87cf240 100644
--- a/registry/etcdv3/registry_test.go
+++ b/registry/etcdv3/registry_test.go
@@ -46,7 +46,8 @@ func initRegistry(t *testing.T) *etcdV3Registry {
}
out := reg.(*etcdV3Registry)
- out.client.CleanKV()
+ err = out.client.CleanKV()
+ assert.NoError(t, err)
return out
}
@@ -58,6 +59,7 @@ func (suite *RegistryTestSuite) TestRegister() {
reg := initRegistry(t)
err := reg.Register(url)
+ assert.NoError(t, err)
children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers")
if err != nil {
t.Fatal(err)
@@ -83,8 +85,9 @@ func (suite *RegistryTestSuite) TestSubscribe() {
regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
reg2 := initRegistry(t)
- reg2.Register(url)
- listener, err := reg2.subscribe(&url)
+ err = reg2.Register(url)
+ assert.NoError(t, err)
+ listener, err := reg2.DoSubscribe(&url)
if err != nil {
t.Fatal(err)
}
@@ -102,7 +105,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
reg := initRegistry(t)
- _, err := reg.subscribe(&url)
+ _, err := reg.DoSubscribe(&url)
if err != nil {
t.Fatal(err)
}
@@ -120,7 +123,8 @@ func (suite *RegistryTestSuite) TestProviderDestory() {
t := suite.T()
reg := initRegistry(t)
url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
- reg.Register(url)
+ err := reg.Register(url)
+ assert.NoError(t, err)
//listener.Close()
time.Sleep(1e9)
diff --git a/registry/registry.go b/registry/registry.go
index c7279a2..d673864 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -21,7 +21,13 @@ import (
"github.com/apache/dubbo-go/common"
)
-// Extension - Registry
+/*
+ * -----------------------------------NOTICE---------------------------------------------
+ * If there is no special case, you'd better inherit BaseRegistry and implement the
+ * FacadeBasedRegistry interface instead of directly implementing the Registry interface.
+ * --------------------------------------------------------------------------------------
+ */
+// Registry Extension - Registry
type Registry interface {
common.Node
//used for service provider calling , register services to registry
@@ -38,11 +44,13 @@ type Registry interface {
//mode2 : callback mode, subscribe with notify(notify listener).
Subscribe(*common.URL, NotifyListener)
}
+
+// NotifyListener ...
type NotifyListener interface {
Notify(*ServiceEvent)
}
-//Deprecated!
+// Listener Deprecated!
type Listener interface {
Next() (*ServiceEvent, error)
Close()
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 53a5926..e895243 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -36,18 +36,23 @@ import (
zk "github.com/apache/dubbo-go/remoting/zookeeper"
)
+// RegistryDataListener ...
type RegistryDataListener struct {
interestedURL []*common.URL
listener config_center.ConfigurationListener
}
+// NewRegistryDataListener ...
func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener {
return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
}
+
+// AddInterestedURL ...
func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
l.interestedURL = append(l.interestedURL, url)
}
+// DataChange ...
func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
// Intercept the last bit
index := strings.Index(eventType.Path, "/providers/")
@@ -71,6 +76,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
return false
}
+// RegistryConfigurationListener ...
type RegistryConfigurationListener struct {
client *zk.ZookeeperClient
registry *zkRegistry
@@ -79,14 +85,18 @@ type RegistryConfigurationListener struct {
closeOnce sync.Once
}
+// NewRegistryConfigurationListener for listening the event of zk.
func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
- reg.wg.Add(1)
+ reg.WaitGroup().Add(1)
return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
}
+
+// Process ...
func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
l.events <- configType
}
+// Next ...
func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
for {
select {
@@ -94,7 +104,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
- case <-l.registry.done:
+ case <-l.registry.Done():
logger.Warnf("zk consumer register has quit, so zk event listener exit now.")
return nil, perrors.New("listener stopped")
@@ -111,11 +121,13 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
}
}
}
+
+// Close ...
func (l *RegistryConfigurationListener) Close() {
// ensure that the listener will be closed at most once.
l.closeOnce.Do(func() {
l.isClosed = true
- l.registry.wg.Done()
+ l.registry.WaitGroup().Done()
})
}
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 24c4158..f4e53dc 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -18,20 +18,16 @@
package zookeeper
import (
- "context"
"fmt"
"net/url"
- "os"
- "strconv"
"strings"
"sync"
"time"
)
import (
- gxnet "github.com/dubbogo/gost/net"
+ "github.com/dubbogo/go-zookeeper/zk"
perrors "github.com/pkg/errors"
- "github.com/samuel/go-zookeeper/zk"
)
import (
@@ -44,20 +40,11 @@ import (
)
const (
- RegistryZkClient = "zk registry"
- RegistryConnDelay = 3
- MaxWaitInterval = time.Duration(3e9)
-)
-
-var (
- processID = ""
- localIP = ""
+ // RegistryZkClient zk client name
+ RegistryZkClient = "zk registry"
)
func init() {
- processID = fmt.Sprintf("%d", os.Getpid())
- localIP, _ = gxnet.GetLocalIP()
- //plugins.PluggableRegistries["zookeeper"] = newZkRegistry
extension.SetRegistry("zookeeper", newZkRegistry)
}
@@ -66,20 +53,13 @@ func init() {
/////////////////////////////////////
type zkRegistry struct {
- context context.Context
- *common.URL
- birth int64 // time of file birth, seconds since Epoch; 0 if unknown
- wg sync.WaitGroup // wg+done for zk restart
- done chan struct{}
-
- cltLock sync.Mutex
- client *zookeeper.ZookeeperClient
- services map[string]common.URL // service name + protocol -> service config
-
+ registry.BaseRegistry
+ client *zookeeper.ZookeeperClient
listenerLock sync.Mutex
listener *zookeeper.ZkEventListener
dataListener *RegistryDataListener
configListener *RegistryConfigurationListener
+ cltLock sync.Mutex
//for provider
zkPath map[string]int // key = protocol://ip:port/interface
}
@@ -89,21 +69,17 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
err error
r *zkRegistry
)
-
r = &zkRegistry{
- URL: url,
- birth: time.Now().UnixNano(),
- done: make(chan struct{}),
- services: make(map[string]common.URL),
- zkPath: make(map[string]int),
+ zkPath: make(map[string]int),
}
+ r.InitBaseRegistry(url, r)
err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
if err != nil {
return nil, err
}
+ r.WaitGroup().Add(1) //zk client start successful, then wg +1
- r.wg.Add(1)
go zookeeper.HandleClientRestart(r)
r.listener = zookeeper.NewZkEventListener(r.client)
@@ -113,10 +89,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
return r, nil
}
+// Options ...
type Options struct {
client *zookeeper.ZookeeperClient
}
+// Option ...
type Option func(*Options)
func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
@@ -128,27 +106,41 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust
)
r = &zkRegistry{
- URL: url,
- birth: time.Now().UnixNano(),
- done: make(chan struct{}),
- services: make(map[string]common.URL),
- zkPath: make(map[string]int),
+ zkPath: make(map[string]int),
}
-
+ r.InitBaseRegistry(url, r)
c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
if err != nil {
return nil, nil, err
}
- r.wg.Add(1)
+ r.WaitGroup().Add(1) //zk client start successful, then wg +1
go zookeeper.HandleClientRestart(r)
+ r.InitListeners()
+ return c, r, nil
+}
+func (r *zkRegistry) InitListeners() {
r.listener = zookeeper.NewZkEventListener(r.client)
r.configListener = NewRegistryConfigurationListener(r.client, r)
r.dataListener = NewRegistryDataListener(r.configListener)
+}
- return c, r, nil
+func (r *zkRegistry) CreatePath(path string) error {
+ return r.ZkClient().Create(path)
+}
+
+func (r *zkRegistry) DoRegister(root string, node string) error {
+ return r.registerTempZookeeperNode(root, node)
+}
+
+func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
+ return r.getListener(conf)
}
+func (r *zkRegistry) CloseAndNilClient() {
+ r.client.Close()
+ r.client = nil
+}
func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
return r.client
}
@@ -161,222 +153,10 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex {
return &r.cltLock
}
-func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
- return &r.wg
-}
-
-func (r *zkRegistry) GetDone() chan struct{} {
- return r.done
-}
-
-func (r *zkRegistry) GetUrl() common.URL {
- return *r.URL
-}
-
-func (r *zkRegistry) Destroy() {
+func (r *zkRegistry) CloseListener() {
if r.configListener != nil {
r.configListener.Close()
}
- close(r.done)
- r.wg.Wait()
- r.closeRegisters()
-}
-
-func (r *zkRegistry) RestartCallBack() bool {
-
- // copy r.services
- services := []common.URL{}
- for _, confIf := range r.services {
- services = append(services, confIf)
- }
-
- flag := true
- for _, confIf := range services {
- err := r.register(confIf)
- if err != nil {
- logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
- confIf, perrors.WithStack(err))
- flag = false
- break
- }
- logger.Infof("success to re-register service :%v", confIf.Key())
- }
- r.listener = zookeeper.NewZkEventListener(r.client)
- r.configListener = NewRegistryConfigurationListener(r.client, r)
- r.dataListener = NewRegistryDataListener(r.configListener)
-
- return flag
-}
-
-func (r *zkRegistry) Register(conf common.URL) error {
- var (
- ok bool
- err error
- )
- role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
- switch role {
- case common.CONSUMER:
- r.cltLock.Lock()
- _, ok = r.services[conf.Key()]
- r.cltLock.Unlock()
- if ok {
- return perrors.Errorf("Path{%s} has been registered", conf.Path)
- }
-
- err = r.register(conf)
- if err != nil {
- return perrors.WithStack(err)
- }
-
- r.cltLock.Lock()
- r.services[conf.Key()] = conf
- r.cltLock.Unlock()
- logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
-
- case common.PROVIDER:
-
- // Check if the service has been registered
- r.cltLock.Lock()
- // Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path).
- // Because the consumer wants to provide monitoring functions for the selector,
- // the provider allows multiple groups or versions of the same service to be registered.
- _, ok = r.services[conf.Key()]
- r.cltLock.Unlock()
- if ok {
- return perrors.Errorf("Path{%s} has been registered", conf.Key())
- }
-
- err = r.register(conf)
- if err != nil {
- return perrors.WithMessagef(err, "register(conf:%+v)", conf)
- }
-
- r.cltLock.Lock()
- r.services[conf.Key()] = conf
- r.cltLock.Unlock()
-
- logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf)
- }
-
- return nil
-}
-
-func (r *zkRegistry) service(c common.URL) string {
- return url.QueryEscape(c.Service())
-}
-
-func (r *zkRegistry) register(c common.URL) error {
- var (
- err error
- //revision string
- params url.Values
- rawURL string
- encodedURL string
- dubboPath string
- //conf config.URL
- )
-
- err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
- if err != nil {
- return perrors.WithStack(err)
- }
- params = url.Values{}
-
- c.RangeParams(func(key, value string) bool {
- params.Add(key, value)
- return true
- })
-
- params.Add("pid", processID)
- params.Add("ip", localIP)
- //params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
-
- role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
- switch role {
-
- case common.PROVIDER:
-
- if c.Path == "" || len(c.Methods) == 0 {
- return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
- }
- // 先创建服务下面的provider node
- dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
- r.cltLock.Lock()
- err = r.client.Create(dubboPath)
- r.cltLock.Unlock()
- if err != nil {
- logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
- return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath)
- }
- params.Add("anyhost", "true")
-
- // Dubbo java consumer to start looking for the provider url,because the category does not match,
- // the provider will not find, causing the consumer can not start, so we use consumers.
- // DubboRole = [...]string{"consumer", "", "", "provider"}
- // params.Add("category", (RoleType(PROVIDER)).Role())
- params.Add("category", (common.RoleType(common.PROVIDER)).String())
- params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
-
- params.Add("side", (common.RoleType(common.PROVIDER)).Role())
-
- if len(c.Methods) == 0 {
- params.Add("methods", strings.Join(c.Methods, ","))
- }
- logger.Debugf("provider zk url params:%#v", params)
- var host string
- if c.Ip == "" {
- host = localIP + ":" + c.Port
- } else {
- host = c.Ip + ":" + c.Port
- }
-
- rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
- encodedURL = url.QueryEscape(rawURL)
-
- // Print your own registration service providers.
- dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
- logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
-
- case common.CONSUMER:
- dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
- r.cltLock.Lock()
- err = r.client.Create(dubboPath)
- r.cltLock.Unlock()
- if err != nil {
- logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
- return perrors.WithStack(err)
- }
- dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
- r.cltLock.Lock()
- err = r.client.Create(dubboPath)
- r.cltLock.Unlock()
- if err != nil {
- logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
- return perrors.WithStack(err)
- }
-
- params.Add("protocol", c.Protocol)
-
- params.Add("category", (common.RoleType(common.CONSUMER)).String())
- params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
-
- rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
- encodedURL = url.QueryEscape(rawURL)
-
- dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String())
- logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
-
- default:
- return perrors.Errorf("@c{%v} type is not referencer or provider", c)
- }
-
- dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
- err = r.registerTempZookeeperNode(dubboPath, encodedURL)
-
- if err != nil {
- return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
- }
- return nil
}
func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
@@ -406,53 +186,6 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
return nil
}
-func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
- return r.getListener(conf)
-}
-func sleepWait(n int) {
- wait := time.Duration((n + 1) * 2e8)
- if wait > MaxWaitInterval {
- wait = MaxWaitInterval
- }
- time.Sleep(wait)
-}
-
-//subscribe from registry
-func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
- n := 0
- for {
- n++
- if !r.IsAvailable() {
- logger.Warnf("event listener game over.")
- return
- }
-
- listener, err := r.subscribe(url)
- if err != nil {
- if !r.IsAvailable() {
- logger.Warnf("event listener game over.")
- return
- }
- logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
- time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
- continue
- }
-
- for {
- if serviceEvent, err := listener.Next(); err != nil {
- logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
- listener.Close()
- break
- } else {
- logger.Infof("update begin, service event: %v", serviceEvent.String())
- notifyListener.Notify(serviceEvent)
- }
-
- }
- sleepWait(n)
- }
-}
-
func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
var (
zkListener *RegistryConfigurationListener
@@ -489,22 +222,3 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
return zkListener, nil
}
-
-func (r *zkRegistry) closeRegisters() {
- r.cltLock.Lock()
- defer r.cltLock.Unlock()
- logger.Infof("begin to close provider zk client")
- // Close the old client first to close the tmp node.
- r.client.Close()
- r.client = nil
- r.services = nil
-}
-
-func (r *zkRegistry) IsAvailable() bool {
- select {
- case <-r.done:
- return false
- default:
- return true
- }
-}
diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go
index 2c7bb90..5e5189c 100644
--- a/registry/zookeeper/registry_test.go
+++ b/registry/zookeeper/registry_test.go
@@ -64,7 +64,7 @@ func Test_Subscribe(t *testing.T) {
_, reg2, _ := newMockZkRegistry(®url, zookeeper.WithTestCluster(ts))
reg2.Register(url)
- listener, _ := reg2.subscribe(&url)
+ listener, _ := reg2.DoSubscribe(&url)
serviceEvent, _ := listener.Next()
assert.NoError(t, err)
@@ -85,7 +85,7 @@ func Test_ConsumerDestory(t *testing.T) {
assert.NoError(t, err)
err = reg.Register(url)
assert.NoError(t, err)
- _, err = reg.subscribe(&url)
+ _, err = reg.DoSubscribe(&url)
assert.NoError(t, err)
//listener.Close()
diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go
index 0509685..ba3ea6e 100644
--- a/remoting/etcdv3/client.go
+++ b/remoting/etcdv3/client.go
@@ -36,16 +36,22 @@ import (
)
const (
- ConnDelay = 3
- MaxFailTimes = 15
+ // ConnDelay connection delay
+ ConnDelay = 3
+ // MaxFailTimes max failure times
+ MaxFailTimes = 15
+ // RegistryETCDV3Client client name
RegistryETCDV3Client = "etcd registry"
)
var (
+ // ErrNilETCDV3Client ...
ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
- ErrKVPairNotFound = perrors.New("k/v pair not found")
+ // ErrKVPairNotFound ...
+ ErrKVPairNotFound = perrors.New("k/v pair not found")
)
+// Options ...
type Options struct {
name string
endpoints []string
@@ -54,30 +60,38 @@ type Options struct {
heartbeat int // heartbeat second
}
+// Option ...
type Option func(*Options)
+// WithEndpoints ...
func WithEndpoints(endpoints ...string) Option {
return func(opt *Options) {
opt.endpoints = endpoints
}
}
+
+// WithName ...
func WithName(name string) Option {
return func(opt *Options) {
opt.name = name
}
}
+
+// WithTimeout ...
func WithTimeout(timeout time.Duration) Option {
return func(opt *Options) {
opt.timeout = timeout
}
}
+// WithHeartbeat ...
func WithHeartbeat(heartbeat int) Option {
return func(opt *Options) {
opt.heartbeat = heartbeat
}
}
+// ValidateClient ...
func ValidateClient(container clientFacade, opts ...Option) error {
options := &Options{
@@ -117,6 +131,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
return nil
}
+// Client ...
type Client struct {
lock sync.RWMutex
@@ -191,6 +206,7 @@ func (c *Client) stop() bool {
return false
}
+// Close ...
func (c *Client) Close() {
if c == nil {
@@ -309,6 +325,7 @@ func (c *Client) get(k string) (string, error) {
return string(resp.Kvs[0].Value), nil
}
+// CleanKV ...
func (c *Client) CleanKV() error {
c.lock.RLock()
@@ -408,10 +425,12 @@ func (c *Client) keepAliveKV(k string, v string) error {
return nil
}
+// Done ...
func (c *Client) Done() <-chan struct{} {
return c.exit
}
+// Valid ...
func (c *Client) Valid() bool {
select {
case <-c.exit:
@@ -428,6 +447,7 @@ func (c *Client) Valid() bool {
return true
}
+// Create ...
func (c *Client) Create(k string, v string) error {
err := c.put(k, v)
@@ -437,6 +457,7 @@ func (c *Client) Create(k string, v string) error {
return nil
}
+// Delete ...
func (c *Client) Delete(k string) error {
err := c.delete(k)
@@ -447,6 +468,7 @@ func (c *Client) Delete(k string) error {
return nil
}
+// RegisterTemp ...
func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
completeKey := path.Join(basePath, node)
@@ -459,6 +481,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
return completeKey, nil
}
+// GetChildrenKVList ...
func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
kList, vList, err := c.getChildren(k)
@@ -468,6 +491,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
return kList, vList, nil
}
+// Get ...
func (c *Client) Get(k string) (string, error) {
v, err := c.get(k)
@@ -478,6 +502,7 @@ func (c *Client) Get(k string) (string, error) {
return v, nil
}
+// Watch ...
func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
wc, err := c.watch(k)
@@ -487,6 +512,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
return wc, nil
}
+// WatchWithPrefix ...
func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
wc, err := c.watchWithPrefix(prefix)
diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go
index 499044b..35befc8 100644
--- a/remoting/etcdv3/facade.go
+++ b/remoting/etcdv3/facade.go
@@ -38,11 +38,12 @@ type clientFacade interface {
SetClient(*Client)
ClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container
- GetDone() chan struct{} //for etcd client control
+ Done() chan struct{} //for etcd client control
RestartCallBack() bool
common.Node
}
+// HandleClientRestart ...
func HandleClientRestart(r clientFacade) {
var (
@@ -54,7 +55,7 @@ func HandleClientRestart(r clientFacade) {
LOOP:
for {
select {
- case <-r.GetDone():
+ case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
break LOOP
// re-register all services
@@ -71,7 +72,7 @@ LOOP:
failTimes = 0
for {
select {
- case <-r.GetDone():
+ case <-r.Done():
logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go
index a4d5805..a51a68b 100644
--- a/remoting/etcdv3/listener.go
+++ b/remoting/etcdv3/listener.go
@@ -33,6 +33,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
+// EventListener ...
type EventListener struct {
client *Client
keyMapLock sync.Mutex
@@ -40,6 +41,7 @@ type EventListener struct {
wg sync.WaitGroup
}
+// NewEventListener ...
func NewEventListener(client *Client) *EventListener {
return &EventListener{
client: client,
@@ -47,7 +49,7 @@ func NewEventListener(client *Client) *EventListener {
}
}
-// Listen on a spec key
+// ListenServiceNodeEvent Listen on a spec key
// this method will return true when spec key deleted,
// this method will return false when deep layer connection lose
func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
@@ -134,7 +136,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
panic("unreachable")
}
-// Listen on a set of key with spec prefix
+// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
l.wg.Add(1)
@@ -180,7 +182,7 @@ func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
-// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
+// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Register/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
@@ -229,6 +231,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
}(key)
}
+// Close ...
func (l *EventListener) Close() {
l.wg.Wait()
}
diff --git a/remoting/listener.go b/remoting/listener.go
index 8d1e357..3713ba0 100644
--- a/remoting/listener.go
+++ b/remoting/listener.go
@@ -21,6 +21,7 @@ import (
"fmt"
)
+// DataListener ...
type DataListener interface {
DataChange(eventType Event) bool //bool is return for interface implement is interesting
}
@@ -29,11 +30,15 @@ type DataListener interface {
// event type
//////////////////////////////////////////
+// EventType ...
type EventType int
const (
+ // EventTypeAdd ...
EventTypeAdd = iota
+ // EventTypeDel ...
EventTypeDel
+ // EventTypeUpdate ...
EventTypeUpdate
)
@@ -51,6 +56,7 @@ func (t EventType) String() string {
// service event
//////////////////////////////////////////
+// Event ...
type Event struct {
Path string
Action EventType
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 19d6529..f95231b 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -25,8 +25,8 @@ import (
)
import (
+ "github.com/dubbogo/go-zookeeper/zk"
perrors "github.com/pkg/errors"
- "github.com/samuel/go-zookeeper/zk"
)
import (
@@ -35,14 +35,19 @@ import (
)
const (
- ConnDelay = 3
+ // ConnDelay connection delay interval
+ ConnDelay = 3
+ // MaxFailTimes max fail times
MaxFailTimes = 15
)
var (
errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
+ errNilChildren = perrors.Errorf("has none children")
+ errNilNode = perrors.Errorf("node does not exist")
)
+// ZookeeperClient ...
type ZookeeperClient struct {
name string
ZkAddrs []string
@@ -54,6 +59,7 @@ type ZookeeperClient struct {
eventRegistry map[string][]*chan struct{}
}
+// StateToString ...
func StateToString(state zk.State) string {
switch state {
case zk.StateDisconnected:
@@ -85,6 +91,7 @@ func StateToString(state zk.State) string {
return "zookeeper unknown state"
}
+// Options ...
type Options struct {
zkName string
client *ZookeeperClient
@@ -92,14 +99,17 @@ type Options struct {
ts *zk.TestCluster
}
+// Option ...
type Option func(*Options)
+// WithZkName ...
func WithZkName(name string) Option {
return func(opt *Options) {
opt.zkName = name
}
}
+// ValidateZookeeperClient ...
func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
var (
err error
@@ -173,12 +183,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
return z, nil
}
+// WithTestCluster ...
func WithTestCluster(ts *zk.TestCluster) Option {
return func(opt *Options) {
opt.ts = ts
}
}
+// NewMockZookeeperClient ...
func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
var (
err error
@@ -224,6 +236,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
return ts, z, event, nil
}
+// HandleZkEvent ...
func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
var (
state int
@@ -248,11 +261,13 @@ LOOP:
logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
z.stop()
z.Lock()
- if z.Conn != nil {
- z.Conn.Close()
- z.Conn = nil
- }
+ conn := z.Conn
+ z.Conn = nil
z.Unlock()
+ if conn != nil {
+ conn.Close()
+ }
+
break LOOP
case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
@@ -282,6 +297,7 @@ LOOP:
}
}
+// RegisterEvent ...
func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" || event == nil {
return
@@ -296,6 +312,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
z.Unlock()
}
+// UnregisterEvent ...
func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
if zkPath == "" {
return
@@ -322,6 +339,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
}
}
+// Done ...
func (z *ZookeeperClient) Done() <-chan struct{} {
return z.exit
}
@@ -337,6 +355,7 @@ func (z *ZookeeperClient) stop() bool {
return false
}
+// ZkConnValid ...
func (z *ZookeeperClient) ZkConnValid() bool {
select {
case <-z.exit:
@@ -354,6 +373,7 @@ func (z *ZookeeperClient) ZkConnValid() bool {
return valid
}
+// Close ...
func (z *ZookeeperClient) Close() {
if z == nil {
return
@@ -362,14 +382,17 @@ func (z *ZookeeperClient) Close() {
z.stop()
z.Wait.Wait()
z.Lock()
- if z.Conn != nil {
- z.Conn.Close()
- z.Conn = nil
- }
+ conn := z.Conn
+ z.Conn = nil
z.Unlock()
+ if conn != nil {
+ conn.Close()
+ }
+
logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
}
+// Create ...
func (z *ZookeeperClient) Create(basePath string) error {
var (
err error
@@ -381,10 +404,12 @@ func (z *ZookeeperClient) Create(basePath string) error {
tmpPath = path.Join(tmpPath, "/", str)
err = errNilZkClientConn
z.Lock()
- if z.Conn != nil {
- _, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
- }
+ conn := z.Conn
z.Unlock()
+ if conn != nil {
+ _, err = conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
+ }
+
if err != nil {
if err == zk.ErrNodeExists {
logger.Infof("zk.create(\"%s\") exists\n", tmpPath)
@@ -398,6 +423,7 @@ func (z *ZookeeperClient) Create(basePath string) error {
return nil
}
+// Delete ...
func (z *ZookeeperClient) Delete(basePath string) error {
var (
err error
@@ -405,14 +431,16 @@ func (z *ZookeeperClient) Delete(basePath string) error {
err = errNilZkClientConn
z.Lock()
- if z.Conn != nil {
- err = z.Conn.Delete(basePath, -1)
- }
+ conn := z.Conn
z.Unlock()
+ if conn != nil {
+ err = conn.Delete(basePath, -1)
+ }
return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
}
+// RegisterTemp ...
func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
var (
err error
@@ -425,10 +453,12 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
data = []byte("")
zkPath = path.Join(basePath) + "/" + node
z.Lock()
- if z.Conn != nil {
- tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
- }
+ conn := z.Conn
z.Unlock()
+ if conn != nil {
+ tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
+ }
+
//if err != nil && err != zk.ErrNodeExists {
if err != nil {
logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
@@ -439,6 +469,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
return tmpPath, nil
}
+// RegisterTempSeq ...
func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
var (
err error
@@ -447,15 +478,17 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
err = errNilZkClientConn
z.Lock()
- if z.Conn != nil {
- tmpPath, err = z.Conn.Create(
+ conn := z.Conn
+ z.Unlock()
+ if conn != nil {
+ tmpPath, err = conn.Create(
path.Join(basePath)+"/",
data,
zk.FlagEphemeral|zk.FlagSequence,
zk.WorldACL(zk.PermAll),
)
}
- z.Unlock()
+
logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
if err != nil && err != zk.ErrNodeExists {
logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n",
@@ -467,37 +500,44 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
return tmpPath, nil
}
+// GetChildrenW ...
func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
var (
err error
children []string
stat *zk.Stat
- event <-chan zk.Event
+ watcher *zk.Watcher
)
err = errNilZkClientConn
z.Lock()
- if z.Conn != nil {
- children, stat, event, err = z.Conn.ChildrenW(path)
- }
+ conn := z.Conn
z.Unlock()
+ if conn != nil {
+ children, stat, watcher, err = conn.ChildrenW(path)
+ }
+
if err != nil {
+ if err == zk.ErrNoChildrenForEphemerals {
+ return nil, nil, errNilChildren
+ }
if err == zk.ErrNoNode {
- return nil, nil, perrors.Errorf("path{%s} has none children", path)
+ return nil, nil, errNilNode
}
logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
}
if stat == nil {
- return nil, nil, perrors.Errorf("path{%s} has none children", path)
+ return nil, nil, perrors.Errorf("path{%s} get stat is nil", path)
}
if len(children) == 0 {
- return nil, nil, perrors.Errorf("path{%s} has none children", path)
+ return nil, nil, errNilChildren
}
- return children, event, nil
+ return children, watcher.EvtCh, nil
}
+// GetChildren ...
func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
var (
err error
@@ -507,10 +547,12 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
err = errNilZkClientConn
z.Lock()
- if z.Conn != nil {
- children, stat, err = z.Conn.Children(path)
- }
+ conn := z.Conn
z.Unlock()
+ if conn != nil {
+ children, stat, err = conn.Children(path)
+ }
+
if err != nil {
if err == zk.ErrNoNode {
return nil, perrors.Errorf("path{%s} has none children", path)
@@ -522,25 +564,28 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
return nil, perrors.Errorf("path{%s} has none children", path)
}
if len(children) == 0 {
- return nil, perrors.Errorf("path{%s} has none children", path)
+ return nil, errNilChildren
}
return children, nil
}
+// ExistW ...
func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
var (
- exist bool
- err error
- event <-chan zk.Event
+ exist bool
+ err error
+ watcher *zk.Watcher
)
err = errNilZkClientConn
z.Lock()
- if z.Conn != nil {
- exist, _, event, err = z.Conn.ExistsW(zkPath)
- }
+ conn := z.Conn
z.Unlock()
+ if conn != nil {
+ exist, _, watcher, err = conn.ExistsW(zkPath)
+ }
+
if err != nil {
logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err))
return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath)
@@ -550,9 +595,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
return nil, perrors.Errorf("zkClient{%s} App zk path{%s} does not exist.", z.name, zkPath)
}
- return event, nil
+ return watcher.EvtCh, nil
}
+// GetContent ...
func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
return z.Conn.Get(zkPath)
}
diff --git a/remoting/zookeeper/client_test.go b/remoting/zookeeper/client_test.go
index f1bd0c2..cb41eb3 100644
--- a/remoting/zookeeper/client_test.go
+++ b/remoting/zookeeper/client_test.go
@@ -24,7 +24,7 @@ import (
)
import (
- "github.com/samuel/go-zookeeper/zk"
+ "github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
@@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) {
states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
verifyEventStateOrder(t, event, states, "event channel")
}
+
+func Test_UnregisterEvent(t *testing.T) {
+ client := &ZookeeperClient{}
+ client.eventRegistry = make(map[string][]*chan struct{})
+ array := []*chan struct{}{}
+ array = append(array, new(chan struct{}))
+ client.eventRegistry["test"] = array
+ client.UnregisterEvent("test", new(chan struct{}))
+}
diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go
index cdc7ead..055db4f 100644
--- a/remoting/zookeeper/facade.go
+++ b/remoting/zookeeper/facade.go
@@ -35,11 +35,12 @@ type zkClientFacade interface {
SetZkClient(*ZookeeperClient)
ZkClientLock() *sync.Mutex
WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
- GetDone() chan struct{} //for zk client control
+ Done() chan struct{} //for zk client control
RestartCallBack() bool
common.Node
}
+// HandleClientRestart ...
func HandleClientRestart(r zkClientFacade) {
var (
err error
@@ -51,7 +52,7 @@ func HandleClientRestart(r zkClientFacade) {
LOOP:
for {
select {
- case <-r.GetDone():
+ case <-r.Done():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
// re-register all services
@@ -67,7 +68,7 @@ LOOP:
failTimes = 0
for {
select {
- case <-r.GetDone():
+ case <-r.Done():
logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
break LOOP
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk.
diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go
index 58e0d69..175d758 100644
--- a/remoting/zookeeper/facade_test.go
+++ b/remoting/zookeeper/facade_test.go
@@ -24,7 +24,7 @@ import (
"time"
)
import (
- "github.com/samuel/go-zookeeper/zk"
+ "github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
import (
@@ -55,7 +55,7 @@ func (r *mockFacade) WaitGroup() *sync.WaitGroup {
return &r.wg
}
-func (r *mockFacade) GetDone() chan struct{} {
+func (r *mockFacade) Done() chan struct{} {
return r.done
}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 0b9db5e..4493c06 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -19,15 +19,14 @@ package zookeeper
import (
"path"
- "strings"
"sync"
"time"
)
import (
"github.com/dubbogo/getty"
+ "github.com/dubbogo/go-zookeeper/zk"
perrors "github.com/pkg/errors"
- "github.com/samuel/go-zookeeper/zk"
)
import (
@@ -35,6 +34,7 @@ import (
"github.com/apache/dubbo-go/remoting"
)
+// ZkEventListener ...
type ZkEventListener struct {
client *ZookeeperClient
pathMapLock sync.Mutex
@@ -42,6 +42,7 @@ type ZkEventListener struct {
wg sync.WaitGroup
}
+// NewZkEventListener ...
func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
@@ -49,10 +50,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
}
}
+// SetClient ...
func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
l.client = client
}
+// ListenServiceNodeEvent ...
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
l.wg.Add(1)
defer l.wg.Done()
@@ -107,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
- logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
- return
+ if err == errNilChildren {
+ content, _, err := l.client.Conn.Get(zkPath)
+ if err != nil {
+ logger.Errorf("Get new node path {%v} 's content error,message is {%v}", zkPath, perrors.WithStack(err))
+ } else {
+ listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
+ }
+
+ } else {
+ logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
+ }
}
// a node was added -- listen the new node
@@ -178,7 +190,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
if MaxFailTimes <= failTimes {
failTimes = MaxFailTimes
}
- logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
+ logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
// clear the event channel
CLEAR:
for {
@@ -189,6 +201,11 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
}
}
l.client.RegisterEvent(zkPath, &event)
+ if err == errNilNode {
+ logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath)
+ l.client.UnregisterEvent(zkPath, &event)
+ return
+ }
select {
case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
l.client.UnregisterEvent(zkPath, &event)
@@ -263,56 +280,11 @@ func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
-// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
+// ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
// registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
// |
// --------> ListenServiceNodeEvent
func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
- var (
- err error
- dubboPath string
- children []string
- )
-
- zkPath = strings.ReplaceAll(zkPath, "$", "%24")
- l.pathMapLock.Lock()
- _, ok := l.pathMap[zkPath]
- l.pathMapLock.Unlock()
- if ok {
- logger.Warnf("@zkPath %s has already been listened.", zkPath)
- return
- }
-
- l.pathMapLock.Lock()
- l.pathMap[zkPath] = struct{}{}
- l.pathMapLock.Unlock()
-
- logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
- children, err = l.client.GetChildren(zkPath)
- if err != nil {
- children = nil
- logger.Warnf("fail to get children of zk path{%s}", zkPath)
- }
-
- for _, c := range children {
- // listen l service node
- dubboPath = path.Join(zkPath, c)
- content, _, err := l.client.Conn.Get(dubboPath)
- if err != nil {
- logger.Errorf("Get new node path {%v} 's content error,message is {%v}", dubboPath, perrors.WithStack(err))
- }
- if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
- continue
- }
- logger.Infof("listen dubbo service key{%s}", dubboPath)
- go func(zkPath string, listener remoting.DataListener) {
- if l.ListenServiceNodeEvent(zkPath) {
- listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
- }
- logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
- }(dubboPath, listener)
- }
-
logger.Infof("listen dubbo path{%s}", zkPath)
go func(zkPath string, listener remoting.DataListener) {
l.listenDirEvent(zkPath, listener)
@@ -324,6 +296,7 @@ func (l *ZkEventListener) valid() bool {
return l.client.ZkConnValid()
}
+// Close ...
func (l *ZkEventListener) Close() {
l.wg.Wait()
}
diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go
index aa627c7..43e9aca 100644
--- a/remoting/zookeeper/listener_test.go
+++ b/remoting/zookeeper/listener_test.go
@@ -24,7 +24,7 @@ import (
"time"
)
import (
- "github.com/samuel/go-zookeeper/zk"
+ "github.com/dubbogo/go-zookeeper/zk"
"github.com/stretchr/testify/assert"
)
import (
@@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
listener := NewZkEventListener(client)
dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
listener.ListenServiceEvent("/dubbo", dataListener)
-
+ time.Sleep(1 * time.Second)
_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
assert.NoError(t, err)
wait.Wait()
assert.Equal(t, changedData, dataListener.eventList[1].Content)
- client.Close()
}