You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by "justxuewei (via GitHub)" <gi...@apache.org> on 2023/04/03 13:54:39 UTC
[GitHub] [dubbo-go] justxuewei commented on a diff in pull request #2267: add subscribe any value
justxuewei commented on code in PR #2267:
URL: https://github.com/apache/dubbo-go/pull/2267#discussion_r1155984607
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
Review Comment:
```suggestion
func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
Review Comment:
```go
if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
ttl = timeout
} else {
logger.Warnf
}
##########
remoting/zookeeper/listener.go:
##########
@@ -279,7 +363,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
provider, _ := common.NewURL(c)
- if provider.ServiceKey() != conf.ServiceKey() {
+ if provider.Interface() != intf || conf.Group() != constant.AnyValue && conf.Group() != provider.Group() ||
+ conf.Version() != constant.AnyValue && conf.Version() != provider.Version() {
Review Comment:
Use `is_any_condition()` instead.
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
Review Comment:
```suggestion
// listenerAllDirEvents listens all services when conf.InterfaceKey = "*"
```
##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
ConfigType: event.Action,
},
)
- return true
+ match = true
}
+
+ // AnyValue condition
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if intf != constant.AnyValue || group != constant.AnyValue && group != serviceURL.Group() ||
+ version != constant.AnyValue && version != serviceURL.Group() {
+ continue
Review Comment:
I think here you should wrap them into a method, named `is_any_condition()`, and add some unit tests for it.
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
+ zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
+ _, ok := l.pathMap[zkRootPath]
+ if !ok {
+ l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+ }
+ l.pathMapLock.Unlock()
Review Comment:
Remove this.
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
+ zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
Review Comment:
```suggestion
l.pathMapLock.Lock()
defer l.pathMapLock.Unlock()
```
##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
ConfigType: event.Action,
},
)
- return true
+ match = true
}
+
+ // AnyValue condition
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if intf != constant.AnyValue || group != constant.AnyValue && group != serviceURL.Group() ||
+ version != constant.AnyValue && version != serviceURL.Group() {
+ continue
+ }
+ listener.Process(
+ &config_center.ConfigChangeEvent{
+ Key: event.Path,
+ Value: serviceURL.Clone(),
+ ConfigType: event.Action,
+ },
+ )
+ match = true
}
- return false
+ return match
Review Comment:
The `listener.Process()` is called twice here. However, it could be reduced to call once.
```
if (match service key) || (match any condition) {
listener.Process()
match = true
}
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
+ zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+ // Save the path to avoid listen repeatedly
+ l.pathMapLock.Lock()
+ _, ok := l.pathMap[zkRootPath]
+ if !ok {
+ l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+ }
+ l.pathMapLock.Unlock()
+ if ok {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
+ continue
+ }
Review Comment:
```go
if _, ok := l.pathMap[zkRootPath]; ok {
logger.Warnf
} else {
l.pathMap[zkRootPath] = uatomic.NewInt32(0)
}
##########
common/url.go:
##########
@@ -413,6 +413,31 @@ func ServiceKey(intf string, group string, version string) string {
return buf.String()
}
+// ParseServiceKey get interface, group and version from servicekey
Review Comment:
```suggestion
// ParseServiceKey gets interface, group and version from service key.
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the children path
Review Comment:
```suggestion
// Build the child path
```
##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+ if err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
Review Comment:
```suggestion
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any child from the path \"%s\", please check if the provider does ready.", rootPath)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org