You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/01/12 04:54:43 UTC
[dubbo-go] branch 3.0 updated: refactor zk dynamic configuration listener (#1665)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 20e6451 refactor zk dynamic configuration listener (#1665)
20e6451 is described below
commit 20e6451d49e31e36b47da1b96fbe6c1489d28dd8
Author: Mulavar <97...@qq.com>
AuthorDate: Wed Jan 12 12:54:38 2022 +0800
refactor zk dynamic configuration listener (#1665)
Co-authored-by: dongjianhui03 <do...@meituan.com>
---
cluster/router/v3router/router_chain.go | 34 +++++++----
common/constant/default.go | 7 +--
common/constant/key.go | 1 +
config/consumer_config.go | 2 +-
config_center/zookeeper/impl.go | 29 ++++++---
config_center/zookeeper/listener.go | 56 +++++++++---------
remoting/listener.go | 2 +-
remoting/zookeeper/client.go | 2 +-
remoting/zookeeper/listener.go | 101 +++++++++++++++++++++-----------
9 files changed, 146 insertions(+), 88 deletions(-)
diff --git a/cluster/router/v3router/router_chain.go b/cluster/router/v3router/router_chain.go
index 783e94a..6d593cf 100644
--- a/cluster/router/v3router/router_chain.go
+++ b/cluster/router/v3router/router_chain.go
@@ -35,6 +35,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/protocol"
+ "dubbo.apache.org/dubbo-go/v3/remoting"
)
// RouterChain contains all uniform router logic
@@ -46,31 +47,36 @@ type RouterChain struct {
// nolint
func NewUniformRouterChain() (router.PriorityRouter, error) {
- // 1. add mesh route listener
+ // 1. Add mesh route listener
r := &RouterChain{}
rootConfig := config.GetRootConfig()
dynamicConfiguration := conf.GetEnvInstance().GetDynamicConfiguration()
if dynamicConfiguration == nil {
- logger.Infof("[Mesh Router] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
+ logger.Infof("[NewUniformRouterChain] Config center does not start, please check if the configuration center has been properly configured in dubbogo.yml")
return nil, nil
}
- dynamicConfiguration.AddListener(rootConfig.Application.Name, r)
- // 2. try to get mesh route configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
+ // 2. Try to get mesh rules configuration, default key is "dubbo.io.MESHAPPRULE" with group "dubbo"
key := rootConfig.Application.Name + constant.MeshRouteSuffix
+ group := rootConfig.ConfigCenter.Group
+ if group == "" {
+ group = constant.Dubbo
+ }
+ dynamicConfiguration.AddListener(group+constant.PathSeparator+key, r)
meshRouteValue, err := dynamicConfiguration.GetProperties(key, config_center.WithGroup(rootConfig.ConfigCenter.Group))
if err != nil {
- // the mesh route may not be initialized now
- logger.Warnf("Can not get mesh route for key=%s, error=%v", key, err)
+ // The mesh rules may not be initialized now
+ logger.Warnf("[NewUniformRouterChain]Can not get mesh rules for group=%s, key=%s, error=%+v", rootConfig.ConfigCenter.Group, key, err)
return r, nil
}
- logger.Debugf("Successfully get mesh route:%s", meshRouteValue)
+ logger.Debugf("[NewUniformRouterChain]Successfully get mesh rules:%s", meshRouteValue)
routes, err := parseRoute(meshRouteValue)
if err != nil {
- logger.Warnf("Parse mesh route failed, error=%v", err)
+ logger.Warnf("[NewUniformRouterChain]Parse mesh rules failed, error=%+v", err)
return nil, err
}
r.routers = routes
+ logger.Infof("[NewUniformRouterChain]Successfully init mesh rules with:\n%s", meshRouteValue)
return r, nil
}
@@ -84,13 +90,19 @@ func (r *RouterChain) Route(invokers []protocol.Invoker, url *common.URL, invoca
// Process process route config change event
func (r *RouterChain) Process(event *config_center.ConfigChangeEvent) {
- logger.Debugf("RouteChain process event:\n%+v", event)
+ logger.Infof("[RouteChain]Process config change event:%+v", event)
+ if event.ConfigType == remoting.EventTypeDel {
+ r.routers = nil
+ return
+ }
routers, err := parseRoute(event.Value.(string))
if err != nil {
+ logger.Warnf("[RouteChain]Parse new mesh route config error, %+v "+
+ "and we will use the original mesh rule configuration.", err)
return
}
r.routers = routers
- // todo delete router
+ logger.Infof("[RouteChain]Parse Mesh Rule Success.")
}
// Name get name of ConnCheckerRouter
@@ -108,7 +120,7 @@ func (r *RouterChain) URL() *common.URL {
return nil
}
-// parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
+// Deprecated parseFromConfigToRouters parse virtualService and destinationRule yaml file bytes to target router list
func parseFromConfigToRouters(virtualServiceConfig, destinationRuleConfig []byte) ([]*UniformRouter, error) {
var virtualServiceConfigList []*config.VirtualServiceConfig
destRuleConfigsMap := make(map[string]map[string]map[string]string)
diff --git a/common/constant/default.go b/common/constant/default.go
index 70a7eba..5b40cce 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -27,11 +27,8 @@ const (
)
const (
- DefaultWeight = 100 //
- DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
-)
-
-const (
+ DefaultWeight = 100 //
+ DefaultWarmup = 10 * 60 // in java here is 10*60*1000 because of System.currentTimeMillis() is measured in milliseconds & in go time.Unix() is second
DefaultLoadBalance = "random"
DefaultRetries = "2"
DefaultRetriesInt = 2
diff --git a/common/constant/key.go b/common/constant/key.go
index 97d2851..60c544c 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -49,6 +49,7 @@ const (
PortKey = "port"
ProtocolKey = "protocol"
PathSeparator = "/"
+ DotSeparator = "."
CommaSeparator = ","
SslEnabledKey = "ssl-enabled"
// ParamsTypeKey key used in pass through invoker factory, to define param type
diff --git a/config/consumer_config.go b/config/consumer_config.go
index f9344f3..5e641b6 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -83,7 +83,7 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error {
// try to use interface name defined by pb
triplePBService, ok := reference.(common.TriplePBService)
if !ok {
- logger.Errorf("Dubbogo cannot get interface name with reference = %s."+
+ logger.Errorf("Dubbo-go cannot get interface name with reference = %s."+
"Please run the command 'go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest' to get the latest "+
"protoc-gen-go-triple, and then re-generate your pb file again by this tool."+
"If you are not using pb serialization, please set 'interfaceName' field in reference config to let dubbogo get the interface name.", key)
diff --git a/config_center/zookeeper/impl.go b/config_center/zookeeper/impl.go
index d6b6da1..730b882 100644
--- a/config_center/zookeeper/impl.go
+++ b/config_center/zookeeper/impl.go
@@ -20,6 +20,7 @@ package zookeeper
import (
"encoding/base64"
"strconv"
+ "strings"
"sync"
)
@@ -65,8 +66,9 @@ type zookeeperDynamicConfiguration struct {
func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfiguration, error) {
c := &zookeeperDynamicConfiguration{
- url: url,
- rootPath: "/" + url.GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/config",
+ url: url,
+ // TODO adapt config center config
+ rootPath: "/dubbo/config",
}
logger.Infof("[Zookeeper ConfigCenter] New Zookeeper ConfigCenter with Configuration: %+v, url = %+v", c, c.GetURL())
if v, ok := config.GetRootConfig().ConfigCenter.Params["base64"]; ok {
@@ -93,13 +95,26 @@ func newZookeeperDynamicConfiguration(url *common.URL) (*zookeeperDynamicConfigu
// Start listener
c.listener = zookeeper.NewZkEventListener(c.client)
- c.cacheListener = NewCacheListener(c.rootPath)
- c.listener.ListenServiceEvent(url, c.rootPath, c.cacheListener)
+ c.cacheListener = NewCacheListener(c.rootPath, c.listener)
+ c.listener.ListenConfigurationEvent(c.rootPath, c.cacheListener)
return c, nil
}
-func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
- c.cacheListener.AddListener(key, listener)
+// AddListener add listener for key
+// TODO this method should have a 'group' parameter; it does not yet, so callers must concatenate group and key with '/' manually
+func (c *zookeeperDynamicConfiguration) AddListener(key string, listener config_center.ConfigurationListener, options ...config_center.Option) {
+ qualifiedKey := buildPath(c.rootPath, key)
+ c.cacheListener.AddListener(qualifiedKey, listener)
+}
+
+// buildPath build path and format
+func buildPath(rootPath, subPath string) string {
+ path := strings.TrimRight(rootPath+pathSeparator+subPath, pathSeparator)
+ if !strings.HasPrefix(path, pathSeparator) {
+ path = pathSeparator + path
+ }
+ path = strings.ReplaceAll(path, "//", "/")
+ return path
}
func (c *zookeeperDynamicConfiguration) RemoveListener(key string, listener config_center.ConfigurationListener, opions ...config_center.Option) {
@@ -118,7 +133,7 @@ func (c *zookeeperDynamicConfiguration) GetProperties(key string, opts ...config
if len(tmpOpts.Group) != 0 {
key = tmpOpts.Group + "/" + key
} else {
- key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup)
+ key = c.GetURL().GetParam(constant.ConfigNamespaceKey, config_center.DefaultGroup) + "/" + key
}
content, _, err := c.client.GetContent(c.rootPath + "/" + key)
if err != nil {
diff --git a/config_center/zookeeper/listener.go b/config_center/zookeeper/listener.go
index 5dd5457..3d31179 100644
--- a/config_center/zookeeper/listener.go
+++ b/config_center/zookeeper/listener.go
@@ -24,27 +24,33 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
- "dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/remoting"
+ "dubbo.apache.org/dubbo-go/v3/remoting/zookeeper"
)
// CacheListener defines keyListeners and rootPath
type CacheListener struct {
- keyListeners sync.Map
- rootPath string
+ // key is zkNode Path and value is set of listeners
+ keyListeners sync.Map
+ zkEventListener *zookeeper.ZkEventListener
+ rootPath string
}
// NewCacheListener creates a new CacheListener
-func NewCacheListener(rootPath string) *CacheListener {
- return &CacheListener{rootPath: rootPath}
+func NewCacheListener(rootPath string, listener *zookeeper.ZkEventListener) *CacheListener {
+ return &CacheListener{zkEventListener: listener, rootPath: rootPath}
}
// AddListener will add a listener if loaded
func (l *CacheListener) AddListener(key string, listener config_center.ConfigurationListener) {
+ // FIXME do not use Client.ExistW, because it has a bug (it cannot watch a zk node that does not exist)
+ _, _, _, err := l.zkEventListener.Client.Conn.ExistsW(key)
// reference from https://stackoverflow.com/questions/34018908/golang-why-dont-we-have-a-set-datastructure
// make a map[your type]struct{} like set in java
+ if err != nil {
+ return
+ }
listeners, loaded := l.keyListeners.LoadOrStore(key, map[config_center.ConfigurationListener]struct{}{listener: {}})
if loaded {
listeners.(map[config_center.ConfigurationListener]struct{})[listener] = struct{}{}
@@ -62,36 +68,28 @@ func (l *CacheListener) RemoveListener(key string, listener config_center.Config
// DataChange changes all listeners' event
func (l *CacheListener) DataChange(event remoting.Event) bool {
+ changeType := event.Action
if event.Content == "" {
- // meanings new node
- return true
- }
- var key string
- // TODO use common way
- if strings.HasSuffix(event.Path, constant.MeshRouteSuffix) {
- key = config.GetRootConfig().Application.Name
- } else {
- key = l.pathToKey(event.Path)
+ changeType = remoting.EventTypeDel
}
- if key != "" {
- if listeners, ok := l.keyListeners.Load(key); ok {
- for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
- listener.Process(&config_center.ConfigChangeEvent{Key: key, Value: event.Content, ConfigType: event.Action})
- }
- return true
+
+ if listeners, ok := l.keyListeners.Load(event.Path); ok {
+ for listener := range listeners.(map[config_center.ConfigurationListener]struct{}) {
+ listener.Process(&config_center.ConfigChangeEvent{
+ Key: l.pathToKey(event.Path),
+ Value: event.Content,
+ ConfigType: changeType,
+ })
}
+ return true
}
return false
}
func (l *CacheListener) pathToKey(path string) string {
- key := strings.Replace(strings.Replace(path, l.rootPath+"/", "", -1), "/", ".", -1)
- if strings.HasSuffix(key, constant.ConfiguratorSuffix) ||
- strings.HasSuffix(key, constant.TagRouterRuleSuffix) ||
- strings.HasSuffix(key, constant.ConditionRouterRuleSuffix) {
- // governance config, so we remove the "dubbo." prefix
- key = key[strings.Index(key, ".")+1:]
+ if len(path) == 0 {
+ return path
}
- logger.Debugf("pathToKey path:%s, key:%s\n", path, key)
- return key
+ groupKey := strings.Replace(strings.Replace(path, l.rootPath+constant.PathSeparator, "", -1), constant.PathSeparator, constant.DotSeparator, -1)
+ return groupKey[strings.Index(groupKey, constant.DotSeparator)+1:]
}
diff --git a/remoting/listener.go b/remoting/listener.go
index a87e502..ea2300f 100644
--- a/remoting/listener.go
+++ b/remoting/listener.go
@@ -23,7 +23,7 @@ import (
// DataListener defines common data listener interface
type DataListener interface {
- DataChange(eventType Event) bool // bool is return for interface implement is interesting
+ DataChange(event Event) bool // bool is return for interface implement is interesting
}
//////////////////////////////////////////
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 5c955da..f05863a 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -52,7 +52,7 @@ func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
timeout := url.GetParamDuration(constant.ConfigTimeoutKey, constant.DefaultRegTimeout)
zkAddresses := strings.Split(url.Location, ",")
- logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %d", zkName, url.Location, timeout.String())
+ logger.Infof("[Zookeeper Client] New zookeeper client with name = %s, zkAddress = %s, timeout = %s", zkName, url.Location, timeout.String())
newClient, cltErr := gxzookeeper.NewZookeeperClient(zkName, zkAddresses, true, gxzookeeper.WithZkTimeOut(timeout))
if cltErr != nil {
logger.Warnf("newZookeeperClient(name{%s}, zk address{%v}, timeout{%d}) = error{%v}",
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index a7f7be8..7bf4382 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -45,7 +45,7 @@ var defaultTTL = 10 * time.Minute
// nolint
type ZkEventListener struct {
- client *gxzookeeper.ZookeeperClient
+ Client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
@@ -55,17 +55,12 @@ type ZkEventListener struct {
// NewZkEventListener returns a EventListener instance
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
- client: client,
+ Client: client,
pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
-// nolint
-func (l *ZkEventListener) SetClient(client *gxzookeeper.ZookeeperClient) {
- l.client = client
-}
-
// ListenServiceNodeEvent listen a path node event
func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remoting.DataListener) {
// listen l service node
@@ -81,6 +76,57 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
}(zkPath, listener)
}
+// ListenConfigurationEvent listen a path node event
+func (l *ZkEventListener) ListenConfigurationEvent(zkPath string, listener remoting.DataListener) {
+ l.wg.Add(1)
+ go func(zkPath string, listener remoting.DataListener) {
+ var eventChan = make(chan zk.Event, 16)
+ l.Client.RegisterEvent(zkPath, eventChan)
+ for {
+ select {
+ case event := <-eventChan:
+ logger.Infof("[ZkEventListener]Receive configuration change event:%#v", event)
+ if event.Type == zk.EventNodeChildrenChanged || event.Type == zk.EventNotWatching {
+ continue
+ }
+ // 1. Re-set watcher for the zk node
+ _, _, _, err := l.Client.Conn.ExistsW(event.Path)
+ if err != nil {
+ logger.Warnf("[ZkEventListener]Re-set watcher error, the reason is %+v", err)
+ continue
+ }
+
+ action := remoting.EventTypeAdd
+ var content string
+ if event.Type == zk.EventNodeDeleted {
+ action = remoting.EventTypeDel
+ } else {
+ // 2. Try to get new configuration value of the zk node
+ // Notice: The order of step 1 and step 2 cannot be swapped. If you get the value (at timestamp t1)
+ // before re-setting the watcher (at timestamp t2), and someone changes the data of the zk node after
+ // t1 but before t2, you will hold the old value, and the new value will not trigger an event.
+ contentBytes, _, err := l.Client.Conn.Get(event.Path)
+ if err != nil {
+ logger.Warnf("[ListenConfigurationEvent]Get config value error, the reason is %+v", err)
+ continue
+ }
+ content = string(contentBytes)
+ logger.Debugf("[ZkEventListener]Successfully get new config value: %s", string(content))
+ }
+
+ listener.DataChange(remoting.Event{
+ Path: event.Path,
+ Action: remoting.EventType(action),
+ Content: content,
+ })
+ case <-l.exit:
+ return
+ }
+ }
+
+ }(zkPath, listener)
+}
+
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
@@ -97,7 +143,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
var zkEvent zk.Event
for {
- keyEventCh, err := l.client.ExistW(zkPath)
+ keyEventCh, err := l.Client.ExistW(zkPath)
if err != nil {
logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
return false
@@ -111,7 +157,7 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
case zk.EventNodeDataChanged:
logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDataChanged}", zkPath)
if len(listener) > 0 {
- content, _, err := l.client.Conn.Get(zkEvent.Path)
+ content, _, err := l.Client.Conn.Get(zkEvent.Path)
if err != nil {
logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
return false
@@ -119,9 +165,9 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeUpdate, Content: string(content)})
}
case zk.EventNodeCreated:
- logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeCreated}", zkPath)
+ logger.Warnf("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeCreated event for path {%s}", zkPath)
if len(listener) > 0 {
- content, _, err := l.client.Conn.Get(zkEvent.Path)
+ content, _, err := l.Client.Conn.Get(zkEvent.Path)
if err != nil {
logger.Warnf("zk.Conn.Get{key:%s} = error{%v}", zkPath, err)
return false
@@ -129,9 +175,9 @@ func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remo
listener[0].DataChange(remoting.Event{Path: zkEvent.Path, Action: remoting.EventTypeAdd, Content: string(content)})
}
case zk.EventNotWatching:
- logger.Warnf("zk.ExistW(key{%s}) = event{EventNotWatching}", zkPath)
+ logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNotWatching event for path {%s}", zkPath)
case zk.EventNodeDeleted:
- logger.Warnf("zk.ExistW(key{%s}) = event{EventNodeDeleted}", zkPath)
+ logger.Infof("[ZkEventListener][listenServiceNodeEvent]Get a EventNodeDeleted event for path {%s}", zkPath)
return true
}
case <-l.exit:
@@ -150,11 +196,11 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
return false
}
- newChildren, err := l.client.GetChildren(zkPath)
+ newChildren, err := l.Client.GetChildren(zkPath)
if err != nil {
// TODO need to ignore this error in gost
if err == gxzookeeper.ErrNilChildren {
- content, _, connErr := l.client.Conn.Get(zkPath)
+ content, _, connErr := l.Client.Conn.Get(zkPath)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
zkPath, perrors.WithStack(connErr))
@@ -176,7 +222,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newNode = path.Join(zkPath, n)
logger.Debugf("[Zookeeper Listener] add zkNode{%s}", newNode)
- content, _, connErr := l.client.Conn.Get(newNode)
+ content, _, connErr := l.Client.Conn.Get(newNode)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
newNode, perrors.WithStack(connErr))
@@ -231,7 +277,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
for {
// Get current children with watcher for the zkRootPath
- children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
+ children, childEventCh, err := l.Client.GetChildrenW(zkRootPath)
if err != nil {
failTimes++
if MaxFailTimes <= failTimes {
@@ -277,14 +323,14 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
- l.client.RLock()
- if l.client.Conn == nil {
- l.client.RUnlock()
+ l.Client.RLock()
+ if l.Client.Conn == nil {
+ l.Client.RUnlock()
break
}
- content, _, err := l.client.Conn.Get(zkNodePath)
+ content, _, err := l.Client.Conn.Get(zkNodePath)
- l.client.RUnlock()
+ l.Client.RUnlock()
if err != nil {
logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get content of the child node {%v} failed, the error is %+v", zkNodePath, perrors.WithStack(err))
}
@@ -304,17 +350,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
}(zkNodePath, listener)
-
- // listen sub path recursive
- // if zkPath is end of "providers/ & consumers/" we do not listen children dir
- if strings.LastIndex(zkRootPath, constant.ProviderCategory) == -1 &&
- strings.LastIndex(zkRootPath, constant.ConsumerCategory) == -1 {
- l.wg.Add(1)
- go func(zkPath string, listener remoting.DataListener) {
- l.listenDirEvent(conf, zkPath, listener)
- logger.Warnf("listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
- }(zkNodePath, listener)
- }
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return