You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/12/04 04:22:27 UTC

[dubbo-go] branch 3.0 updated: avoid the problem of cpu idling & fix wrong definition of error (#1629)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new ec43e22  avoid the problem of cpu idling & fix wrong definition of error (#1629)
ec43e22 is described below

commit ec43e227cf1d0112eb13ca7a5702b1e184f818b1
Author: Mulavar <97...@qq.com>
AuthorDate: Sat Dec 4 12:20:13 2021 +0800

    avoid the problem of cpu idling & fix wrong definition of error (#1629)
    
    * avoid the problem of cpu idling & fix wrong definition of error
    
    * replace literal "/providers/" with constant
    
    * add l.exit flag
    
    Co-authored-by: dongjianhui03 <do...@meituan.com>
---
 cluster/cluster/base/cluster_invoker.go      |  2 +-
 cluster/cluster/failback/cluster_invoker.go  |  2 +-
 cluster/cluster/failsafe/cluster_invoker.go  |  2 +-
 cluster/directory/base/directory_test.go     |  2 +-
 common/constant/default.go                   | 12 ++----------
 config_center/configurator/override.go       |  6 +++---
 config_center/parser/configuration_parser.go |  4 ++--
 registry/zookeeper/listener.go               | 17 ++++++++++-------
 remoting/zookeeper/client.go                 |  6 ------
 remoting/zookeeper/listener.go               | 16 +++++++---------
 10 files changed, 28 insertions(+), 41 deletions(-)

diff --git a/cluster/cluster/base/cluster_invoker.go b/cluster/cluster/base/cluster_invoker.go
index cb7de4f..43318db 100644
--- a/cluster/cluster/base/cluster_invoker.go
+++ b/cluster/cluster/base/cluster_invoker.go
@@ -174,7 +174,7 @@ func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) lo
 
 	methodName := invocation.MethodName()
 	// Get the service loadbalance config
-	lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
+	lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
 
 	// Get the service method loadbalance config if have
 	if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); len(v) > 0 {
diff --git a/cluster/cluster/failback/cluster_invoker.go b/cluster/cluster/failback/cluster_invoker.go
index 6d55ae4..8987494 100644
--- a/cluster/cluster/failback/cluster_invoker.go
+++ b/cluster/cluster/failback/cluster_invoker.go
@@ -141,7 +141,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
 
 	// Get the service loadbalance config
 	url := invokers[0].GetURL()
-	lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
+	lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
 	// Get the service method loadbalance config if have
 	methodName := invocation.MethodName()
 	if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
diff --git a/cluster/cluster/failsafe/cluster_invoker.go b/cluster/cluster/failsafe/cluster_invoker.go
index c129e43..016616f 100644
--- a/cluster/cluster/failsafe/cluster_invoker.go
+++ b/cluster/cluster/failsafe/cluster_invoker.go
@@ -58,7 +58,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
 	url := invokers[0].GetURL()
 	methodName := invocation.MethodName()
 	// Get the service loadbalance config
-	lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
+	lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
 	// Get the service method loadbalance config if have
 	if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
 		lb = v
diff --git a/cluster/directory/base/directory_test.go b/cluster/directory/base/directory_test.go
index b4feb44..55f1d2c 100644
--- a/cluster/directory/base/directory_test.go
+++ b/cluster/directory/base/directory_test.go
@@ -36,7 +36,7 @@ import (
 var (
 	url, _ = common.NewURL(
 		fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LocalHostValue, constant.DefaultPort))
-	anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyhostValue))
+	anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyHostValue))
 )
 
 func TestNewBaseDirectory(t *testing.T) {
diff --git a/common/constant/default.go b/common/constant/default.go
index 58784e7..b0c3874 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -32,7 +32,7 @@ const (
 )
 
 const (
-	DefaultLoadbalance      = "random"
+	DefaultLoadBalance      = "random"
 	DefaultRetries          = "2"
 	DefaultRetriesInt       = 2
 	DefaultProtocol         = "dubbo"
@@ -45,7 +45,6 @@ const (
 	DefaultRestClient       = "resty"
 	DefaultRestServer       = "go-restful"
 	DefaultPort             = 20000
-	DefaultMetadataport     = 20005
 )
 
 const (
@@ -60,7 +59,7 @@ const (
 
 const (
 	AnyValue          = "*"
-	AnyhostValue      = "0.0.0.0"
+	AnyHostValue      = "0.0.0.0"
 	LocalHostValue    = "192.168.1.1"
 	RemoveValuePrefix = "-"
 )
@@ -87,10 +86,3 @@ const (
 const (
 	ServiceDiscoveryDefaultGroup = "DEFAULT_GROUP"
 )
-
-const (
-	DefaultProviderConfFilePath = "../profiles/dev/server.yml"
-	DefaultConsumerConfFilePath = "../profiles/dev/client.yml"
-	DefaultLogConfFilePath      = "../profiles/dev/log.yml"
-	DefaultRouterConfFilePath   = "../profiles/dev/router.yml"
-)
diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go
index 8c73b66..b393225 100644
--- a/config_center/configurator/override.go
+++ b/config_center/configurator/override.go
@@ -107,9 +107,9 @@ func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) {
 
 // configureIfMatch translate from java, compatible rules in java
 func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
-	if constant.AnyhostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
+	if constant.AnyHostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
 		providers := c.configuratorUrl.GetParam(constant.OverrideProvidersKey, "")
-		if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyhostValue) {
+		if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyHostValue) {
 			c.configureIfMatchInternal(url)
 		}
 	}
@@ -129,7 +129,7 @@ func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
 			localIP := common.GetLocalIp()
 			c.configureIfMatch(localIP, url)
 		} else {
-			c.configureIfMatch(constant.AnyhostValue, url)
+			c.configureIfMatch(constant.AnyHostValue, url)
 		}
 	}
 }
diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go
index 1f3f317..6097f95 100644
--- a/config_center/parser/configuration_parser.go
+++ b/config_center/parser/configuration_parser.go
@@ -116,7 +116,7 @@ func (parser *DefaultConfigurationParser) ParseToUrls(content string) ([]*common
 func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
 	addresses := item.Addresses
 	if len(addresses) == 0 {
-		addresses = append(addresses, constant.AnyhostValue)
+		addresses = append(addresses, constant.AnyHostValue)
 	}
 	var urls []*common.URL
 	for _, v := range addresses {
@@ -163,7 +163,7 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR
 func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
 	addresses := item.Addresses
 	if len(addresses) == 0 {
-		addresses = append(addresses, constant.AnyhostValue)
+		addresses = append(addresses, constant.AnyHostValue)
 	}
 	var urls []*common.URL
 	for _, v := range addresses {
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 5de11d6..51a4346 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -31,6 +31,7 @@ import (
 
 import (
 	"dubbo.apache.org/dubbo-go/v3/common"
+	"dubbo.apache.org/dubbo-go/v3/common/constant"
 	"dubbo.apache.org/dubbo-go/v3/common/logger"
 	"dubbo.apache.org/dubbo-go/v3/config_center"
 	"dubbo.apache.org/dubbo-go/v3/registry"
@@ -71,17 +72,19 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
 }
 
 // DataChange accepts all events sent from the zookeeper server and trigger the corresponding listener for processing
-func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
+func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
+	providersPath := constant.PathSeparator + constant.ProviderCategory + constant.PathSeparator
 	// Intercept the last bit
-	index := strings.Index(eventType.Path, "/providers/")
+	index := strings.Index(event.Path, providersPath)
 	if index == -1 {
-		logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
+		logger.Warnf("[RegistryDataListener][DataChange]Listen error zk node path {%s}, "+
+			"this listener is used to listen services which under the directory of providers/", event.Path)
 		return false
 	}
-	url := eventType.Path[index+len("/providers/"):]
+	url := event.Path[index+len(providersPath):]
 	serviceURL, err := common.NewURL(url)
 	if err != nil {
-		logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
+		logger.Errorf("[RegistryDataListener][DataChange]Listen NewURL({%s}) = error{%+v} event.Path={%s}", url, err, event.Path)
 		return false
 	}
 	l.mutex.Lock()
@@ -93,9 +96,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
 		if serviceURL.ServiceKey() == serviceKey {
 			listener.Process(
 				&config_center.ConfigChangeEvent{
-					Key:        eventType.Path,
+					Key:        event.Path,
 					Value:      serviceURL,
-					ConfigType: eventType.Action,
+					ConfigType: event.Action,
 				},
 			)
 			return true
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 7d7437c..5c955da 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -39,12 +39,6 @@ const (
 	MaxFailTimes = 3
 )
 
-var (
-	errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
-	errNilChildren     = perrors.Errorf("has none children")
-	errNilNode         = perrors.Errorf("node does not exist")
-)
-
 // ValidateZookeeperClient validates client and sets options
 func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
 	lock := container.ZkClientLock()
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index f438792..a7f7be8 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -152,16 +152,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 
 	newChildren, err := l.client.GetChildren(zkPath)
 	if err != nil {
-		// FIXME always false
-		if err == errNilChildren {
+		// TODO need to ignore this error in gost
+		if err == gxzookeeper.ErrNilChildren {
 			content, _, connErr := l.client.Conn.Get(zkPath)
 			if connErr != nil {
 				logger.Errorf("Get new node path {%v} 's content error,message is  {%v}",
 					zkPath, perrors.WithStack(connErr))
 			} else {
+				// TODO this if for config center listener, and will be removed when we refactor config center listener
 				listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
 			}
-
 		} else {
 			logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
 		}
@@ -219,9 +219,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 	var (
 		failTimes int
 		ttl       time.Duration
-		event     chan struct{}
 	)
-	event = make(chan struct{}, 4)
 	ttl = defaultTTL
 	if conf != nil {
 		timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
@@ -231,7 +229,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
 		}
 	}
-	defer close(event)
 	for {
 		// Get current children with watcher for the zkRootPath
 		children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
@@ -247,12 +244,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 			select {
 			case <-after:
 				continue
+			case <-l.exit:
+				return
 			}
 		}
 		failTimes = 0
 		if len(children) == 0 {
-			logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
-			continue
+			logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
 		}
 		for _, c := range children {
 			// Only need to compare Path when subscribing to provider
@@ -324,10 +322,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
 	}
 }
 
+// startScheduleWatchTask periodically update provider information, return true when receive exit signal
 func (l *ZkEventListener) startScheduleWatchTask(
 	zkRootPath string, children []string, ttl time.Duration,
 	listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
-	// Periodically update provider information
 	tickerTTL := ttl
 	if tickerTTL > 20e9 {
 		tickerTTL = 20e9