You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/01/12 04:54:43 UTC

[dubbo-go] branch 3.0 updated: refactor zk dynamic configuration listener (#1665)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 20e6451  refactor zk dynamic configuration listener (#1665)
20e6451 is described below

commit 20e6451d49e31e36b47da1b96fbe6c1489d28dd8
Author: Mulavar <97...@qq.com>
AuthorDate: Wed Jan 12 12:54:38 2022 +0800

    refactor zk dynamic configuration listener (#1665)
    
    Co-authored-by: dongjianhui03 <do...@meituan.com>
---
 cluster/router/v3router/router_chain.go |  34 +++++++----
 common/constant/default.go              |   7 +--
 common/constant/key.go                  |   1 +
 config/consumer_config.go               |   2 +-
 config_center/zookeeper/impl.go         |  29 ++++++---
 config_center/zookeeper/listener.go     |  56 +++++++++---------
 remoting/listener.go                    |   2 +-
 remoting/zookeeper/client.go            |   2 +-
 remoting/zookeeper/listener.go          | 101 +++++++++++++++++++++-----------
 9 files changed, 146 insertions(+), 88 deletions(-)

diff --git a/cluster/router/v3router/router_chain.go b/cluster/router/v3router/router_chain.go
index 783e94a..6d593cf 100644
--- a/cluster/router/v3router/router_chain.go
+++ b/cluster/router/v3router/router_chain.go
@@ -35,6 +35,7 @@ import (
 	"dubbo.apache.org/dubbo-go/v3/config"
 	"dubbo.apache.org/dubbo-go/v3/config_center"
 	"dubbo.apache.org/dubbo-go/v3/protocol"
+	"dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
 // RouterChain contains all uniform router logic
@@ -46,31 +47,36 @@ type RouterChain struct {
 
 // nolint
 func NewUniformRouterChain() (router.PriorityRouter, error) {
-	// 1. add mesh route listener
+	// 1. Add mesh route listener
 	r := &RouterChain{}
 	rootConfig := config.GetRootConfig()
 	dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
 	if dynamicConfiguration == nil {
-		logger.Infof("[Mesh Router] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
+		logger.Infof("[NewUniformRouterChain] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
 		return nil, nil
 	}
-	dynamicConfiguration.AddListener(rootConfig.Application.Name, r)
 
-	// 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
+	// 2. Try to get mesh rules configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
 	key := rootConfig.Application.Name + constant.MeshRouteSuffix
+	group := rootConfig.ConfigCenter.Group
+	if group == "" {
+		group = constant.Dubbo
+	}
+	dynamicConfiguration.AddListener(group+constant.PathSeparator+key, r)
 	meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group))
 	if err != nil {
-		// the mesh route may not be initialized now
-		logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err)
+		// The mesh rules may not be initialized now
+		logger.Warnf("[NewUniformRouterChain]Can not get mesh rules for group=%s, key=%s, error=%+v", rootConfig.ConfigCenter.Group, key, err)
 		return r, nil
 	}
-	logger.Debugf("Successfully get mesh route:%s", meshRouteValue)
+	logger.Debugf("[NewUniformRouterChain]Successfully get mesh rules:%s", meshRouteValue)
 	routes, err := parseRoute(meshRouteValue)
 	if err != nil {
-		logger.Warnf("Parse mesh route failed, error=%v", err)
+		logger.Warnf("[NewUniformRouterChain]Parse mesh rules failed, error=%+v", err)
 		return nil, err
 	}
 	r.routers = routes
+	logger.Infof("[NewUniformRouterChain]Successfully init mesh rules with:\n%s", meshRouteValue)
 	return r, nil
 }
 
@@ -84,13 +90,19 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca
 
 // Process process route config change event
 func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
-	logger.Debugf("RouteChain process event:\n%+v", event)
+	logger.Infof("[RouteChain]Process config change event:%+v", event)
+	if event.ConfigType == remoting.EventTypeDel {
+		r.routers = nil
+		return
+	}
 	routers, err := parseRoute(event.Value.(string))
 	if err != nil {
+		logger.Warnf("[RouteChain]Parse new mesh route config error, %+v "+
+			"and we will use the original mesh rule configuration.", err)
 		return
 	}
 	r.routers = routers
-	// todo delete router
+	logger.Infof("[RouteChain]Parse Mesh Rule Success.")
 }
 
 // Name get name of ConnCheckerRouter
@@ -108,7 +120,7 @@ func (r *RouterChain) URL() *common.URL {
 	return nil
 }
 
-// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
+// Deprecated: parseFromConfigToRouters parses virtualService and destinationRule yaml file bytes to target router list
 func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte) ([]*UniformRouter, error) {
 	var virtualServiceConfigList []*config.VirtualServiceConfig
 	destRuleConfigsMap := make(map[string]map[string]map[string]string)
diff --git a/common/constant/default.go b/common/constant/default.go
index 70a7eba..5b40cce 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -27,11 +27,8 @@ const (
 )
 
 const (
-	DefaultWeight = 100     //
-	DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
-)
-
-const (
+	DefaultWeight           = 100     //
+	DefaultWarmup           = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
 	DefaultLoadBalance      = "random"
 	DefaultRetries          = "2"
 	DefaultRetriesInt       = 2
diff --git a/common/constant/key.go b/common/constant/key.go
index 97d2851..60c544c 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -49,6 +49,7 @@ const (
 	PortKey                = "port"
 	ProtocolKey            = "protocol"
 	PathSeparator          = "/"
+	DotSeparator           = "."
 	CommaSeparator         = ","
 	SslEnabledKey          = "ssl-enabled"
 	// ParamsTypeKey key used in pass through invoker factory, to define param type
diff --git a/config/consumer_config.go b/config/consumer_config.go
index f9344f3..5e641b6 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -83,7 +83,7 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error {
 			// try to use interface name defined by pb
 			triplePBService, ok := reference.(common.TriplePBService)
 			if !ok {
-				logger.Errorf("Dubbogo cannot get interface name with reference = %s."+
+				logger.Errorf("Dubbo-go cannot get interface name with reference = %s."+
 					"Please run the command 'go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest' to get the latest "+
 					"protoc-gen-go-triple,  and then re-generate your pb file again by this tool."+
 					"If you are not using pb serialization, please set 'interfaceName' field in reference config to let dubbogo get the interface name.", key)
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index d6b6da1..730b882 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -20,6 +20,7 @@ package zookeeper
 import (
 	"encoding/base64"
 	"strconv"
+	"strings"
 	"sync"
 )
 
@@ -65,8 +66,9 @@ type zookeeperDynamicConfiguration struct {
 
 func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
 	c := &zookeeperDynamicConfiguration{
-		url:      url,
-		rootPath: "/" + url.GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/config",
+		url: url,
+		// TODO adapt config center config
+		rootPath: "/dubbo/config",
 	}
 	logger.Infof("[Zookeeper ConfigCenter] New Zookeeper ConfigCenter with Configuration: %+v, url = %+v", c, c.GetURL())
 	if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
@@ -93,13 +95,26 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
 
 	// Start listener
 	c.listener = zookeeper.NewZkEventListener(c.client)
-	c.cacheListener = NewCacheListener(c.rootPath)
-	c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
+	c.cacheListener = NewCacheListener(c.rootPath, c.listener)
+	c.listener.ListenConfigurationEvent(c.rootPath, c.cacheListener)
 	return c, nil
 }
 
-func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
-	c.cacheListener.AddListener(key, listener)
+// AddListener adds a listener for key.
+// TODO: this method should have a 'group' parameter but does not yet, so callers must concatenate group and key with '/' manually.
+func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) {
+	qualifiedKey := buildPath(c.rootPath, key)
+	c.cacheListener.AddListener(qualifiedKey, listener)
+}
+
+// buildPath joins rootPath and subPath, trims trailing separators, ensures a leading separator and replaces "//" with "/"
+func buildPath(rootPath, subPath string) string {
+	path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator)
+	if !strings.HasPrefix(path, pathSeparator) {
+		path = pathSeparator + path
+	}
+	path = strings.ReplaceAll(path, "//", "/")
+	return path
 }
 
 func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
@@ -118,7 +133,7 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
 	if len(tmpOpts.Group) != 0 {
 		key = tmpOpts.Group + "/" + key
 	} else {
-		key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup)
+		key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/" + key
 	}
 	content, _, err := c.client.GetContent(c.rootPath + "/" + key)
 	if err != nil {
diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go
index 5dd5457..3d31179 100644
--- a/config_center/zookeeper/listener.go
+++ b/config_center/zookeeper/listener.go
@@ -24,27 +24,33 @@ import (
 
 import (
 	"dubbo.apache.org/dubbo-go/v3/common/constant"
-	"dubbo.apache.org/dubbo-go/v3/common/logger"
-	"dubbo.apache.org/dubbo-go/v3/config"
 	"dubbo.apache.org/dubbo-go/v3/config_center"
 	"dubbo.apache.org/dubbo-go/v3/remoting"
+	"dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
 )
 
 // CacheListener defines keyListeners and rootPath
 type CacheListener struct {
-	keyListeners sync.Map
-	rootPath     string
+	// key is the zk node path and value is the set of listeners registered for that path
+	keyListeners    sync.Map
+	zkEventListener *zookeeper.ZkEventListener
+	rootPath        string
 }
 
 // NewCacheListener creates a new CacheListener
-func NewCacheListener(rootPath string) *CacheListener {
-	return &CacheListener{rootPath: rootPath}
+func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener {
+	return &CacheListener{zkEventListener: listener, rootPath: rootPath}
 }
 
 // AddListener will add a listener if loaded
 func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
+	// FIXME: do not use Client.ExistW here, because it has a bug (it cannot watch a zk node that does not exist)
+	_, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key)
 	// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
 	// make a map[your type]struct{} like set in java
+	if err != nil {
+		return
+	}
 	listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}})
 	if loaded {
 		listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
@@ -62,36 +68,28 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config
 
 // DataChange changes all listeners' event
 func (l *CacheListener) DataChange(event remoting.Event) bool {
+	changeType := event.Action
 	if event.Content == "" {
-		// meanings new node
-		return true
-	}
-	var key string
-	// TODO use common way
-	if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) {
-		key = config.GetRootConfig().Application.Name
-	} else {
-		key = l.pathToKey(event.Path)
+		changeType = remoting.EventTypeDel
 	}
-	if key != "" {
-		if listeners, ok := l.keyListeners.Load(key); ok {
-			for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
-				listener.Process(&config_center.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action})
-			}
-			return true
+
+	if listeners, ok := l.keyListeners.Load(event.Path); ok {
+		for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
+			listener.Process(&config_center.ConfigChangeEvent{
+				Key:        l.pathToKey(event.Path),
+				Value:      event.Content,
+				ConfigType: changeType,
+			})
 		}
+		return true
 	}
 	return false
 }
 
 func (l *CacheListener) pathToKey(path string) string {
-	key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
-	if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
-		strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
-		strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
-		// governance config, so we remove the "dubbo." prefix
-		key = key[strings.Index(key, ".")+1:]
+	if len(path) == 0 {
+		return path
 	}
-	logger.Debugf("pathToKey path:%s, key:%s\n", path, key)
-	return key
+	groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
+	return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
 }
diff --git a/remoting/listener.go b/remoting/listener.go
index a87e502..ea2300f 100644
--- a/remoting/listener.go
+++ b/remoting/listener.go
@@ -23,7 +23,7 @@ import (
 
 // DataListener defines common data listener interface
 type DataListener interface {
-	DataChange(eventType Event) bool // bool is return for interface implement is interesting
+	DataChange(event Event) bool // bool is return for interface implement is interesting
 }
 
 //////////////////////////////////////////
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 5c955da..f05863a 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -52,7 +52,7 @@ func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
 		timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout)
 
 		zkAddresses := strings.Split(url.Location, ",")
-		logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %d", zkName, url.Location, timeout.String())
+		logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %s", zkName, url.Location, timeout.String())
 		newClient, cltErr := gxzookeeper.NewZookeeperClient(zkName, zkAddresses, true, gxzookeeper.WithZkTimeOut(timeout))
 		if cltErr != nil {
 			logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index a7f7be8..7bf4382 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -45,7 +45,7 @@ var defaultTTL = 10 * time.Minute
 
 // nolint
 type ZkEventListener struct {
-	client      *gxzookeeper.ZookeeperClient
+	Client      *gxzookeeper.ZookeeperClient
 	pathMapLock sync.Mutex
 	pathMap     map[string]*uatomic.Int32
 	wg          sync.WaitGroup
@@ -55,17 +55,12 @@ type ZkEventListener struct {
 // NewZkEventListener returns a EventListener instance
 func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
 	return &ZkEventListener{
-		client:  client,
+		Client:  client,
 		pathMap: make(map[string]*uatomic.Int32),
 		exit:    make(chan struct{}),
 	}
 }
 
-// nolint
-func (l *ZkEventListener) SetClient(client *gxzookeeper.ZookeeperClient) {
-	l.client = client
-}
-
 // ListenServiceNodeEvent listen a path node event
 func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) {
 	// listen l service node
@@ -81,6 +76,57 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
 	}(zkPath, listener)
 }
 
+// ListenConfigurationEvent listens for zk node events registered on zkPath and forwards them to listener.DataChange
+func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remoting.DataListener) {
+	l.wg.Add(1)
+	go func(zkPath string, listener remoting.DataListener) {
+		var eventChan = make(chan zk.Event, 16)
+		l.Client.RegisterEvent(zkPath, eventChan)
+		for {
+			select {
+			case event := <-eventChan:
+				logger.Infof("[ZkEventListener]Receive configuration change event:%#v", event)
+				if event.Type == zk.EventNodeChildrenChanged || event.Type == zk.EventNotWatching {
+					continue
+				}
+				// 1. Re-set watcher for the zk node
+				_, _, _, err := l.Client.Conn.ExistsW(event.Path)
+				if err != nil {
+					logger.Warnf("[ZkEventListener]Re-set watcher error, the reason is %+v", err)
+					continue
+				}
+
+				action := remoting.EventTypeAdd
+				var content string
+				if event.Type == zk.EventNodeDeleted {
+					action = remoting.EventTypeDel
+				} else {
+					// 2. Try to get new configuration value of the zk node
+					// Notice: The order of step 1 and step 2 cannot be swapped. If you get the value (at timestamp t1)
+					// before re-setting the watcher (at timestamp t2), and someone changes the data of the zk node after
+					// t1 but before t2, you will keep the old value, and the new value will not trigger an event.
+					contentBytes, _, err := l.Client.Conn.Get(event.Path)
+					if err != nil {
+						logger.Warnf("[ListenConfigurationEvent]Get config value error, the reason is %+v", err)
+						continue
+					}
+					content = string(contentBytes)
+					logger.Debugf("[ZkEventListener]Successfully get new config value: %s", string(content))
+				}
+
+				listener.DataChange(remoting.Event{
+					Path:    event.Path,
+					Action:  remoting.EventType(action),
+					Content: content,
+				})
+			case <-l.exit:
+				return
+			}
+		}
+
+	}(zkPath, listener)
+}
+
 // nolint
 func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
 	defer l.wg.Done()
@@ -97,7 +143,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
 
 	var zkEvent zk.Event
 	for {
-		keyEventCh, err := l.client.ExistW(zkPath)
+		keyEventCh, err := l.Client.ExistW(zkPath)
 		if err != nil {
 			logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
 			return false
@@ -111,7 +157,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
 			case zk.EventNodeDataChanged:
 				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
 				if len(listener) > 0 {
-					content, _, err := l.client.Conn.Get(zkEvent.Path)
+					content, _, err := l.Client.Conn.Get(zkEvent.Path)
 					if err != nil {
 						logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
 						return false
@@ -119,9 +165,9 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
 					listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)})
 				}
 			case zk.EventNodeCreated:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
+				logger.Warnf("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeCreated event for path {%s}", zkPath)
 				if len(listener) > 0 {
-					content, _, err := l.client.Conn.Get(zkEvent.Path)
+					content, _, err := l.Client.Conn.Get(zkEvent.Path)
 					if err != nil {
 						logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
 						return false
@@ -129,9 +175,9 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
 					listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd, Content: string(content)})
 				}
 			case zk.EventNotWatching:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
+				logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNotWatching event for path {%s}", zkPath)
 			case zk.EventNodeDeleted:
-				logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
+				logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeDeleted event for path {%s}", zkPath)
 				return true
 			}
 		case <-l.exit:
@@ -150,11 +196,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		return false
 	}
 
-	newChildren, err := l.client.GetChildren(zkPath)
+	newChildren, err := l.Client.GetChildren(zkPath)
 	if err != nil {
 		// TODO need to ignore this error in gost
 		if err == gxzookeeper.ErrNilChildren {
-			content, _, connErr := l.client.Conn.Get(zkPath)
+			content, _, connErr := l.Client.Conn.Get(zkPath)
 			if connErr != nil {
 				logger.Errorf("Get new node path {%v} 's content error,message is  {%v}",
 					zkPath, perrors.WithStack(connErr))
@@ -176,7 +222,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 
 		newNode = path.Join(zkPath, n)
 		logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode)
-		content, _, connErr := l.client.Conn.Get(newNode)
+		content, _, connErr := l.Client.Conn.Get(newNode)
 		if connErr != nil {
 			logger.Errorf("Get new node path {%v} 's content error,message is  {%v}",
 				newNode, perrors.WithStack(connErr))
@@ -231,7 +277,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 	}
 	for {
 		// Get current children with watcher for the zkRootPath
-		children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
+		children, childEventCh, err := l.Client.GetChildrenW(zkRootPath)
 		if err != nil {
 			failTimes++
 			if MaxFailTimes <= failTimes {
@@ -277,14 +323,14 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 			}
 
 			// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
-			l.client.RLock()
-			if l.client.Conn == nil {
-				l.client.RUnlock()
+			l.Client.RLock()
+			if l.Client.Conn == nil {
+				l.Client.RUnlock()
 				break
 			}
-			content, _, err := l.client.Conn.Get(zkNodePath)
+			content, _, err := l.Client.Conn.Get(zkNodePath)
 
-			l.client.RUnlock()
+			l.Client.RUnlock()
 			if err != nil {
 				logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
 			}
@@ -304,17 +350,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 				}
 				logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
 			}(zkNodePath, listener)
-
-			// listen sub path recursive
-			// if zkPath is end of "providers/ & consumers/" we do not listen children dir
-			if strings.LastIndex(zkRootPath, constant.ProviderCategory) == -1 &&
-				strings.LastIndex(zkRootPath, constant.ConsumerCategory) == -1 {
-				l.wg.Add(1)
-				go func(zkPath string, listener remoting.DataListener) {
-					l.listenDirEvent(conf, zkPath, listener)
-					logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
-				}(zkNodePath, listener)
-			}
 		}
 		if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
 			return