You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by "justxuewei (via GitHub)" <gi...@apache.org> on 2023/04/03 13:54:39 UTC

[GitHub] [dubbo-go] justxuewei commented on a diff in pull request #2267: add subscribe any value

justxuewei commented on code in PR #2267:
URL: https://github.com/apache/dubbo-go/pull/2267#discussion_r1155984607


##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {

Review Comment:
   ```suggestion
   func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+		if err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}

Review Comment:
   ```go
   if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
       ttl = timeout
   } else {
       logger.Warnf
   }
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -279,7 +363,8 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 			// Only need to compare Path when subscribing to provider
 			if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
 				provider, _ := common.NewURL(c)
-				if provider.ServiceKey() != conf.ServiceKey() {
+				if provider.Interface() != intf || conf.Group() != constant.AnyValue && conf.Group() != provider.Group() ||
+					conf.Version() != constant.AnyValue && conf.Version() != provider.Version() {

Review Comment:
   Use `is_any_condition()` instead.



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"

Review Comment:
   ```suggestion
   // listenAllDirEvents listens to all services when conf.InterfaceKey = "*"
   ```



##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
 					ConfigType: event.Action,
 				},
 			)
-			return true
+			match = true
 		}
+
+		// AnyValue condition
+		intf, group, version := common.ParseServiceKey(serviceKey)
+		if intf != constant.AnyValue || group != constant.AnyValue && group != serviceURL.Group() ||
+			version != constant.AnyValue && version != serviceURL.Group() {
+			continue

Review Comment:
   I think you should extract this condition into a helper method, e.g. `is_any_condition()`, and add some unit tests for it.



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+		if err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}
+	}
+	if ttl > 20e9 {
+		ttl = 20e9
+	}
+
+	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+	for {
+		// get all interfaces
+		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+			// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+			after := time.After(timeSecondDuration(failTimes * ConnDelay))
+			select {
+			case <-after:
+				continue
+			case <-l.exit:
+				return
+			}
+		}
+		failTimes = 0
+		if len(children) == 0 {
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+		}
+		for _, c := range children {
+			// Build the children path
+			zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+			// Save the path to avoid listen repeatedly
+			l.pathMapLock.Lock()
+			_, ok := l.pathMap[zkRootPath]
+			if !ok {
+				l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+			}
+			l.pathMapLock.Unlock()

Review Comment:
   Remove this.



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+		if err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}
+	}
+	if ttl > 20e9 {
+		ttl = 20e9
+	}
+
+	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+	for {
+		// get all interfaces
+		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+			// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+			after := time.After(timeSecondDuration(failTimes * ConnDelay))
+			select {
+			case <-after:
+				continue
+			case <-l.exit:
+				return
+			}
+		}
+		failTimes = 0
+		if len(children) == 0 {
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+		}
+		for _, c := range children {
+			// Build the children path
+			zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+			// Save the path to avoid listen repeatedly
+			l.pathMapLock.Lock()

Review Comment:
   ```suggestion
   			l.pathMapLock.Lock()
   			defer l.pathMapLock.Unlock()
   ```



##########
registry/zookeeper/listener.go:
##########
@@ -101,10 +102,25 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
 					ConfigType: event.Action,
 				},
 			)
-			return true
+			match = true
 		}
+
+		// AnyValue condition
+		intf, group, version := common.ParseServiceKey(serviceKey)
+		if intf != constant.AnyValue || group != constant.AnyValue && group != serviceURL.Group() ||
+			version != constant.AnyValue && version != serviceURL.Group() {
+			continue
+		}
+		listener.Process(
+			&config_center.ConfigChangeEvent{
+				Key:        event.Path,
+				Value:      serviceURL.Clone(),
+				ConfigType: event.Action,
+			},
+		)
+		match = true
 	}
-	return false
+	return match

Review Comment:
   `listener.Process()` is called in two places here. The two branches could be merged so that it is called only once.
   
   ```
   if (match service key) || (match any condition) {
       listener.Process()
       match = true
   }
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+		if err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}
+	}
+	if ttl > 20e9 {
+		ttl = 20e9
+	}
+
+	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+	for {
+		// get all interfaces
+		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+			// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+			after := time.After(timeSecondDuration(failTimes * ConnDelay))
+			select {
+			case <-after:
+				continue
+			case <-l.exit:
+				return
+			}
+		}
+		failTimes = 0
+		if len(children) == 0 {
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+		}
+		for _, c := range children {
+			// Build the children path
+			zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+			// Save the path to avoid listen repeatedly
+			l.pathMapLock.Lock()
+			_, ok := l.pathMap[zkRootPath]
+			if !ok {
+				l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+			}
+			l.pathMapLock.Unlock()
+			if ok {
+				logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
+				continue
+			}

Review Comment:
   ```go
   if _, ok := l.pathMap[zkRootPath]; ok {
       logger.Warnf
   } else {
       l.pathMap[zkRootPath] = uatomic.NewInt32(0)
   }
   ```



##########
common/url.go:
##########
@@ -413,6 +413,31 @@ func ServiceKey(intf string, group string, version string) string {
 	return buf.String()
 }
 
+// ParseServiceKey get interface, group and version from servicekey

Review Comment:
   ```suggestion
   // ParseServiceKey gets interface, group and version from service key.
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+		if err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}
+	}
+	if ttl > 20e9 {
+		ttl = 20e9
+	}
+
+	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+	for {
+		// get all interfaces
+		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+			// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+			after := time.After(timeSecondDuration(failTimes * ConnDelay))
+			select {
+			case <-after:
+				continue
+			case <-l.exit:
+				return
+			}
+		}
+		failTimes = 0
+		if len(children) == 0 {
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)
+		}
+		for _, c := range children {
+			// Build the children path

Review Comment:
   ```suggestion
   			// Build the child path
   ```



##########
remoting/zookeeper/listener.go:
##########
@@ -238,8 +239,91 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
 	}
 }
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenerAllDirEvent listen all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvent(conf *common.URL, listener remoting.DataListener) {
+	var (
+		failTimes int
+		ttl       time.Duration
+	)
+	ttl = defaultTTL
+	if conf != nil {
+		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
+		if err == nil {
+			ttl = timeout
+		} else {
+			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+		}
+	}
+	if ttl > 20e9 {
+		ttl = 20e9
+	}
+
+	rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+	for {
+		// get all interfaces
+		children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+		if err != nil {
+			failTimes++
+			if MaxFailTimes <= failTimes {
+				failTimes = MaxFailTimes
+			}
+			logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+			// Maybe the zookeeper does not ready yet, sleep failTimes * ConnDelay senconds to wait
+			after := time.After(timeSecondDuration(failTimes * ConnDelay))
+			select {
+			case <-after:
+				continue
+			case <-l.exit:
+				return
+			}
+		}
+		failTimes = 0
+		if len(children) == 0 {
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", rootPath)

Review Comment:
   ```suggestion
   			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any child from the path \"%s\", please check if the provider does ready.", rootPath)
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org