You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by fa...@apache.org on 2020/02/09 12:03:01 UTC

[dubbo-go] branch 1.3 updated: add zk register code

This is an automated email from the ASF dual-hosted git repository.

fangyc pushed a commit to branch 1.3
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.3 by this push:
     new 7fcb34e  add zk register code
     new 5f82281  Merge pull request #355 from pantianying/fix_zkproblemto1.3
7fcb34e is described below

commit 7fcb34eb5612a5ad47c24ca4bc7aecc194f801e4
Author: pantianying <60...@qq.com>
AuthorDate: Sat Feb 8 12:34:14 2020 +0800

    add zk register code
---
 config_center/zookeeper/impl.go      |  63 +++---
 config_center/zookeeper/impl_test.go |   2 +-
 go.mod                               |   2 +-
 go.sum                               |   4 +-
 registry/base_register.go            | 375 +++++++++++++++++++++++++++++++++++
 registry/etcdv3/listener.go          |  10 +-
 registry/etcdv3/registry.go          | 264 +++---------------------
 registry/etcdv3/registry_test.go     |  14 +-
 registry/registry.go                 |  12 +-
 registry/zookeeper/listener.go       |  18 +-
 registry/zookeeper/registry.go       | 354 ++++-----------------------------
 registry/zookeeper/registry_test.go  |   4 +-
 remoting/etcdv3/client.go            |  32 ++-
 remoting/etcdv3/facade.go            |   7 +-
 remoting/etcdv3/listener.go          |   9 +-
 remoting/listener.go                 |   6 +
 remoting/zookeeper/client.go         | 128 ++++++++----
 remoting/zookeeper/client_test.go    |  11 +-
 remoting/zookeeper/facade.go         |   7 +-
 remoting/zookeeper/facade_test.go    |   4 +-
 remoting/zookeeper/listener.go       |  75 +++----
 remoting/zookeeper/listener_test.go  |   5 +-
 22 files changed, 689 insertions(+), 717 deletions(-)

diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index 504d491..70fb196 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -24,8 +24,8 @@ import (
 )
 
 import (
+	"github.com/dubbogo/go-zookeeper/zk"
 	perrors "github.com/pkg/errors"
-	"github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -37,7 +37,11 @@ import (
 	"github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
-const ZkClient = "zk config_center"
+const (
+	// ZkClient
+	//zookeeper client name
+	ZkClient = "zk config_center"
+)
 
 type zookeeperDynamicConfiguration struct {
 	url      *common.URL
@@ -134,10 +138,9 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
 	content, _, err := c.client.GetContent(c.rootPath + "/" + key)
 	if err != nil {
 		return "", perrors.WithStack(err)
-	} else {
-		return string(content), nil
 	}
 
+	return string(content), nil
 }
 
 //For zookeeper, getConfig and getConfigs have the same meaning.
@@ -156,57 +159,57 @@ func (c *zookeeperDynamicConfiguration) SetParser(p parser.ConfigurationParser)
 	c.parser = p
 }
 
-func (r *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
-	return r.client
+func (c *zookeeperDynamicConfiguration) ZkClient() *zookeeper.ZookeeperClient {
+	return c.client
 }
 
-func (r *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
-	r.client = client
+func (c *zookeeperDynamicConfiguration) SetZkClient(client *zookeeper.ZookeeperClient) {
+	c.client = client
 }
 
-func (r *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
-	return &r.cltLock
+func (c *zookeeperDynamicConfiguration) ZkClientLock() *sync.Mutex {
+	return &c.cltLock
 }
 
-func (r *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
-	return &r.wg
+func (c *zookeeperDynamicConfiguration) WaitGroup() *sync.WaitGroup {
+	return &c.wg
 }
 
-func (r *zookeeperDynamicConfiguration) GetDone() chan struct{} {
-	return r.done
+func (c *zookeeperDynamicConfiguration) Done() chan struct{} {
+	return c.done
 }
 
-func (r *zookeeperDynamicConfiguration) GetUrl() common.URL {
-	return *r.url
+func (c *zookeeperDynamicConfiguration) GetUrl() common.URL {
+	return *c.url
 }
 
-func (r *zookeeperDynamicConfiguration) Destroy() {
-	if r.listener != nil {
-		r.listener.Close()
+func (c *zookeeperDynamicConfiguration) Destroy() {
+	if c.listener != nil {
+		c.listener.Close()
 	}
-	close(r.done)
-	r.wg.Wait()
-	r.closeConfigs()
+	close(c.done)
+	c.wg.Wait()
+	c.closeConfigs()
 }
 
-func (r *zookeeperDynamicConfiguration) IsAvailable() bool {
+func (c *zookeeperDynamicConfiguration) IsAvailable() bool {
 	select {
-	case <-r.done:
+	case <-c.done:
 		return false
 	default:
 		return true
 	}
 }
 
-func (r *zookeeperDynamicConfiguration) closeConfigs() {
-	r.cltLock.Lock()
-	defer r.cltLock.Unlock()
+func (c *zookeeperDynamicConfiguration) closeConfigs() {
+	c.cltLock.Lock()
+	defer c.cltLock.Unlock()
 	logger.Infof("begin to close provider zk client")
 	// Close the old client first to close the tmp node
-	r.client.Close()
-	r.client = nil
+	c.client.Close()
+	c.client = nil
 }
 
-func (r *zookeeperDynamicConfiguration) RestartCallBack() bool {
+func (c *zookeeperDynamicConfiguration) RestartCallBack() bool {
 	return true
 }
diff --git a/config_center/zookeeper/impl_test.go b/config_center/zookeeper/impl_test.go
index e614009..cca4427 100644
--- a/config_center/zookeeper/impl_test.go
+++ b/config_center/zookeeper/impl_test.go
@@ -24,7 +24,7 @@ import (
 )
 
 import (
-	"github.com/samuel/go-zookeeper/zk"
+	"github.com/dubbogo/go-zookeeper/zk"
 	"github.com/stretchr/testify/assert"
 )
 
diff --git a/go.mod b/go.mod
index c89b397..f5ac8e5 100644
--- a/go.mod
+++ b/go.mod
@@ -13,6 +13,7 @@ require (
 	github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f // indirect
 	github.com/creasty/defaults v1.3.0
 	github.com/dubbogo/getty v1.3.2
+	github.com/dubbogo/go-zookeeper v1.0.0
 	github.com/dubbogo/gost v1.5.2
 	github.com/fastly/go-utils v0.0.0-20180712184237-d95a45783239 // indirect
 	github.com/go-errors/errors v1.0.1 // indirect
@@ -37,7 +38,6 @@ require (
 	github.com/nacos-group/nacos-sdk-go v0.0.0-20190723125407-0242d42e3dbb
 	github.com/pkg/errors v0.8.1
 	github.com/prometheus/client_golang v1.1.0 // indirect
-	github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec
 	github.com/satori/go.uuid v1.2.0
 	github.com/smartystreets/goconvey v0.0.0-20190710185942-9d28bd7c0945 // indirect
 	github.com/soheilhy/cmux v0.1.4 // indirect
diff --git a/go.sum b/go.sum
index b23cb24..0f42e86 100644
--- a/go.sum
+++ b/go.sum
@@ -104,6 +104,8 @@ github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk
 github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk=
 github.com/dubbogo/getty v1.3.2 h1:l1KVSs/1CtTKbIPTrkTtBT6S9ddvmswDGoAnnl2CDpM=
 github.com/dubbogo/getty v1.3.2/go.mod h1:ANbVQ9tbpZ2b0xdR8nRrgS/oXIsZAeRxzvPSOn/7mbk=
+github.com/dubbogo/go-zookeeper v1.0.0 h1:RsYdlGwhDW+iKXM3eIIcvt34P2swLdmQfuIJxsHlGoM=
+github.com/dubbogo/go-zookeeper v1.0.0/go.mod h1:fn6n2CAEer3novYgk9ULLwAjuV8/g4DdC2ENwRb6E+c=
 github.com/dubbogo/gost v1.5.1 h1:oG5dzaWf1KYynBaBoUIOkgT+YD0niHV6xxI0Odq7hDg=
 github.com/dubbogo/gost v1.5.1/go.mod h1:pPTjVyoJan3aPxBPNUX0ADkXjPibLo+/Ib0/fADXSG8=
 github.com/dubbogo/gost v1.5.2 h1:ri/03971hdpnn3QeCU+4UZgnRNGDXLDGDucR/iozZm8=
@@ -410,8 +412,6 @@ github.com/rogpeppe/fastuuid v0.0.0-20150106093220-6724a57986af/go.mod h1:XWv6So
 github.com/ryanuber/columnize v0.0.0-20160712163229-9b3edd62028f/go.mod h1:sm1tb6uqfes/u+d4ooFouqFdy9/2g9QGwK3SQygK0Ts=
 github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735 h1:7YvPJVmEeFHR1Tj9sZEYsmarJEQfMVYpd/Vyy/A8dqE=
 github.com/ryanuber/go-glob v0.0.0-20170128012129-256dc444b735/go.mod h1:807d1WSdnB0XRJzKNil9Om6lcp/3a0v4qIHxIXzX/Yc=
-github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec h1:6ncX5ko6B9LntYM0YBRXkiSaZMmLYeZ/NWcmeB43mMY=
-github.com/samuel/go-zookeeper v0.0.0-20180130194729-c4fab1ac1bec/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E=
 github.com/satori/go.uuid v1.2.0 h1:0uYX9dsZ2yD7q2RtLRtPSdGDWzjeM3TbMJP9utgA0ww=
 github.com/satori/go.uuid v1.2.0/go.mod h1:dA0hQrYB0VpLJoorglMZABFdXlWrHn1NEOzdhQKdks0=
 github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I=
diff --git a/registry/base_register.go b/registry/base_register.go
new file mode 100644
index 0000000..5b9aef8
--- /dev/null
+++ b/registry/base_register.go
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package registry
+
+import (
+	"context"
+	"fmt"
+	"net/url"
+	"os"
+	"strconv"
+	"strings"
+	"sync"
+	"time"
+)
+
+import (
+	gxnet "github.com/dubbogo/gost/net"
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/logger"
+)
+
+const (
+	// RegistryConnDelay connection delay
+	RegistryConnDelay = 3
+	// MaxWaitInterval max wait interval
+	MaxWaitInterval = 3 * time.Second
+)
+
+var (
+	processID = ""
+	localIP   = ""
+)
+
+func init() {
+	processID = fmt.Sprintf("%d", os.Getpid())
+	localIP, _ = gxnet.GetLocalIP()
+}
+
+/*
+ * -----------------------------------NOTICE---------------------------------------------
+ * If there is no special case, you'd better inherit BaseRegistry and implement the
+ * FacadeBasedRegistry interface instead of directly implementing the Registry interface.
+ * --------------------------------------------------------------------------------------
+ */
+
+/*
+ * FacadeBasedRegistry interface is subclass of Registry, and it is designed for registry who want to inherit BaseRegistry.
+ * You have to implement the interface to inherit BaseRegistry.
+ */
+type FacadeBasedRegistry interface {
+	Registry
+	CreatePath(string) error
+	DoRegister(string, string) error
+	DoSubscribe(conf *common.URL) (Listener, error)
+	CloseAndNilClient()
+	CloseListener()
+	InitListeners()
+}
+
+// BaseRegistry is a common logic abstract for registry. It implement Registry interface.
+type BaseRegistry struct {
+	context             context.Context
+	facadeBasedRegistry FacadeBasedRegistry
+	*common.URL
+	birth    int64          // time of file birth, seconds since Epoch; 0 if unknown
+	wg       sync.WaitGroup // wg+done for zk restart
+	done     chan struct{}
+	cltLock  sync.Mutex            //ctl lock is a lock for services map
+	services map[string]common.URL // service name + protocol -> service config, for store the service registered
+}
+
+// InitBaseRegistry for init some local variables and set BaseRegistry's subclass to it
+func (r *BaseRegistry) InitBaseRegistry(url *common.URL, facadeRegistry FacadeBasedRegistry) Registry {
+	r.URL = url
+	r.birth = time.Now().UnixNano()
+	r.done = make(chan struct{})
+	r.services = make(map[string]common.URL)
+	r.facadeBasedRegistry = facadeRegistry
+	return r
+}
+
+// GetUrl for get registry's url
+func (r *BaseRegistry) GetUrl() common.URL {
+	return *r.URL
+}
+
+// Destroy for graceful down
+func (r *BaseRegistry) Destroy() {
+	//first step close registry's all listeners
+	r.facadeBasedRegistry.CloseListener()
+	// then close r.done to notify other program who listen to it
+	close(r.done)
+	// wait waitgroup done (wait listeners outside close over)
+	r.wg.Wait()
+	//close registry client
+	r.closeRegisters()
+}
+
+// Register implement interface registry to register
+func (r *BaseRegistry) Register(conf common.URL) error {
+	var (
+		ok  bool
+		err error
+	)
+	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
+	// Check if the service has been registered
+	r.cltLock.Lock()
+	_, ok = r.services[conf.Key()]
+	r.cltLock.Unlock()
+	if ok {
+		return perrors.Errorf("Path{%s} has been registered", conf.Key())
+	}
+
+	err = r.register(conf)
+	if err != nil {
+		return perrors.WithMessagef(err, "register(conf:%+v)", conf)
+	}
+
+	r.cltLock.Lock()
+	r.services[conf.Key()] = conf
+	r.cltLock.Unlock()
+	logger.Debugf("(%sRegistry)Register(conf{%#v})", common.DubboRole[role], conf)
+
+	return nil
+}
+
+// service is for getting service path stored in url
+func (r *BaseRegistry) service(c common.URL) string {
+	return url.QueryEscape(c.Service())
+}
+
+// RestartCallBack for reregister when reconnect
+func (r *BaseRegistry) RestartCallBack() bool {
+
+	// copy r.services
+	services := []common.URL{}
+	for _, confIf := range r.services {
+		services = append(services, confIf)
+	}
+
+	flag := true
+	for _, confIf := range services {
+		err := r.register(confIf)
+		if err != nil {
+			logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
+				confIf, perrors.WithStack(err))
+			flag = false
+			break
+		}
+		logger.Infof("success to re-register service :%v", confIf.Key())
+	}
+	r.facadeBasedRegistry.InitListeners()
+
+	return flag
+}
+
+// register for register url to registry, include init params
+func (r *BaseRegistry) register(c common.URL) error {
+	var (
+		err error
+		//revision   string
+		params     url.Values
+		rawURL     string
+		encodedURL string
+		dubboPath  string
+		//conf       config.URL
+	)
+	params = url.Values{}
+
+	c.RangeParams(func(key, value string) bool {
+		params.Add(key, value)
+		return true
+	})
+
+	params.Add("pid", processID)
+	params.Add("ip", localIP)
+	//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
+
+	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
+	switch role {
+
+	case common.PROVIDER:
+		dubboPath, rawURL, err = r.providerRegistry(c, params)
+	case common.CONSUMER:
+		dubboPath, rawURL, err = r.consumerRegistry(c, params)
+	default:
+		return perrors.Errorf("@c{%v} type is not referencer or provider", c)
+	}
+	encodedURL = url.QueryEscape(rawURL)
+	dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
+	err = r.facadeBasedRegistry.DoRegister(dubboPath, encodedURL)
+
+	if err != nil {
+		return perrors.WithMessagef(err, "register Node(path:%s, url:%s)", dubboPath, rawURL)
+	}
+	return nil
+}
+
+// providerRegistry for provider role do
+func (r *BaseRegistry) providerRegistry(c common.URL, params url.Values) (string, string, error) {
+	var (
+		dubboPath string
+		rawURL    string
+		err       error
+	)
+	if c.Path == "" || len(c.Methods) == 0 {
+		return "", "", perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
+	}
+	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
+	r.cltLock.Lock()
+	err = r.facadeBasedRegistry.CreatePath(dubboPath)
+	r.cltLock.Unlock()
+	if err != nil {
+		logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
+		return "", "", perrors.WithMessagef(err, "facadeBasedRegistry.CreatePath(path:%s)", dubboPath)
+	}
+	params.Add("anyhost", "true")
+
+	// Dubbo java consumer to start looking for the provider url,because the category does not match,
+	// the provider will not find, causing the consumer can not start, so we use consumers.
+	// DubboRole               = [...]string{"consumer", "", "", "provider"}
+	// params.Add("category", (RoleType(PROVIDER)).Role())
+	params.Add("category", (common.RoleType(common.PROVIDER)).String())
+	params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
+
+	params.Add("side", (common.RoleType(common.PROVIDER)).Role())
+
+	if len(c.Methods) == 0 {
+		params.Add("methods", strings.Join(c.Methods, ","))
+	}
+	logger.Debugf("provider url params:%#v", params)
+	var host string
+	if c.Ip == "" {
+		host = localIP + ":" + c.Port
+	} else {
+		host = c.Ip + ":" + c.Port
+	}
+
+	rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
+	// Print your own registration service providers.
+	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
+	logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
+	return dubboPath, rawURL, nil
+}
+
+// consumerRegistry for consumer role do
+func (r *BaseRegistry) consumerRegistry(c common.URL, params url.Values) (string, string, error) {
+	var (
+		dubboPath string
+		rawURL    string
+		err       error
+	)
+	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
+	r.cltLock.Lock()
+	err = r.facadeBasedRegistry.CreatePath(dubboPath)
+	r.cltLock.Unlock()
+	if err != nil {
+		logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
+		return "", "", perrors.WithStack(err)
+	}
+	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
+	r.cltLock.Lock()
+	err = r.facadeBasedRegistry.CreatePath(dubboPath)
+	r.cltLock.Unlock()
+	if err != nil {
+		logger.Errorf("facadeBasedRegistry.CreatePath(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
+		return "", "", perrors.WithStack(err)
+	}
+
+	params.Add("protocol", c.Protocol)
+	params.Add("category", (common.RoleType(common.CONSUMER)).String())
+	params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
+
+	rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
+	dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String())
+
+	logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
+	return dubboPath, rawURL, nil
+}
+
+// sleepWait...
+func sleepWait(n int) {
+	wait := time.Duration((n + 1) * 2e8)
+	if wait > MaxWaitInterval {
+		wait = MaxWaitInterval
+	}
+	time.Sleep(wait)
+}
+
+// Subscribe :subscribe from registry, event will notify by notifyListener
+func (r *BaseRegistry) Subscribe(url *common.URL, notifyListener NotifyListener) {
+	n := 0
+	for {
+		n++
+		if !r.IsAvailable() {
+			logger.Warnf("event listener game over.")
+			return
+		}
+
+		listener, err := r.facadeBasedRegistry.DoSubscribe(url)
+		if err != nil {
+			if !r.IsAvailable() {
+				logger.Warnf("event listener game over.")
+				return
+			}
+			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
+			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
+			continue
+		}
+
+		for {
+			if serviceEvent, err := listener.Next(); err != nil {
+				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
+				listener.Close()
+				break
+			} else {
+				logger.Infof("update begin, service event: %v", serviceEvent.String())
+				notifyListener.Notify(serviceEvent)
+			}
+
+		}
+		sleepWait(n)
+	}
+}
+
+// closeRegisters close and remove registry client and reset services map
+func (r *BaseRegistry) closeRegisters() {
+	r.cltLock.Lock()
+	defer r.cltLock.Unlock()
+	logger.Infof("begin to close provider client")
+	// Close and remove(set to nil) the registry client
+	r.facadeBasedRegistry.CloseAndNilClient()
+	// reset the services map
+	r.services = nil
+}
+
+// IsAvailable judge to is registry not closed by chan r.done
+func (r *BaseRegistry) IsAvailable() bool {
+	select {
+	case <-r.done:
+		return false
+	default:
+		return true
+	}
+}
+
+// WaitGroup open for outside add the waitgroup to add some logic before registry destroyed over(graceful down)
+func (r *BaseRegistry) WaitGroup() *sync.WaitGroup {
+	return &r.wg
+}
+
+// Done open for outside to listen the event of registry Destroy() called.
+func (r *BaseRegistry) Done() chan struct{} {
+	return r.done
+}
diff --git a/registry/etcdv3/listener.go b/registry/etcdv3/listener.go
index 31d62fa..5ed56f6 100644
--- a/registry/etcdv3/listener.go
+++ b/registry/etcdv3/listener.go
@@ -39,6 +39,7 @@ type dataListener struct {
 	listener      config_center.ConfigurationListener
 }
 
+// NewRegistryDataListener ...
 func NewRegistryDataListener(listener config_center.ConfigurationListener) *dataListener {
 	return &dataListener{listener: listener, interestedURL: []*common.URL{}}
 }
@@ -77,9 +78,10 @@ type configurationListener struct {
 	events   chan *config_center.ConfigChangeEvent
 }
 
+// NewConfigurationListener for listening the event of etcdv3.
 func NewConfigurationListener(reg *etcdV3Registry) *configurationListener {
 	// add a new waiter
-	reg.wg.Add(1)
+	reg.WaitGroup().Add(1)
 	return &configurationListener{registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32)}
 }
 func (l *configurationListener) Process(configType *config_center.ConfigChangeEvent) {
@@ -89,7 +91,7 @@ func (l *configurationListener) Process(configType *config_center.ConfigChangeEv
 func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
 	for {
 		select {
-		case <-l.registry.done:
+		case <-l.registry.Done():
 			logger.Warnf("listener's etcd client connection is broken, so etcd event listener exit now.")
 			return nil, perrors.New("listener stopped")
 
@@ -97,7 +99,7 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
 			logger.Infof("got etcd event %#v", e)
 			if e.ConfigType == remoting.EventTypeDel {
 				select {
-				case <-l.registry.done:
+				case <-l.registry.Done():
 					logger.Warnf("update @result{%s}. But its connection to registry is invalid", e.Value)
 				default:
 				}
@@ -108,5 +110,5 @@ func (l *configurationListener) Next() (*registry.ServiceEvent, error) {
 	}
 }
 func (l *configurationListener) Close() {
-	l.registry.wg.Done()
+	l.registry.WaitGroup().Done()
 }
diff --git a/registry/etcdv3/registry.go b/registry/etcdv3/registry.go
index b058113..e1c2576 100644
--- a/registry/etcdv3/registry.go
+++ b/registry/etcdv3/registry.go
@@ -19,17 +19,13 @@ package etcdv3
 
 import (
 	"fmt"
-	"net/url"
-	"os"
 	"path"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
 )
 
 import (
-	gxnet "github.com/dubbogo/gost/net"
 	perrors "github.com/pkg/errors"
 )
 
@@ -42,74 +38,39 @@ import (
 	"github.com/apache/dubbo-go/remoting/etcdv3"
 )
 
-var (
-	processID = ""
-	localIP   = ""
-)
-
 const (
-	Name              = "etcdv3"
-	RegistryConnDelay = 3
+	// Name module name
+	Name = "etcdv3"
 )
 
 func init() {
-	processID = fmt.Sprintf("%d", os.Getpid())
-	localIP, _ = gxnet.GetLocalIP()
 	extension.SetRegistry(Name, newETCDV3Registry)
 }
 
 type etcdV3Registry struct {
-	*common.URL
-	birth int64 // time of file birth, seconds since Epoch; 0 if unknown
-
-	cltLock  sync.Mutex
-	client   *etcdv3.Client
-	services map[string]common.URL // service name + protocol -> service config
-
+	registry.BaseRegistry
+	cltLock        sync.Mutex
+	client         *etcdv3.Client
 	listenerLock   sync.Mutex
 	listener       *etcdv3.EventListener
 	dataListener   *dataListener
 	configListener *configurationListener
-
-	wg   sync.WaitGroup // wg+done for etcd client restart
-	done chan struct{}
 }
 
+// Client get the etcdv3 client
 func (r *etcdV3Registry) Client() *etcdv3.Client {
 	return r.client
 }
+
+//SetClient set the etcdv3 client
 func (r *etcdV3Registry) SetClient(client *etcdv3.Client) {
 	r.client = client
 }
+
+//
 func (r *etcdV3Registry) ClientLock() *sync.Mutex {
 	return &r.cltLock
 }
-func (r *etcdV3Registry) WaitGroup() *sync.WaitGroup {
-	return &r.wg
-}
-func (r *etcdV3Registry) GetDone() chan struct{} {
-	return r.done
-}
-func (r *etcdV3Registry) RestartCallBack() bool {
-
-	services := []common.URL{}
-	for _, confIf := range r.services {
-		services = append(services, confIf)
-	}
-
-	flag := true
-	for _, confIf := range services {
-		err := r.Register(confIf)
-		if err != nil {
-			logger.Errorf("(etcdV3ProviderRegistry)register(conf{%#v}) = error{%#v}",
-				confIf, perrors.WithStack(err))
-			flag = false
-			break
-		}
-		logger.Infof("success to re-register service :%v", confIf.Key())
-	}
-	return flag
-}
 
 func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
 
@@ -122,12 +83,9 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
 
 	logger.Infof("etcd address is: %v, timeout is: %s", url.Location, timeout.String())
 
-	r := &etcdV3Registry{
-		URL:      url,
-		birth:    time.Now().UnixNano(),
-		done:     make(chan struct{}),
-		services: make(map[string]common.URL),
-	}
+	r := &etcdV3Registry{}
+
+	r.InitBaseRegistry(url, r)
 
 	if err := etcdv3.ValidateClient(
 		r,
@@ -137,89 +95,37 @@ func newETCDV3Registry(url *common.URL) (registry.Registry, error) {
 	); err != nil {
 		return nil, err
 	}
+	r.WaitGroup().Add(1) //etcdv3 client start successful, then wg +1
 
-	r.wg.Add(1)
 	go etcdv3.HandleClientRestart(r)
 
-	r.listener = etcdv3.NewEventListener(r.client)
-	r.configListener = NewConfigurationListener(r)
-	r.dataListener = NewRegistryDataListener(r.configListener)
+	r.InitListeners()
 
 	return r, nil
 }
 
-func (r *etcdV3Registry) GetUrl() common.URL {
-	return *r.URL
-}
-
-func (r *etcdV3Registry) IsAvailable() bool {
-
-	select {
-	case <-r.done:
-		return false
-	default:
-		return true
-	}
+func (r *etcdV3Registry) InitListeners() {
+	r.listener = etcdv3.NewEventListener(r.client)
+	r.configListener = NewConfigurationListener(r)
+	r.dataListener = NewRegistryDataListener(r.configListener)
 }
 
-func (r *etcdV3Registry) Destroy() {
-
-	if r.configListener != nil {
-		r.configListener.Close()
-	}
-	r.stop()
+func (r *etcdV3Registry) DoRegister(root string, node string) error {
+	return r.client.Create(path.Join(root, node), "")
 }
 
-func (r *etcdV3Registry) stop() {
-
-	close(r.done)
-
-	// close current client
+func (r *etcdV3Registry) CloseAndNilClient() {
 	r.client.Close()
-
-	r.cltLock.Lock()
 	r.client = nil
-	r.services = nil
-	r.cltLock.Unlock()
 }
 
-func (r *etcdV3Registry) Register(svc common.URL) error {
-
-	role, err := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
-	if err != nil {
-		return perrors.WithMessage(err, "get registry role")
-	}
-
-	r.cltLock.Lock()
-	if _, ok := r.services[svc.Key()]; ok {
-		r.cltLock.Unlock()
-		return perrors.New(fmt.Sprintf("Path{%s} has been registered", svc.Path))
-	}
-	r.cltLock.Unlock()
-
-	switch role {
-	case common.PROVIDER:
-		logger.Debugf("(provider register )Register(conf{%#v})", svc)
-		if err := r.registerProvider(svc); err != nil {
-			return perrors.WithMessage(err, "register provider")
-		}
-	case common.CONSUMER:
-		logger.Debugf("(consumer register )Register(conf{%#v})", svc)
-		if err := r.registerConsumer(svc); err != nil {
-			return perrors.WithMessage(err, "register consumer")
-		}
-	default:
-		return perrors.New(fmt.Sprintf("unknown role %d", role))
+func (r *etcdV3Registry) CloseListener() {
+	if r.configListener != nil {
+		r.configListener.Close()
 	}
-
-	r.cltLock.Lock()
-	r.services[svc.Key()] = svc
-	r.cltLock.Unlock()
-	return nil
 }
 
-func (r *etcdV3Registry) createDirIfNotExist(k string) error {
-
+func (r *etcdV3Registry) CreatePath(k string) error {
 	var tmpPath string
 	for _, str := range strings.Split(k, "/")[1:] {
 		tmpPath = path.Join(tmpPath, "/", str)
@@ -231,89 +137,7 @@ func (r *etcdV3Registry) createDirIfNotExist(k string) error {
 	return nil
 }
 
-func (r *etcdV3Registry) registerConsumer(svc common.URL) error {
-
-	consumersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.CONSUMER])
-	if err := r.createDirIfNotExist(consumersNode); err != nil {
-		logger.Errorf("etcd client create path %s: %v", consumersNode, err)
-		return perrors.WithMessage(err, "etcd create consumer nodes")
-	}
-	providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
-	if err := r.createDirIfNotExist(providersNode); err != nil {
-		return perrors.WithMessage(err, "create provider node")
-	}
-
-	params := url.Values{}
-
-	params.Add("protocol", svc.Protocol)
-
-	params.Add("category", (common.RoleType(common.CONSUMER)).String())
-	params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
-
-	encodedURL := url.QueryEscape(fmt.Sprintf("consumer://%s%s?%s", localIP, svc.Path, params.Encode()))
-	dubboPath := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.CONSUMER)).String())
-	if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
-		return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
-	}
-
-	return nil
-}
-
-func (r *etcdV3Registry) registerProvider(svc common.URL) error {
-
-	if len(svc.Path) == 0 || len(svc.Methods) == 0 {
-		return perrors.New(fmt.Sprintf("service path %s or service method %s", svc.Path, svc.Methods))
-	}
-
-	var (
-		urlPath    string
-		encodedURL string
-		dubboPath  string
-	)
-
-	providersNode := fmt.Sprintf("/dubbo/%s/%s", svc.Service(), common.DubboNodes[common.PROVIDER])
-	if err := r.createDirIfNotExist(providersNode); err != nil {
-		return perrors.WithMessage(err, "create provider node")
-	}
-
-	params := url.Values{}
-
-	svc.RangeParams(func(key, value string) bool {
-		params[key] = []string{value}
-		return true
-	})
-	params.Add("pid", processID)
-	params.Add("ip", localIP)
-	params.Add("anyhost", "true")
-	params.Add("category", (common.RoleType(common.PROVIDER)).String())
-	params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
-	params.Add("side", (common.RoleType(common.PROVIDER)).Role())
-
-	if len(svc.Methods) == 0 {
-		params.Add("methods", strings.Join(svc.Methods, ","))
-	}
-
-	logger.Debugf("provider url params:%#v", params)
-	var host string
-	if len(svc.Ip) == 0 {
-		host = localIP + ":" + svc.Port
-	} else {
-		host = svc.Ip + ":" + svc.Port
-	}
-
-	urlPath = svc.Path
-
-	encodedURL = url.QueryEscape(fmt.Sprintf("%s://%s%s?%s", svc.Protocol, host, urlPath, params.Encode()))
-	dubboPath = fmt.Sprintf("/dubbo/%s/%s", svc.Service(), (common.RoleType(common.PROVIDER)).String())
-
-	if err := r.client.Create(path.Join(dubboPath, encodedURL), ""); err != nil {
-		return perrors.WithMessagef(err, "create k/v in etcd (path:%s, url:%s)", dubboPath, encodedURL)
-	}
-
-	return nil
-}
-
-func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) {
+func (r *etcdV3Registry) DoSubscribe(svc *common.URL) (registry.Listener, error) {
 
 	var (
 		configListener *configurationListener
@@ -346,37 +170,3 @@ func (r *etcdV3Registry) subscribe(svc *common.URL) (registry.Listener, error) {
 
 	return configListener, nil
 }
-
-//subscribe from registry
-func (r *etcdV3Registry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
-	for {
-		if !r.IsAvailable() {
-			logger.Warnf("event listener game over.")
-			return
-		}
-
-		listener, err := r.subscribe(url)
-		if err != nil {
-			if !r.IsAvailable() {
-				logger.Warnf("event listener game over.")
-				return
-			}
-			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
-			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
-			continue
-		}
-
-		for {
-			if serviceEvent, err := listener.Next(); err != nil {
-				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
-				listener.Close()
-				return
-			} else {
-				logger.Infof("update begin, service event: %v", serviceEvent.String())
-				notifyListener.Notify(serviceEvent)
-			}
-
-		}
-
-	}
-}
diff --git a/registry/etcdv3/registry_test.go b/registry/etcdv3/registry_test.go
index 6c05a8a..87cf240 100644
--- a/registry/etcdv3/registry_test.go
+++ b/registry/etcdv3/registry_test.go
@@ -46,7 +46,8 @@ func initRegistry(t *testing.T) *etcdV3Registry {
 	}
 
 	out := reg.(*etcdV3Registry)
-	out.client.CleanKV()
+	err = out.client.CleanKV()
+	assert.NoError(t, err)
 	return out
 }
 
@@ -58,6 +59,7 @@ func (suite *RegistryTestSuite) TestRegister() {
 
 	reg := initRegistry(t)
 	err := reg.Register(url)
+	assert.NoError(t, err)
 	children, _, err := reg.client.GetChildrenKVList("/dubbo/com.ikurento.user.UserProvider/providers")
 	if err != nil {
 		t.Fatal(err)
@@ -83,8 +85,9 @@ func (suite *RegistryTestSuite) TestSubscribe() {
 	regurl.SetParam(constant.ROLE_KEY, strconv.Itoa(common.CONSUMER))
 	reg2 := initRegistry(t)
 
-	reg2.Register(url)
-	listener, err := reg2.subscribe(&url)
+	err = reg2.Register(url)
+	assert.NoError(t, err)
+	listener, err := reg2.DoSubscribe(&url)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -102,7 +105,7 @@ func (suite *RegistryTestSuite) TestConsumerDestory() {
 	url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
 
 	reg := initRegistry(t)
-	_, err := reg.subscribe(&url)
+	_, err := reg.DoSubscribe(&url)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -120,7 +123,8 @@ func (suite *RegistryTestSuite) TestProviderDestory() {
 	t := suite.T()
 	reg := initRegistry(t)
 	url, _ := common.NewURL(context.Background(), "dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider", common.WithParamsValue(constant.CLUSTER_KEY, "mock"), common.WithMethods([]string{"GetUser", "AddUser"}))
-	reg.Register(url)
+	err := reg.Register(url)
+	assert.NoError(t, err)
 
 	//listener.Close()
 	time.Sleep(1e9)
diff --git a/registry/registry.go b/registry/registry.go
index c7279a2..d673864 100644
--- a/registry/registry.go
+++ b/registry/registry.go
@@ -21,7 +21,13 @@ import (
 	"github.com/apache/dubbo-go/common"
 )
 
-// Extension - Registry
+/*
+ * -----------------------------------NOTICE---------------------------------------------
+ * If there is no special case, you'd better inherit BaseRegistry and implement the
+ * FacadeBasedRegistry interface instead of directly implementing the Registry interface.
+ * --------------------------------------------------------------------------------------
+ */
+// Registry Extension - Registry
 type Registry interface {
 	common.Node
 	//used for service provider calling , register services to registry
@@ -38,11 +44,13 @@ type Registry interface {
 	//mode2 : callback mode, subscribe with notify(notify listener).
 	Subscribe(*common.URL, NotifyListener)
 }
+
+// NotifyListener ...
 type NotifyListener interface {
 	Notify(*ServiceEvent)
 }
 
-//Deprecated!
+// Listener Deprecated!
 type Listener interface {
 	Next() (*ServiceEvent, error)
 	Close()
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 53a5926..e895243 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -36,18 +36,23 @@ import (
 	zk "github.com/apache/dubbo-go/remoting/zookeeper"
 )
 
+// RegistryDataListener ...
 type RegistryDataListener struct {
 	interestedURL []*common.URL
 	listener      config_center.ConfigurationListener
 }
 
+// NewRegistryDataListener ...
 func NewRegistryDataListener(listener config_center.ConfigurationListener) *RegistryDataListener {
 	return &RegistryDataListener{listener: listener, interestedURL: []*common.URL{}}
 }
+
+// AddInterestedURL ...
 func (l *RegistryDataListener) AddInterestedURL(url *common.URL) {
 	l.interestedURL = append(l.interestedURL, url)
 }
 
+// DataChange ...
 func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
 	// Intercept the last bit
 	index := strings.Index(eventType.Path, "/providers/")
@@ -71,6 +76,7 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
 	return false
 }
 
+// RegistryConfigurationListener ...
 type RegistryConfigurationListener struct {
 	client    *zk.ZookeeperClient
 	registry  *zkRegistry
@@ -79,14 +85,18 @@ type RegistryConfigurationListener struct {
 	closeOnce sync.Once
 }
 
+// NewRegistryConfigurationListener for listening the event of zk.
 func NewRegistryConfigurationListener(client *zk.ZookeeperClient, reg *zkRegistry) *RegistryConfigurationListener {
-	reg.wg.Add(1)
+	reg.WaitGroup().Add(1)
 	return &RegistryConfigurationListener{client: client, registry: reg, events: make(chan *config_center.ConfigChangeEvent, 32), isClosed: false}
 }
+
+// Process ...
 func (l *RegistryConfigurationListener) Process(configType *config_center.ConfigChangeEvent) {
 	l.events <- configType
 }
 
+// Next ...
 func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 	for {
 		select {
@@ -94,7 +104,7 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 			logger.Warnf("listener's zk client connection is broken, so zk event listener exit now.")
 			return nil, perrors.New("listener stopped")
 
-		case <-l.registry.done:
+		case <-l.registry.Done():
 			logger.Warnf("zk consumer register has quit, so zk event listener exit now.")
 			return nil, perrors.New("listener stopped")
 
@@ -111,11 +121,13 @@ func (l *RegistryConfigurationListener) Next() (*registry.ServiceEvent, error) {
 		}
 	}
 }
+
+// Close ...
 func (l *RegistryConfigurationListener) Close() {
 	// ensure that the listener will be closed at most once.
 	l.closeOnce.Do(func() {
 		l.isClosed = true
-		l.registry.wg.Done()
+		l.registry.WaitGroup().Done()
 	})
 }
 
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index 24c4158..f4e53dc 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -18,20 +18,16 @@
 package zookeeper
 
 import (
-	"context"
 	"fmt"
 	"net/url"
-	"os"
-	"strconv"
 	"strings"
 	"sync"
 	"time"
 )
 
 import (
-	gxnet "github.com/dubbogo/gost/net"
+	"github.com/dubbogo/go-zookeeper/zk"
 	perrors "github.com/pkg/errors"
-	"github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -44,20 +40,11 @@ import (
 )
 
 const (
-	RegistryZkClient  = "zk registry"
-	RegistryConnDelay = 3
-	MaxWaitInterval   = time.Duration(3e9)
-)
-
-var (
-	processID = ""
-	localIP   = ""
+	// RegistryZkClient zk client name
+	RegistryZkClient = "zk registry"
 )
 
 func init() {
-	processID = fmt.Sprintf("%d", os.Getpid())
-	localIP, _ = gxnet.GetLocalIP()
-	//plugins.PluggableRegistries["zookeeper"] = newZkRegistry
 	extension.SetRegistry("zookeeper", newZkRegistry)
 }
 
@@ -66,20 +53,13 @@ func init() {
 /////////////////////////////////////
 
 type zkRegistry struct {
-	context context.Context
-	*common.URL
-	birth int64          // time of file birth, seconds since Epoch; 0 if unknown
-	wg    sync.WaitGroup // wg+done for zk restart
-	done  chan struct{}
-
-	cltLock  sync.Mutex
-	client   *zookeeper.ZookeeperClient
-	services map[string]common.URL // service name + protocol -> service config
-
+	registry.BaseRegistry
+	client         *zookeeper.ZookeeperClient
 	listenerLock   sync.Mutex
 	listener       *zookeeper.ZkEventListener
 	dataListener   *RegistryDataListener
 	configListener *RegistryConfigurationListener
+	cltLock        sync.Mutex
 	//for provider
 	zkPath map[string]int // key = protocol://ip:port/interface
 }
@@ -89,21 +69,17 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
 		err error
 		r   *zkRegistry
 	)
-
 	r = &zkRegistry{
-		URL:      url,
-		birth:    time.Now().UnixNano(),
-		done:     make(chan struct{}),
-		services: make(map[string]common.URL),
-		zkPath:   make(map[string]int),
+		zkPath: make(map[string]int),
 	}
+	r.InitBaseRegistry(url, r)
 
 	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
 	if err != nil {
 		return nil, err
 	}
+	r.WaitGroup().Add(1) //zk client start successful, then wg +1
 
-	r.wg.Add(1)
 	go zookeeper.HandleClientRestart(r)
 
 	r.listener = zookeeper.NewZkEventListener(r.client)
@@ -113,10 +89,12 @@ func newZkRegistry(url *common.URL) (registry.Registry, error) {
 	return r, nil
 }
 
+// Options ...
 type Options struct {
 	client *zookeeper.ZookeeperClient
 }
 
+// Option ...
 type Option func(*Options)
 
 func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestCluster, *zkRegistry, error) {
@@ -128,27 +106,41 @@ func newMockZkRegistry(url *common.URL, opts ...zookeeper.Option) (*zk.TestClust
 	)
 
 	r = &zkRegistry{
-		URL:      url,
-		birth:    time.Now().UnixNano(),
-		done:     make(chan struct{}),
-		services: make(map[string]common.URL),
-		zkPath:   make(map[string]int),
+		zkPath: make(map[string]int),
 	}
-
+	r.InitBaseRegistry(url, r)
 	c, r.client, _, err = zookeeper.NewMockZookeeperClient("test", 15*time.Second, opts...)
 	if err != nil {
 		return nil, nil, err
 	}
-	r.wg.Add(1)
+	r.WaitGroup().Add(1) //zk client start successful, then wg +1
 	go zookeeper.HandleClientRestart(r)
+	r.InitListeners()
+	return c, r, nil
+}
 
+func (r *zkRegistry) InitListeners() {
 	r.listener = zookeeper.NewZkEventListener(r.client)
 	r.configListener = NewRegistryConfigurationListener(r.client, r)
 	r.dataListener = NewRegistryDataListener(r.configListener)
+}
 
-	return c, r, nil
+func (r *zkRegistry) CreatePath(path string) error {
+	return r.ZkClient().Create(path)
+}
+
+func (r *zkRegistry) DoRegister(root string, node string) error {
+	return r.registerTempZookeeperNode(root, node)
+}
+
+func (r *zkRegistry) DoSubscribe(conf *common.URL) (registry.Listener, error) {
+	return r.getListener(conf)
 }
 
+func (r *zkRegistry) CloseAndNilClient() {
+	r.client.Close()
+	r.client = nil
+}
 func (r *zkRegistry) ZkClient() *zookeeper.ZookeeperClient {
 	return r.client
 }
@@ -161,222 +153,10 @@ func (r *zkRegistry) ZkClientLock() *sync.Mutex {
 	return &r.cltLock
 }
 
-func (r *zkRegistry) WaitGroup() *sync.WaitGroup {
-	return &r.wg
-}
-
-func (r *zkRegistry) GetDone() chan struct{} {
-	return r.done
-}
-
-func (r *zkRegistry) GetUrl() common.URL {
-	return *r.URL
-}
-
-func (r *zkRegistry) Destroy() {
+func (r *zkRegistry) CloseListener() {
 	if r.configListener != nil {
 		r.configListener.Close()
 	}
-	close(r.done)
-	r.wg.Wait()
-	r.closeRegisters()
-}
-
-func (r *zkRegistry) RestartCallBack() bool {
-
-	// copy r.services
-	services := []common.URL{}
-	for _, confIf := range r.services {
-		services = append(services, confIf)
-	}
-
-	flag := true
-	for _, confIf := range services {
-		err := r.register(confIf)
-		if err != nil {
-			logger.Errorf("(ZkProviderRegistry)register(conf{%#v}) = error{%#v}",
-				confIf, perrors.WithStack(err))
-			flag = false
-			break
-		}
-		logger.Infof("success to re-register service :%v", confIf.Key())
-	}
-	r.listener = zookeeper.NewZkEventListener(r.client)
-	r.configListener = NewRegistryConfigurationListener(r.client, r)
-	r.dataListener = NewRegistryDataListener(r.configListener)
-
-	return flag
-}
-
-func (r *zkRegistry) Register(conf common.URL) error {
-	var (
-		ok  bool
-		err error
-	)
-	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
-	switch role {
-	case common.CONSUMER:
-		r.cltLock.Lock()
-		_, ok = r.services[conf.Key()]
-		r.cltLock.Unlock()
-		if ok {
-			return perrors.Errorf("Path{%s} has been registered", conf.Path)
-		}
-
-		err = r.register(conf)
-		if err != nil {
-			return perrors.WithStack(err)
-		}
-
-		r.cltLock.Lock()
-		r.services[conf.Key()] = conf
-		r.cltLock.Unlock()
-		logger.Debugf("(consumerZkConsumerRegistry)Register(conf{%#v})", conf)
-
-	case common.PROVIDER:
-
-		// Check if the service has been registered
-		r.cltLock.Lock()
-		// Note the difference between consumer and consumerZookeeperRegistry (consumer use conf.Path).
-		// Because the consumer wants to provide monitoring functions for the selector,
-		// the provider allows multiple groups or versions of the same service to be registered.
-		_, ok = r.services[conf.Key()]
-		r.cltLock.Unlock()
-		if ok {
-			return perrors.Errorf("Path{%s} has been registered", conf.Key())
-		}
-
-		err = r.register(conf)
-		if err != nil {
-			return perrors.WithMessagef(err, "register(conf:%+v)", conf)
-		}
-
-		r.cltLock.Lock()
-		r.services[conf.Key()] = conf
-		r.cltLock.Unlock()
-
-		logger.Debugf("(ZkProviderRegistry)Register(conf{%#v})", conf)
-	}
-
-	return nil
-}
-
-func (r *zkRegistry) service(c common.URL) string {
-	return url.QueryEscape(c.Service())
-}
-
-func (r *zkRegistry) register(c common.URL) error {
-	var (
-		err error
-		//revision   string
-		params     url.Values
-		rawURL     string
-		encodedURL string
-		dubboPath  string
-		//conf       config.URL
-	)
-
-	err = zookeeper.ValidateZookeeperClient(r, zookeeper.WithZkName(RegistryZkClient))
-	if err != nil {
-		return perrors.WithStack(err)
-	}
-	params = url.Values{}
-
-	c.RangeParams(func(key, value string) bool {
-		params.Add(key, value)
-		return true
-	})
-
-	params.Add("pid", processID)
-	params.Add("ip", localIP)
-	//params.Add("timeout", fmt.Sprintf("%d", int64(r.Timeout)/1e6))
-
-	role, _ := strconv.Atoi(r.URL.GetParam(constant.ROLE_KEY, ""))
-	switch role {
-
-	case common.PROVIDER:
-
-		if c.Path == "" || len(c.Methods) == 0 {
-			return perrors.Errorf("conf{Path:%s, Methods:%s}", c.Path, c.Methods)
-		}
-		// 先创建服务下面的provider node
-		dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
-		r.cltLock.Lock()
-		err = r.client.Create(dubboPath)
-		r.cltLock.Unlock()
-		if err != nil {
-			logger.Errorf("zkClient.create(path{%s}) = error{%#v}", dubboPath, perrors.WithStack(err))
-			return perrors.WithMessagef(err, "zkclient.Create(path:%s)", dubboPath)
-		}
-		params.Add("anyhost", "true")
-
-		// Dubbo java consumer to start looking for the provider url,because the category does not match,
-		// the provider will not find, causing the consumer can not start, so we use consumers.
-		// DubboRole               = [...]string{"consumer", "", "", "provider"}
-		// params.Add("category", (RoleType(PROVIDER)).Role())
-		params.Add("category", (common.RoleType(common.PROVIDER)).String())
-		params.Add("dubbo", "dubbo-provider-golang-"+constant.Version)
-
-		params.Add("side", (common.RoleType(common.PROVIDER)).Role())
-
-		if len(c.Methods) == 0 {
-			params.Add("methods", strings.Join(c.Methods, ","))
-		}
-		logger.Debugf("provider zk url params:%#v", params)
-		var host string
-		if c.Ip == "" {
-			host = localIP + ":" + c.Port
-		} else {
-			host = c.Ip + ":" + c.Port
-		}
-
-		rawURL = fmt.Sprintf("%s://%s%s?%s", c.Protocol, host, c.Path, params.Encode())
-		encodedURL = url.QueryEscape(rawURL)
-
-		// Print your own registration service providers.
-		dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.PROVIDER)).String())
-		logger.Debugf("provider path:%s, url:%s", dubboPath, rawURL)
-
-	case common.CONSUMER:
-		dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.CONSUMER])
-		r.cltLock.Lock()
-		err = r.client.Create(dubboPath)
-		r.cltLock.Unlock()
-		if err != nil {
-			logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
-			return perrors.WithStack(err)
-		}
-		dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), common.DubboNodes[common.PROVIDER])
-		r.cltLock.Lock()
-		err = r.client.Create(dubboPath)
-		r.cltLock.Unlock()
-		if err != nil {
-			logger.Errorf("zkClient.create(path{%s}) = error{%v}", dubboPath, perrors.WithStack(err))
-			return perrors.WithStack(err)
-		}
-
-		params.Add("protocol", c.Protocol)
-
-		params.Add("category", (common.RoleType(common.CONSUMER)).String())
-		params.Add("dubbo", "dubbogo-consumer-"+constant.Version)
-
-		rawURL = fmt.Sprintf("consumer://%s%s?%s", localIP, c.Path, params.Encode())
-		encodedURL = url.QueryEscape(rawURL)
-
-		dubboPath = fmt.Sprintf("/dubbo/%s/%s", r.service(c), (common.RoleType(common.CONSUMER)).String())
-		logger.Debugf("consumer path:%s, url:%s", dubboPath, rawURL)
-
-	default:
-		return perrors.Errorf("@c{%v} type is not referencer or provider", c)
-	}
-
-	dubboPath = strings.ReplaceAll(dubboPath, "$", "%24")
-	err = r.registerTempZookeeperNode(dubboPath, encodedURL)
-
-	if err != nil {
-		return perrors.WithMessagef(err, "registerTempZookeeperNode(path:%s, url:%s)", dubboPath, rawURL)
-	}
-	return nil
 }
 
 func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
@@ -406,53 +186,6 @@ func (r *zkRegistry) registerTempZookeeperNode(root string, node string) error {
 	return nil
 }
 
-func (r *zkRegistry) subscribe(conf *common.URL) (registry.Listener, error) {
-	return r.getListener(conf)
-}
-func sleepWait(n int) {
-	wait := time.Duration((n + 1) * 2e8)
-	if wait > MaxWaitInterval {
-		wait = MaxWaitInterval
-	}
-	time.Sleep(wait)
-}
-
-//subscribe from registry
-func (r *zkRegistry) Subscribe(url *common.URL, notifyListener registry.NotifyListener) {
-	n := 0
-	for {
-		n++
-		if !r.IsAvailable() {
-			logger.Warnf("event listener game over.")
-			return
-		}
-
-		listener, err := r.subscribe(url)
-		if err != nil {
-			if !r.IsAvailable() {
-				logger.Warnf("event listener game over.")
-				return
-			}
-			logger.Warnf("getListener() = err:%v", perrors.WithStack(err))
-			time.Sleep(time.Duration(RegistryConnDelay) * time.Second)
-			continue
-		}
-
-		for {
-			if serviceEvent, err := listener.Next(); err != nil {
-				logger.Warnf("Selector.watch() = error{%v}", perrors.WithStack(err))
-				listener.Close()
-				break
-			} else {
-				logger.Infof("update begin, service event: %v", serviceEvent.String())
-				notifyListener.Notify(serviceEvent)
-			}
-
-		}
-		sleepWait(n)
-	}
-}
-
 func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListener, error) {
 	var (
 		zkListener *RegistryConfigurationListener
@@ -489,22 +222,3 @@ func (r *zkRegistry) getListener(conf *common.URL) (*RegistryConfigurationListen
 
 	return zkListener, nil
 }
-
-func (r *zkRegistry) closeRegisters() {
-	r.cltLock.Lock()
-	defer r.cltLock.Unlock()
-	logger.Infof("begin to close provider zk client")
-	// Close the old client first to close the tmp node.
-	r.client.Close()
-	r.client = nil
-	r.services = nil
-}
-
-func (r *zkRegistry) IsAvailable() bool {
-	select {
-	case <-r.done:
-		return false
-	default:
-		return true
-	}
-}
diff --git a/registry/zookeeper/registry_test.go b/registry/zookeeper/registry_test.go
index 2c7bb90..5e5189c 100644
--- a/registry/zookeeper/registry_test.go
+++ b/registry/zookeeper/registry_test.go
@@ -64,7 +64,7 @@ func Test_Subscribe(t *testing.T) {
 	_, reg2, _ := newMockZkRegistry(&regurl, zookeeper.WithTestCluster(ts))
 
 	reg2.Register(url)
-	listener, _ := reg2.subscribe(&url)
+	listener, _ := reg2.DoSubscribe(&url)
 
 	serviceEvent, _ := listener.Next()
 	assert.NoError(t, err)
@@ -85,7 +85,7 @@ func Test_ConsumerDestory(t *testing.T) {
 	assert.NoError(t, err)
 	err = reg.Register(url)
 	assert.NoError(t, err)
-	_, err = reg.subscribe(&url)
+	_, err = reg.DoSubscribe(&url)
 	assert.NoError(t, err)
 
 	//listener.Close()
diff --git a/remoting/etcdv3/client.go b/remoting/etcdv3/client.go
index 0509685..ba3ea6e 100644
--- a/remoting/etcdv3/client.go
+++ b/remoting/etcdv3/client.go
@@ -36,16 +36,22 @@ import (
 )
 
 const (
-	ConnDelay            = 3
-	MaxFailTimes         = 15
+	// ConnDelay connection dalay
+	ConnDelay = 3
+	// MaxFailTimes max failure times
+	MaxFailTimes = 15
+	// RegistryETCDV3Client client name
 	RegistryETCDV3Client = "etcd registry"
 )
 
 var (
+	// ErrNilETCDV3Client ...
 	ErrNilETCDV3Client = perrors.New("etcd raw client is nil") // full describe the ERR
-	ErrKVPairNotFound  = perrors.New("k/v pair not found")
+	// ErrKVPairNotFound ...
+	ErrKVPairNotFound = perrors.New("k/v pair not found")
 )
 
+// Options ...
 type Options struct {
 	name      string
 	endpoints []string
@@ -54,30 +60,38 @@ type Options struct {
 	heartbeat int // heartbeat second
 }
 
+// Option ...
 type Option func(*Options)
 
+// WithEndpoints ...
 func WithEndpoints(endpoints ...string) Option {
 	return func(opt *Options) {
 		opt.endpoints = endpoints
 	}
 }
+
+// WithName ...
 func WithName(name string) Option {
 	return func(opt *Options) {
 		opt.name = name
 	}
 }
+
+// WithTimeout ...
 func WithTimeout(timeout time.Duration) Option {
 	return func(opt *Options) {
 		opt.timeout = timeout
 	}
 }
 
+// WithHeartbeat ...
 func WithHeartbeat(heartbeat int) Option {
 	return func(opt *Options) {
 		opt.heartbeat = heartbeat
 	}
 }
 
+// ValidateClient ...
 func ValidateClient(container clientFacade, opts ...Option) error {
 
 	options := &Options{
@@ -117,6 +131,7 @@ func ValidateClient(container clientFacade, opts ...Option) error {
 	return nil
 }
 
+// Client ...
 type Client struct {
 	lock sync.RWMutex
 
@@ -191,6 +206,7 @@ func (c *Client) stop() bool {
 	return false
 }
 
+// Close ...
 func (c *Client) Close() {
 
 	if c == nil {
@@ -309,6 +325,7 @@ func (c *Client) get(k string) (string, error) {
 	return string(resp.Kvs[0].Value), nil
 }
 
+// CleanKV ...
 func (c *Client) CleanKV() error {
 
 	c.lock.RLock()
@@ -408,10 +425,12 @@ func (c *Client) keepAliveKV(k string, v string) error {
 	return nil
 }
 
+// Done ...
 func (c *Client) Done() <-chan struct{} {
 	return c.exit
 }
 
+// Valid ...
 func (c *Client) Valid() bool {
 	select {
 	case <-c.exit:
@@ -428,6 +447,7 @@ func (c *Client) Valid() bool {
 	return true
 }
 
+// Create ...
 func (c *Client) Create(k string, v string) error {
 
 	err := c.put(k, v)
@@ -437,6 +457,7 @@ func (c *Client) Create(k string, v string) error {
 	return nil
 }
 
+// Delete ...
 func (c *Client) Delete(k string) error {
 
 	err := c.delete(k)
@@ -447,6 +468,7 @@ func (c *Client) Delete(k string) error {
 	return nil
 }
 
+// RegisterTemp ...
 func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
 
 	completeKey := path.Join(basePath, node)
@@ -459,6 +481,7 @@ func (c *Client) RegisterTemp(basePath string, node string) (string, error) {
 	return completeKey, nil
 }
 
+// GetChildrenKVList ...
 func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
 
 	kList, vList, err := c.getChildren(k)
@@ -468,6 +491,7 @@ func (c *Client) GetChildrenKVList(k string) ([]string, []string, error) {
 	return kList, vList, nil
 }
 
+// Get ...
 func (c *Client) Get(k string) (string, error) {
 
 	v, err := c.get(k)
@@ -478,6 +502,7 @@ func (c *Client) Get(k string) (string, error) {
 	return v, nil
 }
 
+// Watch ...
 func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
 
 	wc, err := c.watch(k)
@@ -487,6 +512,7 @@ func (c *Client) Watch(k string) (clientv3.WatchChan, error) {
 	return wc, nil
 }
 
+// WatchWithPrefix ...
 func (c *Client) WatchWithPrefix(prefix string) (clientv3.WatchChan, error) {
 
 	wc, err := c.watchWithPrefix(prefix)
diff --git a/remoting/etcdv3/facade.go b/remoting/etcdv3/facade.go
index 499044b..35befc8 100644
--- a/remoting/etcdv3/facade.go
+++ b/remoting/etcdv3/facade.go
@@ -38,11 +38,12 @@ type clientFacade interface {
 	SetClient(*Client)
 	ClientLock() *sync.Mutex
 	WaitGroup() *sync.WaitGroup //for wait group control, etcd client listener & etcd client container
-	GetDone() chan struct{}     //for etcd client control
+	Done() chan struct{}        //for etcd client control
 	RestartCallBack() bool
 	common.Node
 }
 
+// HandleClientRestart ...
 func HandleClientRestart(r clientFacade) {
 
 	var (
@@ -54,7 +55,7 @@ func HandleClientRestart(r clientFacade) {
 LOOP:
 	for {
 		select {
-		case <-r.GetDone():
+		case <-r.Done():
 			logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDV3 goroutine exit now...")
 			break LOOP
 			// re-register all services
@@ -71,7 +72,7 @@ LOOP:
 			failTimes = 0
 			for {
 				select {
-				case <-r.GetDone():
+				case <-r.Done():
 					logger.Warnf("(ETCDV3ProviderRegistry)reconnectETCDRegistry goroutine exit now...")
 					break LOOP
 				case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // avoid connect frequent
diff --git a/remoting/etcdv3/listener.go b/remoting/etcdv3/listener.go
index a4d5805..a51a68b 100644
--- a/remoting/etcdv3/listener.go
+++ b/remoting/etcdv3/listener.go
@@ -33,6 +33,7 @@ import (
 	"github.com/apache/dubbo-go/remoting"
 )
 
+// EventListener ...
 type EventListener struct {
 	client     *Client
 	keyMapLock sync.Mutex
@@ -40,6 +41,7 @@ type EventListener struct {
 	wg         sync.WaitGroup
 }
 
+// NewEventListener ...
 func NewEventListener(client *Client) *EventListener {
 	return &EventListener{
 		client: client,
@@ -47,7 +49,7 @@ func NewEventListener(client *Client) *EventListener {
 	}
 }
 
-// Listen on a spec key
+// ListenServiceNodeEvent Listen on a spec key
 // this method will return true when spec key deleted,
 // this method will return false when deep layer connection lose
 func (l *EventListener) ListenServiceNodeEvent(key string, listener ...remoting.DataListener) bool {
@@ -134,7 +136,7 @@ func (l *EventListener) handleEvents(event *clientv3.Event, listeners ...remotin
 	panic("unreachable")
 }
 
-// Listen on a set of key with spec prefix
+// ListenServiceNodeEventWithPrefix Listen on a set of key with spec prefix
 func (l *EventListener) ListenServiceNodeEventWithPrefix(prefix string, listener ...remoting.DataListener) {
 
 	l.wg.Add(1)
@@ -180,7 +182,7 @@ func timeSecondDuration(sec int) time.Duration {
 	return time.Duration(sec) * time.Second
 }
 
-// this func is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
+// ListenServiceEvent is invoked by etcdv3 ConsumerRegistry::Registe/ etcdv3 ConsumerRegistry::get/etcdv3 ConsumerRegistry::getListener
 // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
 //                            |
 //                            --------> ListenServiceNodeEvent
@@ -229,6 +231,7 @@ func (l *EventListener) ListenServiceEvent(key string, listener remoting.DataLis
 	}(key)
 }
 
+// Close ...
 func (l *EventListener) Close() {
 	l.wg.Wait()
 }
diff --git a/remoting/listener.go b/remoting/listener.go
index 8d1e357..3713ba0 100644
--- a/remoting/listener.go
+++ b/remoting/listener.go
@@ -21,6 +21,7 @@ import (
 	"fmt"
 )
 
+// DataListener ...
 type DataListener interface {
 	DataChange(eventType Event) bool //bool is return for interface implement is interesting
 }
@@ -29,11 +30,15 @@ type DataListener interface {
 // event type
 //////////////////////////////////////////
 
+// EventType ...
 type EventType int
 
 const (
+	// EventTypeAdd ...
 	EventTypeAdd = iota
+	// EventTypeDel ...
 	EventTypeDel
+	// EventTypeUpdate ...
 	EventTypeUpdate
 )
 
@@ -51,6 +56,7 @@ func (t EventType) String() string {
 // service event
 //////////////////////////////////////////
 
+// Event ...
 type Event struct {
 	Path    string
 	Action  EventType
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 19d6529..f95231b 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -25,8 +25,8 @@ import (
 )
 
 import (
+	"github.com/dubbogo/go-zookeeper/zk"
 	perrors "github.com/pkg/errors"
-	"github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -35,14 +35,19 @@ import (
 )
 
 const (
-	ConnDelay    = 3
+	// ConnDelay connection delay interval
+	ConnDelay = 3
+	// MaxFailTimes max fail times
 	MaxFailTimes = 15
 )
 
 var (
 	errNilZkClientConn = perrors.New("zookeeperclient{conn} is nil")
+	errNilChildren     = perrors.Errorf("has none children")
+	errNilNode         = perrors.Errorf("node does not exist")
 )
 
+// ZookeeperClient ...
 type ZookeeperClient struct {
 	name          string
 	ZkAddrs       []string
@@ -54,6 +59,7 @@ type ZookeeperClient struct {
 	eventRegistry map[string][]*chan struct{}
 }
 
+// StateToString ...
 func StateToString(state zk.State) string {
 	switch state {
 	case zk.StateDisconnected:
@@ -85,6 +91,7 @@ func StateToString(state zk.State) string {
 	return "zookeeper unknown state"
 }
 
+// Options ...
 type Options struct {
 	zkName string
 	client *ZookeeperClient
@@ -92,14 +99,17 @@ type Options struct {
 	ts *zk.TestCluster
 }
 
+// Option ...
 type Option func(*Options)
 
+// WithZkName ...
 func WithZkName(name string) Option {
 	return func(opt *Options) {
 		opt.zkName = name
 	}
 }
 
+// ValidateZookeeperClient ...
 func ValidateZookeeperClient(container zkClientFacade, opts ...Option) error {
 	var (
 		err error
@@ -173,12 +183,14 @@ func newZookeeperClient(name string, zkAddrs []string, timeout time.Duration) (*
 	return z, nil
 }
 
+// WithTestCluster ...
 func WithTestCluster(ts *zk.TestCluster) Option {
 	return func(opt *Options) {
 		opt.ts = ts
 	}
 }
 
+// NewMockZookeeperClient ...
 func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option) (*zk.TestCluster, *ZookeeperClient, <-chan zk.Event, error) {
 	var (
 		err   error
@@ -224,6 +236,7 @@ func NewMockZookeeperClient(name string, timeout time.Duration, opts ...Option)
 	return ts, z, event, nil
 }
 
+// HandleZkEvent ...
 func (z *ZookeeperClient) HandleZkEvent(session <-chan zk.Event) {
 	var (
 		state int
@@ -248,11 +261,13 @@ LOOP:
 				logger.Warnf("zk{addr:%s} state is StateDisconnected, so close the zk client{name:%s}.", z.ZkAddrs, z.name)
 				z.stop()
 				z.Lock()
-				if z.Conn != nil {
-					z.Conn.Close()
-					z.Conn = nil
-				}
+				conn := z.Conn
+				z.Conn = nil
 				z.Unlock()
+				if conn != nil {
+					conn.Close()
+				}
+
 				break LOOP
 			case (int)(zk.EventNodeDataChanged), (int)(zk.EventNodeChildrenChanged):
 				logger.Infof("zkClient{%s} get zk node changed event{path:%s}", z.name, event.Path)
@@ -282,6 +297,7 @@ LOOP:
 	}
 }
 
+// RegisterEvent ...
 func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
 	if zkPath == "" || event == nil {
 		return
@@ -296,6 +312,7 @@ func (z *ZookeeperClient) RegisterEvent(zkPath string, event *chan struct{}) {
 	z.Unlock()
 }
 
+// UnregisterEvent ...
 func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
 	if zkPath == "" {
 		return
@@ -322,6 +339,7 @@ func (z *ZookeeperClient) UnregisterEvent(zkPath string, event *chan struct{}) {
 	}
 }
 
+// Done ...
 func (z *ZookeeperClient) Done() <-chan struct{} {
 	return z.exit
 }
@@ -337,6 +355,7 @@ func (z *ZookeeperClient) stop() bool {
 	return false
 }
 
+// ZkConnValid ...
 func (z *ZookeeperClient) ZkConnValid() bool {
 	select {
 	case <-z.exit:
@@ -354,6 +373,7 @@ func (z *ZookeeperClient) ZkConnValid() bool {
 	return valid
 }
 
+// Close ...
 func (z *ZookeeperClient) Close() {
 	if z == nil {
 		return
@@ -362,14 +382,17 @@ func (z *ZookeeperClient) Close() {
 	z.stop()
 	z.Wait.Wait()
 	z.Lock()
-	if z.Conn != nil {
-		z.Conn.Close()
-		z.Conn = nil
-	}
+	conn := z.Conn
+	z.Conn = nil
 	z.Unlock()
+	if conn != nil {
+		conn.Close()
+	}
+
 	logger.Warnf("zkClient{name:%s, zk addr:%s} exit now.", z.name, z.ZkAddrs)
 }
 
+// Create ...
 func (z *ZookeeperClient) Create(basePath string) error {
 	var (
 		err     error
@@ -381,10 +404,12 @@ func (z *ZookeeperClient) Create(basePath string) error {
 		tmpPath = path.Join(tmpPath, "/", str)
 		err = errNilZkClientConn
 		z.Lock()
-		if z.Conn != nil {
-			_, err = z.Conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
-		}
+		conn := z.Conn
 		z.Unlock()
+		if conn != nil {
+			_, err = conn.Create(tmpPath, []byte(""), 0, zk.WorldACL(zk.PermAll))
+		}
+
 		if err != nil {
 			if err == zk.ErrNodeExists {
 				logger.Infof("zk.create(\"%s\") exists\n", tmpPath)
@@ -398,6 +423,7 @@ func (z *ZookeeperClient) Create(basePath string) error {
 	return nil
 }
 
+// Delete ...
 func (z *ZookeeperClient) Delete(basePath string) error {
 	var (
 		err error
@@ -405,14 +431,16 @@ func (z *ZookeeperClient) Delete(basePath string) error {
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.Conn != nil {
-		err = z.Conn.Delete(basePath, -1)
-	}
+	conn := z.Conn
 	z.Unlock()
+	if conn != nil {
+		err = conn.Delete(basePath, -1)
+	}
 
 	return perrors.WithMessagef(err, "Delete(basePath:%s)", basePath)
 }
 
+// RegisterTemp ...
 func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, error) {
 	var (
 		err     error
@@ -425,10 +453,12 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
 	data = []byte("")
 	zkPath = path.Join(basePath) + "/" + node
 	z.Lock()
-	if z.Conn != nil {
-		tmpPath, err = z.Conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
-	}
+	conn := z.Conn
 	z.Unlock()
+	if conn != nil {
+		tmpPath, err = conn.Create(zkPath, data, zk.FlagEphemeral, zk.WorldACL(zk.PermAll))
+	}
+
 	//if err != nil && err != zk.ErrNodeExists {
 	if err != nil {
 		logger.Warnf("conn.Create(\"%s\", zk.FlagEphemeral) = error(%v)\n", zkPath, perrors.WithStack(err))
@@ -439,6 +469,7 @@ func (z *ZookeeperClient) RegisterTemp(basePath string, node string) (string, er
 	return tmpPath, nil
 }
 
+// RegisterTempSeq ...
 func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string, error) {
 	var (
 		err     error
@@ -447,15 +478,17 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.Conn != nil {
-		tmpPath, err = z.Conn.Create(
+	conn := z.Conn
+	z.Unlock()
+	if conn != nil {
+		tmpPath, err = conn.Create(
 			path.Join(basePath)+"/",
 			data,
 			zk.FlagEphemeral|zk.FlagSequence,
 			zk.WorldACL(zk.PermAll),
 		)
 	}
-	z.Unlock()
+
 	logger.Debugf("zookeeperClient.RegisterTempSeq(basePath{%s}) = tempPath{%s}", basePath, tmpPath)
 	if err != nil && err != zk.ErrNodeExists {
 		logger.Errorf("zkClient{%s} conn.Create(\"%s\", \"%s\", zk.FlagEphemeral|zk.FlagSequence) error(%v)\n",
@@ -467,37 +500,44 @@ func (z *ZookeeperClient) RegisterTempSeq(basePath string, data []byte) (string,
 	return tmpPath, nil
 }
 
+// GetChildrenW ...
 func (z *ZookeeperClient) GetChildrenW(path string) ([]string, <-chan zk.Event, error) {
 	var (
 		err      error
 		children []string
 		stat     *zk.Stat
-		event    <-chan zk.Event
+		watcher  *zk.Watcher
 	)
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.Conn != nil {
-		children, stat, event, err = z.Conn.ChildrenW(path)
-	}
+	conn := z.Conn
 	z.Unlock()
+	if conn != nil {
+		children, stat, watcher, err = conn.ChildrenW(path)
+	}
+
 	if err != nil {
+		if err == zk.ErrNoChildrenForEphemerals {
+			return nil, nil, errNilChildren
+		}
 		if err == zk.ErrNoNode {
-			return nil, nil, perrors.Errorf("path{%s} has none children", path)
+			return nil, nil, errNilNode
 		}
 		logger.Errorf("zk.ChildrenW(path{%s}) = error(%v)", path, err)
 		return nil, nil, perrors.WithMessagef(err, "zk.ChildrenW(path:%s)", path)
 	}
 	if stat == nil {
-		return nil, nil, perrors.Errorf("path{%s} has none children", path)
+		return nil, nil, perrors.Errorf("path{%s} get stat is nil", path)
 	}
 	if len(children) == 0 {
-		return nil, nil, perrors.Errorf("path{%s} has none children", path)
+		return nil, nil, errNilChildren
 	}
 
-	return children, event, nil
+	return children, watcher.EvtCh, nil
 }
 
+// GetChildren ...
 func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
 	var (
 		err      error
@@ -507,10 +547,12 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.Conn != nil {
-		children, stat, err = z.Conn.Children(path)
-	}
+	conn := z.Conn
 	z.Unlock()
+	if conn != nil {
+		children, stat, err = conn.Children(path)
+	}
+
 	if err != nil {
 		if err == zk.ErrNoNode {
 			return nil, perrors.Errorf("path{%s} has none children", path)
@@ -522,25 +564,28 @@ func (z *ZookeeperClient) GetChildren(path string) ([]string, error) {
 		return nil, perrors.Errorf("path{%s} has none children", path)
 	}
 	if len(children) == 0 {
-		return nil, perrors.Errorf("path{%s} has none children", path)
+		return nil, errNilChildren
 	}
 
 	return children, nil
 }
 
+// ExistW ...
 func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
 	var (
-		exist bool
-		err   error
-		event <-chan zk.Event
+		exist   bool
+		err     error
+		watcher *zk.Watcher
 	)
 
 	err = errNilZkClientConn
 	z.Lock()
-	if z.Conn != nil {
-		exist, _, event, err = z.Conn.ExistsW(zkPath)
-	}
+	conn := z.Conn
 	z.Unlock()
+	if conn != nil {
+		exist, _, watcher, err = conn.ExistsW(zkPath)
+	}
+
 	if err != nil {
 		logger.Warnf("zkClient{%s}.ExistsW(path{%s}) = error{%v}.", z.name, zkPath, perrors.WithStack(err))
 		return nil, perrors.WithMessagef(err, "zk.ExistsW(path:%s)", zkPath)
@@ -550,9 +595,10 @@ func (z *ZookeeperClient) ExistW(zkPath string) (<-chan zk.Event, error) {
 		return nil, perrors.Errorf("zkClient{%s} App zk path{%s} does not exist.", z.name, zkPath)
 	}
 
-	return event, nil
+	return watcher.EvtCh, nil
 }
 
+// GetContent ...
 func (z *ZookeeperClient) GetContent(zkPath string) ([]byte, *zk.Stat, error) {
 	return z.Conn.Get(zkPath)
 }
diff --git a/remoting/zookeeper/client_test.go b/remoting/zookeeper/client_test.go
index f1bd0c2..cb41eb3 100644
--- a/remoting/zookeeper/client_test.go
+++ b/remoting/zookeeper/client_test.go
@@ -24,7 +24,7 @@ import (
 )
 
 import (
-	"github.com/samuel/go-zookeeper/zk"
+	"github.com/dubbogo/go-zookeeper/zk"
 	"github.com/stretchr/testify/assert"
 )
 
@@ -133,3 +133,12 @@ func TestRegisterTempSeq(t *testing.T) {
 	states := []zk.State{zk.StateConnecting, zk.StateConnected, zk.StateHasSession}
 	verifyEventStateOrder(t, event, states, "event channel")
 }
+
+func Test_UnregisterEvent(t *testing.T) {
+	client := &ZookeeperClient{}
+	client.eventRegistry = make(map[string][]*chan struct{})
+	array := []*chan struct{}{}
+	array = append(array, new(chan struct{}))
+	client.eventRegistry["test"] = array
+	client.UnregisterEvent("test", new(chan struct{}))
+}
diff --git a/remoting/zookeeper/facade.go b/remoting/zookeeper/facade.go
index cdc7ead..055db4f 100644
--- a/remoting/zookeeper/facade.go
+++ b/remoting/zookeeper/facade.go
@@ -35,11 +35,12 @@ type zkClientFacade interface {
 	SetZkClient(*ZookeeperClient)
 	ZkClientLock() *sync.Mutex
 	WaitGroup() *sync.WaitGroup //for wait group control, zk client listener & zk client container
-	GetDone() chan struct{}     //for zk client control
+	Done() chan struct{}        //for zk client control
 	RestartCallBack() bool
 	common.Node
 }
 
+// HandleClientRestart ...
 func HandleClientRestart(r zkClientFacade) {
 	var (
 		err error
@@ -51,7 +52,7 @@ func HandleClientRestart(r zkClientFacade) {
 LOOP:
 	for {
 		select {
-		case <-r.GetDone():
+		case <-r.Done():
 			logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
 			break LOOP
 			// re-register all services
@@ -67,7 +68,7 @@ LOOP:
 			failTimes = 0
 			for {
 				select {
-				case <-r.GetDone():
+				case <-r.Done():
 					logger.Warnf("(ZkProviderRegistry)reconnectZkRegistry goroutine exit now...")
 					break LOOP
 				case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)): // Prevent crazy reconnection zk.
diff --git a/remoting/zookeeper/facade_test.go b/remoting/zookeeper/facade_test.go
index 58e0d69..175d758 100644
--- a/remoting/zookeeper/facade_test.go
+++ b/remoting/zookeeper/facade_test.go
@@ -24,7 +24,7 @@ import (
 	"time"
 )
 import (
-	"github.com/samuel/go-zookeeper/zk"
+	"github.com/dubbogo/go-zookeeper/zk"
 	"github.com/stretchr/testify/assert"
 )
 import (
@@ -55,7 +55,7 @@ func (r *mockFacade) WaitGroup() *sync.WaitGroup {
 	return &r.wg
 }
 
-func (r *mockFacade) GetDone() chan struct{} {
+func (r *mockFacade) Done() chan struct{} {
 	return r.done
 }
 
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 0b9db5e..4493c06 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -19,15 +19,14 @@ package zookeeper
 
 import (
 	"path"
-	"strings"
 	"sync"
 	"time"
 )
 
 import (
 	"github.com/dubbogo/getty"
+	"github.com/dubbogo/go-zookeeper/zk"
 	perrors "github.com/pkg/errors"
-	"github.com/samuel/go-zookeeper/zk"
 )
 
 import (
@@ -35,6 +34,7 @@ import (
 	"github.com/apache/dubbo-go/remoting"
 )
 
+// ZkEventListener ...
 type ZkEventListener struct {
 	client      *ZookeeperClient
 	pathMapLock sync.Mutex
@@ -42,6 +42,7 @@ type ZkEventListener struct {
 	wg          sync.WaitGroup
 }
 
+// NewZkEventListener ...
 func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
 	return &ZkEventListener{
 		client:  client,
@@ -49,10 +50,12 @@ func NewZkEventListener(client *ZookeeperClient) *ZkEventListener {
 	}
 }
 
+// SetClient ...
 func (l *ZkEventListener) SetClient(client *ZookeeperClient) {
 	l.client = client
 }
 
+// ListenServiceNodeEvent ...
 func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
 	l.wg.Add(1)
 	defer l.wg.Done()
@@ -107,8 +110,17 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 
 	newChildren, err := l.client.GetChildren(zkPath)
 	if err != nil {
-		logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
-		return
+		if err == errNilChildren {
+			content, _, err := l.client.Conn.Get(zkPath)
+			if err != nil {
+				logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", zkPath, perrors.WithStack(err))
+			} else {
+				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
+			}
+
+		} else {
+			logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
+		}
 	}
 
 	// a node was added -- listen the new node
@@ -178,7 +190,7 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
 			if MaxFailTimes <= failTimes {
 				failTimes = MaxFailTimes
 			}
-			logger.Warnf("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
+			logger.Infof("listenDirEvent(path{%s}) = error{%v}", zkPath, err)
 			// clear the event channel
 		CLEAR:
 			for {
@@ -189,6 +201,11 @@ func (l *ZkEventListener) listenDirEvent(zkPath string, listener remoting.DataLi
 				}
 			}
 			l.client.RegisterEvent(zkPath, &event)
+			if err == errNilNode {
+				logger.Warnf("listenDirEvent(path{%s}) got errNilNode,so exit listen", zkPath)
+				l.client.UnregisterEvent(zkPath, &event)
+				return
+			}
 			select {
 			case <-getty.GetTimeWheel().After(timeSecondDuration(failTimes * ConnDelay)):
 				l.client.UnregisterEvent(zkPath, &event)
@@ -263,56 +280,11 @@ func timeSecondDuration(sec int) time.Duration {
 	return time.Duration(sec) * time.Second
 }
 
-// this func is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
+// ListenServiceEvent is invoked by ZkConsumerRegistry::Register/ZkConsumerRegistry::get/ZkConsumerRegistry::getListener
 // registry.go:Listen -> listenServiceEvent -> listenDirEvent -> ListenServiceNodeEvent
 //                            |
 //                            --------> ListenServiceNodeEvent
 func (l *ZkEventListener) ListenServiceEvent(zkPath string, listener remoting.DataListener) {
-	var (
-		err       error
-		dubboPath string
-		children  []string
-	)
-
-	zkPath = strings.ReplaceAll(zkPath, "$", "%24")
-	l.pathMapLock.Lock()
-	_, ok := l.pathMap[zkPath]
-	l.pathMapLock.Unlock()
-	if ok {
-		logger.Warnf("@zkPath %s has already been listened.", zkPath)
-		return
-	}
-
-	l.pathMapLock.Lock()
-	l.pathMap[zkPath] = struct{}{}
-	l.pathMapLock.Unlock()
-
-	logger.Infof("listen dubbo provider path{%s} event and wait to get all provider zk nodes", zkPath)
-	children, err = l.client.GetChildren(zkPath)
-	if err != nil {
-		children = nil
-		logger.Warnf("fail to get children of zk path{%s}", zkPath)
-	}
-
-	for _, c := range children {
-		// listen l service node
-		dubboPath = path.Join(zkPath, c)
-		content, _, err := l.client.Conn.Get(dubboPath)
-		if err != nil {
-			logger.Errorf("Get new node path {%v} 's content error,message is  {%v}", dubboPath, perrors.WithStack(err))
-		}
-		if !listener.DataChange(remoting.Event{Path: dubboPath, Action: remoting.EventTypeAdd, Content: string(content)}) {
-			continue
-		}
-		logger.Infof("listen dubbo service key{%s}", dubboPath)
-		go func(zkPath string, listener remoting.DataListener) {
-			if l.ListenServiceNodeEvent(zkPath) {
-				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
-			}
-			logger.Warnf("listenSelf(zk path{%s}) goroutine exit now", zkPath)
-		}(dubboPath, listener)
-	}
-
 	logger.Infof("listen dubbo path{%s}", zkPath)
 	go func(zkPath string, listener remoting.DataListener) {
 		l.listenDirEvent(zkPath, listener)
@@ -324,6 +296,7 @@ func (l *ZkEventListener) valid() bool {
 	return l.client.ZkConnValid()
 }
 
+// Close ...
 func (l *ZkEventListener) Close() {
 	l.wg.Wait()
 }
diff --git a/remoting/zookeeper/listener_test.go b/remoting/zookeeper/listener_test.go
index aa627c7..43e9aca 100644
--- a/remoting/zookeeper/listener_test.go
+++ b/remoting/zookeeper/listener_test.go
@@ -24,7 +24,7 @@ import (
 	"time"
 )
 import (
-	"github.com/samuel/go-zookeeper/zk"
+	"github.com/dubbogo/go-zookeeper/zk"
 	"github.com/stretchr/testify/assert"
 )
 import (
@@ -97,12 +97,11 @@ func TestListener(t *testing.T) {
 	listener := NewZkEventListener(client)
 	dataListener := &mockDataListener{client: client, changedData: changedData, wait: &wait}
 	listener.ListenServiceEvent("/dubbo", dataListener)
-
+	time.Sleep(1 * time.Second)
 	_, err := client.Conn.Set("/dubbo/dubbo.properties", []byte(changedData), 1)
 	assert.NoError(t, err)
 	wait.Wait()
 	assert.Equal(t, changedData, dataListener.eventList[1].Content)
-	client.Close()
 
 }