You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/12/04 04:22:27 UTC
[dubbo-go] branch 3.0 updated: avoid the problem of cpu idling & fix wrong definition of error (#1629)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new ec43e22 avoid the problem of cpu idling & fix wrong definition of error (#1629)
ec43e22 is described below
commit ec43e227cf1d0112eb13ca7a5702b1e184f818b1
Author: Mulavar <97...@qq.com>
AuthorDate: Sat Dec 4 12:20:13 2021 +0800
avoid the problem of cpu idling & fix wrong definition of error (#1629)
* avoid the problem of cpu idling & fix wrong definition of error
* replace literal "/providers/" with constant
* add l.exit flag
Co-authored-by: dongjianhui03 <do...@meituan.com>
---
cluster/cluster/base/cluster_invoker.go | 2 +-
cluster/cluster/failback/cluster_invoker.go | 2 +-
cluster/cluster/failsafe/cluster_invoker.go | 2 +-
cluster/directory/base/directory_test.go | 2 +-
common/constant/default.go | 12 ++----------
config_center/configurator/override.go | 6 +++---
config_center/parser/configuration_parser.go | 4 ++--
registry/zookeeper/listener.go | 17 ++++++++++-------
remoting/zookeeper/client.go | 6 ------
remoting/zookeeper/listener.go | 16 +++++++---------
10 files changed, 28 insertions(+), 41 deletions(-)
diff --git a/cluster/cluster/base/cluster_invoker.go b/cluster/cluster/base/cluster_invoker.go
index cb7de4f..43318db 100644
--- a/cluster/cluster/base/cluster_invoker.go
+++ b/cluster/cluster/base/cluster_invoker.go
@@ -174,7 +174,7 @@ func GetLoadBalance(invoker protocol.Invoker, invocation protocol.Invocation) lo
methodName := invocation.MethodName()
// Get the service loadbalance config
- lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
+ lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); len(v) > 0 {
diff --git a/cluster/cluster/failback/cluster_invoker.go b/cluster/cluster/failback/cluster_invoker.go
index 6d55ae4..8987494 100644
--- a/cluster/cluster/failback/cluster_invoker.go
+++ b/cluster/cluster/failback/cluster_invoker.go
@@ -141,7 +141,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
// Get the service loadbalance config
url := invokers[0].GetURL()
- lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
+ lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
methodName := invocation.MethodName()
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
diff --git a/cluster/cluster/failsafe/cluster_invoker.go b/cluster/cluster/failsafe/cluster_invoker.go
index c129e43..016616f 100644
--- a/cluster/cluster/failsafe/cluster_invoker.go
+++ b/cluster/cluster/failsafe/cluster_invoker.go
@@ -58,7 +58,7 @@ func (invoker *clusterInvoker) Invoke(ctx context.Context, invocation protocol.I
url := invokers[0].GetURL()
methodName := invocation.MethodName()
// Get the service loadbalance config
- lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadbalance)
+ lb := url.GetParam(constant.LoadbalanceKey, constant.DefaultLoadBalance)
// Get the service method loadbalance config if have
if v := url.GetMethodParam(methodName, constant.LoadbalanceKey, ""); v != "" {
lb = v
diff --git a/cluster/directory/base/directory_test.go b/cluster/directory/base/directory_test.go
index b4feb44..55f1d2c 100644
--- a/cluster/directory/base/directory_test.go
+++ b/cluster/directory/base/directory_test.go
@@ -36,7 +36,7 @@ import (
var (
url, _ = common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LocalHostValue, constant.DefaultPort))
- anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyhostValue))
+ anyURL, _ = common.NewURL(fmt.Sprintf("condition://%s/com.foo.BarService", constant.AnyHostValue))
)
func TestNewBaseDirectory(t *testing.T) {
diff --git a/common/constant/default.go b/common/constant/default.go
index 58784e7..b0c3874 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -32,7 +32,7 @@ const (
)
const (
- DefaultLoadbalance = "random"
+ DefaultLoadBalance = "random"
DefaultRetries = "2"
DefaultRetriesInt = 2
DefaultProtocol = "dubbo"
@@ -45,7 +45,6 @@ const (
DefaultRestClient = "resty"
DefaultRestServer = "go-restful"
DefaultPort = 20000
- DefaultMetadataport = 20005
)
const (
@@ -60,7 +59,7 @@ const (
const (
AnyValue = "*"
- AnyhostValue = "0.0.0.0"
+ AnyHostValue = "0.0.0.0"
LocalHostValue = "192.168.1.1"
RemoveValuePrefix = "-"
)
@@ -87,10 +86,3 @@ const (
const (
ServiceDiscoveryDefaultGroup = "DEFAULT_GROUP"
)
-
-const (
- DefaultProviderConfFilePath = "../profiles/dev/server.yml"
- DefaultConsumerConfFilePath = "../profiles/dev/client.yml"
- DefaultLogConfFilePath = "../profiles/dev/log.yml"
- DefaultRouterConfFilePath = "../profiles/dev/router.yml"
-)
diff --git a/config_center/configurator/override.go b/config_center/configurator/override.go
index 8c73b66..b393225 100644
--- a/config_center/configurator/override.go
+++ b/config_center/configurator/override.go
@@ -107,9 +107,9 @@ func (c *overrideConfigurator) configureIfMatchInternal(url *common.URL) {
// configureIfMatch translate from java, compatible rules in java
func (c *overrideConfigurator) configureIfMatch(host string, url *common.URL) {
- if constant.AnyhostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
+ if constant.AnyHostValue == c.configuratorUrl.Ip || host == c.configuratorUrl.Ip {
providers := c.configuratorUrl.GetParam(constant.OverrideProvidersKey, "")
- if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyhostValue) {
+ if len(providers) == 0 || strings.Contains(providers, url.Location) || strings.Contains(providers, constant.AnyHostValue) {
c.configureIfMatchInternal(url)
}
}
@@ -129,7 +129,7 @@ func (c *overrideConfigurator) configureDeprecated(url *common.URL) {
localIP := common.GetLocalIp()
c.configureIfMatch(localIP, url)
} else {
- c.configureIfMatch(constant.AnyhostValue, url)
+ c.configureIfMatch(constant.AnyHostValue, url)
}
}
}
diff --git a/config_center/parser/configuration_parser.go b/config_center/parser/configuration_parser.go
index 1f3f317..6097f95 100644
--- a/config_center/parser/configuration_parser.go
+++ b/config_center/parser/configuration_parser.go
@@ -116,7 +116,7 @@ func (parser *DefaultConfigurationParser) ParseToUrls(content string) ([]*common
func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
addresses := item.Addresses
if len(addresses) == 0 {
- addresses = append(addresses, constant.AnyhostValue)
+ addresses = append(addresses, constant.AnyHostValue)
}
var urls []*common.URL
for _, v := range addresses {
@@ -163,7 +163,7 @@ func serviceItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.UR
func appItemToUrls(item ConfigItem, config ConfiguratorConfig) ([]*common.URL, error) {
addresses := item.Addresses
if len(addresses) == 0 {
- addresses = append(addresses, constant.AnyhostValue)
+ addresses = append(addresses, constant.AnyHostValue)
}
var urls []*common.URL
for _, v := range addresses {
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 5de11d6..51a4346 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -31,6 +31,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config_center"
"dubbo.apache.org/dubbo-go/v3/registry"
@@ -71,17 +72,19 @@ func (l *RegistryDataListener) UnSubscribeURL(url *common.URL) config_center.Con
}
// DataChange accepts all events sent from the zookeeper server and triggers the corresponding listener for processing
-func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
+func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
+ providersPath := constant.PathSeparator + constant.ProviderCategory + constant.PathSeparator
// Intercept the last bit
- index := strings.Index(eventType.Path, "/providers/")
+ index := strings.Index(event.Path, providersPath)
if index == -1 {
- logger.Warnf("Listen with no url, event.path={%v}", eventType.Path)
+ logger.Warnf("[RegistryDataListener][DataChange]Listen error zk node path {%s}, "+
+ "this listener is used to listen services which under the directory of providers/", event.Path)
return false
}
- url := eventType.Path[index+len("/providers/"):]
+ url := event.Path[index+len(providersPath):]
serviceURL, err := common.NewURL(url)
if err != nil {
- logger.Errorf("Listen NewURL(r{%s}) = error{%v} eventType.Path={%v}", url, err, eventType.Path)
+ logger.Errorf("[RegistryDataListener][DataChange]Listen NewURL({%s}) = error{%+v} event.Path={%s}", url, err, event.Path)
return false
}
l.mutex.Lock()
@@ -93,9 +96,9 @@ func (l *RegistryDataListener) DataChange(eventType remoting.Event) bool {
if serviceURL.ServiceKey() == serviceKey {
listener.Process(
&config_center.ConfigChangeEvent{
- Key: eventType.Path,
+ Key: event.Path,
Value: serviceURL,
- ConfigType: eventType.Action,
+ ConfigType: event.Action,
},
)
return true
diff --git a/remoting/zookeeper/client.go b/remoting/zookeeper/client.go
index 7d7437c..5c955da 100644
--- a/remoting/zookeeper/client.go
+++ b/remoting/zookeeper/client.go
@@ -39,12 +39,6 @@ const (
MaxFailTimes = 3
)
-var (
- errNilZkClientConn = perrors.New("zookeeper client{conn} is nil")
- errNilChildren = perrors.Errorf("has none children")
- errNilNode = perrors.Errorf("node does not exist")
-)
-
// ValidateZookeeperClient validates client and sets options
func ValidateZookeeperClient(container ZkClientFacade, zkName string) error {
lock := container.ZkClientLock()
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index f438792..a7f7be8 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -152,16 +152,16 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
newChildren, err := l.client.GetChildren(zkPath)
if err != nil {
- // FIXME always false
- if err == errNilChildren {
+ // TODO need to ignore this error in gost
+ if err == gxzookeeper.ErrNilChildren {
content, _, connErr := l.client.Conn.Get(zkPath)
if connErr != nil {
logger.Errorf("Get new node path {%v} 's content error,message is {%v}",
zkPath, perrors.WithStack(connErr))
} else {
+ // TODO this is for config center listener, and will be removed when we refactor config center listener
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeUpdate, Content: string(content)})
}
-
} else {
logger.Errorf("path{%s} child nodes changed, zk.Children() = error{%v}", zkPath, perrors.WithStack(err))
}
@@ -219,9 +219,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
var (
failTimes int
ttl time.Duration
- event chan struct{}
)
- event = make(chan struct{}, 4)
ttl = defaultTTL
if conf != nil {
timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL))
@@ -231,7 +229,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
}
}
- defer close(event)
for {
// Get current children with watcher for the zkRootPath
children, childEventCh, err := l.client.GetChildrenW(zkRootPath)
@@ -247,12 +244,13 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
select {
case <-after:
continue
+ case <-l.exit:
+ return
}
}
failTimes = 0
if len(children) == 0 {
- logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
- continue
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] Can not gey any children for the path {%s}, please check if the provider does ready.", zkRootPath)
}
for _, c := range children {
// Only need to compare Path when subscribing to provider
@@ -324,10 +322,10 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
}
+// startScheduleWatchTask periodically updates provider information and returns true when it receives the exit signal
func (l *ZkEventListener) startScheduleWatchTask(
zkRootPath string, children []string, ttl time.Duration,
listener remoting.DataListener, childEventCh <-chan zk.Event) bool {
- // Periodically update provider information
tickerTTL := ttl
if tickerTTL > 20e9 {
tickerTTL = 20e9