You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by zt...@apache.org on 2022/02/26 02:09:55 UTC
[dubbo-go-pixiu] branch develop-0.5.0 updated: Fix: spring cloud error and refactor event callback (#367)
This is an automated email from the ASF dual-hosted git repository.
ztelur pushed a commit to branch develop-0.5.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git
The following commit(s) were added to refs/heads/develop-0.5.0 by this push:
new dcb7ae1 Fix: spring cloud error and refactor event callback (#367)
dcb7ae1 is described below
commit dcb7ae1834e0bc6961123fb20f645bcb0741de75
Author: phil <ph...@yeah.net>
AuthorDate: Sat Feb 26 10:07:04 2022 +0800
Fix: spring cloud error and refactor event callback (#367)
* rm all output files when running make clean
* 1. fix error log and event callback
2. add zookeeper root path config
* fmt...
* close the service instance listener when it is still not working after retrying
* fix the CI
* clean
Co-authored-by: Randy <zt...@gmail.com>
Co-authored-by: Xin.Zh <dr...@foxmail.com>
---
igt/Makefile | 2 +-
pkg/adapter/springcloud/cloud.go | 2 +
.../zookeeper/application_listener.go | 64 ++++++++++++++--------
.../servicediscovery/zookeeper/service_listener.go | 62 +++++++++++----------
.../servicediscovery/zookeeper/zk_discovery.go | 44 ++++++++-------
pkg/model/remote.go | 1 +
samples/springcloud/zookeeper/pixiu/conf.yaml | 1 +
7 files changed, 105 insertions(+), 71 deletions(-)
diff --git a/igt/Makefile b/igt/Makefile
index 2a8dfa3..72e9465 100644
--- a/igt/Makefile
+++ b/igt/Makefile
@@ -119,7 +119,7 @@ docker-down:
.PHONY: clean
clean: stop
$(info > Cleanning up $(OUT_DIR))
-# @-rm -rf $(OUT_DIR)
+ @-rm -rf $(OUT_DIR)
@-cat $(PID) | awk '{print $1}' | xargs kill -9
@-cat $(PIXIU_PID) | awk '{print $1}' | xargs kill -9
diff --git a/pkg/adapter/springcloud/cloud.go b/pkg/adapter/springcloud/cloud.go
index 52860b3..f88208e 100644
--- a/pkg/adapter/springcloud/cloud.go
+++ b/pkg/adapter/springcloud/cloud.go
@@ -73,6 +73,8 @@ type (
Registry *model.RemoteConfig `yaml:"registry" json:"registry" default:"registry"`
FreshInterval time.Duration `yaml:"freshInterval" json:"freshInterval" default:"freshInterval"`
Services []string `yaml:"services" json:"services" default:"services"`
+ // todo configuration the discovery config, like `zookeeper.discovery.root = "/services"`
+ //Discovery *model.DiscoveryConfig `yaml:"discovery" json:"discovery" default:"discovery"`
}
Service struct {
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
index 4d89d0b..e3f4949 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
@@ -29,6 +29,7 @@ import (
)
import (
+ "github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud/common"
"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
"github.com/apache/dubbo-go-pixiu/pkg/logger"
"github.com/apache/dubbo-go-pixiu/pkg/remote/zookeeper"
@@ -78,16 +79,13 @@ func (z *zkAppListener) watch() {
children, e, err := z.ds.getClient().GetChildrenW(z.servicesPath)
if err != nil {
failTimes++
- logger.Infof("watching (path{%s}) = error{%v}", z.servicesPath, err)
- if err == gzk.ErrNilNode {
- logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", z.servicesPath)
- return
- }
- if failTimes > MaxFailTimes {
- logger.Errorf("Error happens on (path{%s}) exceed max fail times: %s,so exit listen",
- z.servicesPath, MaxFailTimes)
- return
+ logger.Debugf("watching (path{%s}) = error{%v}", z.servicesPath, err)
+
+ if failTimes >= MaxFailTimes {
+ logger.Debugf("Error happens on (path{%s}) exceed max fail times: %s,so exit listen", z.servicesPath, MaxFailTimes)
+ failTimes = MaxFailTimes
}
+
delayTimer.Reset(ConnDelay * time.Duration(failTimes))
<-delayTimer.C
continue
@@ -111,9 +109,6 @@ func (z *zkAppListener) watchEventHandle(children []string, e <-chan zk.Event) b
case zkEvent := <-e:
logger.Debugf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gzk.StateToString(zkEvent.State), zkEvent.Err)
- if zkEvent.Type != zk.EventNodeChildrenChanged {
- return true
- }
z.handleEvent(children)
return true
case <-z.exit:
@@ -124,29 +119,43 @@ func (z *zkAppListener) watchEventHandle(children []string, e <-chan zk.Event) b
}
func (z *zkAppListener) handleEvent(children []string) {
+
fetchChildren, err := z.ds.getClient().GetChildren(z.servicesPath)
+
if err != nil {
- logger.Warnf("Error when retrieving newChildren in path: %s, Error:%s", z.servicesPath, err.Error())
+ // todo refactor gost zk, make it return the definite err
+ if strings.Contains(err.Error(), "none children") {
+ logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+ } else {
+ logger.Warnf("Error when retrieving newChildren in path: %s, Error:%s", z.servicesPath, err.Error())
+ }
}
discovery := z.ds
+ serviceMap := discovery.getServiceMap()
+
+ // del services
+ for sn := range serviceMap {
- del := func() {
- keys := Keys(discovery.getServiceMap())
- diff := Diff(keys, fetchChildren)
- if diff != nil {
- logger.Debugf("Del the service %s", diff)
- for _, sn := range diff {
- for _, instance := range discovery.getServiceMap()[sn] {
- discovery.delServiceInstance(instance)
- }
+ if !contains(fetchChildren, sn) {
+
+ // service zk event listener
+ serviceNodePath := strings.Join([]string{z.servicesPath, sn}, constant.PathSlash)
+ z.svcListeners.RemoveListener(serviceNodePath)
+
+ // service cluster
+ for _, instance := range serviceMap[sn] {
+ _, _ = discovery.delServiceInstance(instance)
}
}
}
- del()
for _, serviceName := range fetchChildren {
serviceNodePath := strings.Join([]string{z.servicesPath, serviceName}, constant.PathSlash)
+ instances := serviceMap[serviceName]
+ if len(instances) == 0 {
+ z.svcListeners.RemoveListener(serviceNodePath)
+ }
if z.svcListeners.GetListener(serviceNodePath) != nil {
continue
}
@@ -189,3 +198,12 @@ func (s *SvcListeners) GetAllListener() map[string]zookeeper.Listener {
defer s.listenerLock.Unlock()
return s.listeners
}
+
+func contains(elems []string, v string) bool {
+ for _, s := range elems {
+ if v == s {
+ return true
+ }
+ }
+ return false
+}
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
index 086f3be..e7ebc79 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
@@ -18,6 +18,7 @@
package zookeeper
import (
+ "strings"
"sync"
"time"
)
@@ -62,16 +63,9 @@ func (asl *applicationServiceListener) WatchAndHandle() {
children, e, err := asl.ds.getClient().GetChildrenW(asl.servicePath)
if err != nil {
failTimes++
- logger.Infof("watching (path{%s}) = error{%v}", asl.servicePath, err)
- if err == gzk.ErrNilChildren {
- return
- }
- if err == gzk.ErrNilNode {
- logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", asl.servicePath)
- return
- }
- if failTimes > MaxFailTimes {
- logger.Errorf("Error happens on (path{%s}) exceed max fail times: %v,so exit listen", asl.servicePath, MaxFailTimes)
+ logger.Debugf("watching (path{%s}) = error{%v}", asl.servicePath, err)
+ if failTimes >= MaxFailTimes {
+ logger.Warnf("Error happens on (path{%s}) exceed max fail times: %v,so exit listen", asl.servicePath, MaxFailTimes)
return
}
delayTimer.Reset(ConnDelay * time.Duration(failTimes))
@@ -82,7 +76,6 @@ func (asl *applicationServiceListener) WatchAndHandle() {
if continueLoop := asl.watchEventHandle(children, e); !continueLoop {
return
}
-
}
}
@@ -96,11 +89,8 @@ func (asl *applicationServiceListener) watchEventHandle(children []string, e <-c
case <-ticker.C:
asl.handleEvent(children)
case zkEvent := <-e:
- logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
+ logger.Debugf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gzk.StateToString(zkEvent.State), zkEvent.Err)
- if zkEvent.Type != zk.EventNodeChildrenChanged {
- return true
- }
asl.handleEvent(children)
return true
case <-asl.exit:
@@ -114,28 +104,44 @@ func (asl *applicationServiceListener) handleEvent(children []string) {
fetchChildren, err := asl.ds.getClient().GetChildren(asl.servicePath)
if err != nil {
- logger.Warnf("%s Error when retrieving service node [%s] in path: %s, Error:%s", common.ZKLogDiscovery, asl.serviceName, asl.servicePath, err.Error())
- return
+ // todo refactor gost zk, make it return the definite err
+ if strings.Contains(err.Error(), "none children") {
+ logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+ } else {
+ logger.Warnf("%s Error when retrieving service node [%s] in path: %s, Error:%s", common.ZKLogDiscovery, asl.serviceName, asl.servicePath, err.Error())
+ }
}
discovery := asl.ds
+ serviceMap := discovery.getServiceMap()
instanceMap := discovery.instanceMap
- addf := func() {
- if addInstanceIds := Diff(fetchChildren, children); addInstanceIds != nil {
- for _, id := range addInstanceIds {
- discovery.addServiceInstance(instanceMap[id])
+
+ func() {
+ for _, id := range fetchChildren {
+ serviceInstance, err := discovery.queryForInstance(asl.serviceName, id)
+ if err != nil {
+ logger.Errorf("add service: %s, instance: %s has error: ", asl.serviceName, id, err.Error())
+ continue
+ }
+ if instanceMap[id] == nil {
+ _, _ = discovery.addServiceInstance(serviceInstance)
+ } else {
+ _, _ = discovery.updateServiceInstance(serviceInstance)
}
}
- }
- addf()
+ }()
- delf := func() {
- if delInstanceIds := Diff(children, fetchChildren); delInstanceIds != nil {
+ func() {
+ serviceInstances := serviceMap[asl.serviceName]
+ oldInsId := []string{}
+ for _, instance := range serviceInstances {
+ oldInsId = append(oldInsId, instance.ID)
+ }
+ if delInstanceIds := Diff(oldInsId, fetchChildren); delInstanceIds != nil {
for _, id := range delInstanceIds {
- discovery.delServiceInstance(instanceMap[id])
+ _, _ = discovery.delServiceInstance(instanceMap[id])
}
}
- }
- delf()
+ }()
}
// Close closes this listener
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go
index a2429c4..844fac0 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go
@@ -20,6 +20,7 @@ package zookeeper
import (
"encoding/json"
"path"
+ "strings"
"sync"
"time"
)
@@ -40,10 +41,10 @@ const (
ZKRootPath = "/services"
ZKName = "SpringCloud-Zookeeper"
StatUP = "UP"
- MaxFailTimes = 2
+ MaxFailTimes = 3
DefaultTimeout = "3s"
ConnDelay = 3 * time.Second
- defaultTTL = 30 * time.Second
+ defaultTTL = 3 * time.Second
)
type zookeeperDiscovery struct {
@@ -70,8 +71,13 @@ func NewZKServiceDiscovery(targetService []string, config *model.RemoteConfig, l
return nil, err
}
+ rootPath := ZKRootPath
+ if len(strings.TrimSpace(config.Root)) > 0 {
+ rootPath = strings.TrimSpace(config.Root)
+ }
+
z := &zookeeperDiscovery{
- basePath: ZKRootPath,
+ basePath: rootPath,
listener: listener,
targetService: targetService,
instanceMap: make(map[string]*servicediscovery.ServiceInstance),
@@ -93,6 +99,7 @@ func (sd *zookeeperDiscovery) QueryAllServices() ([]servicediscovery.ServiceInst
serviceNames, err := sd.queryForNames()
logger.Debugf("%s get all services by root path %s, services %v", common.ZKLogDiscovery, sd.basePath, serviceNames)
if err != nil {
+ logger.Errorf("get all services error: %v", err.Error())
return nil, err
}
return sd.QueryServicesByName(serviceNames)
@@ -103,10 +110,16 @@ func (sd *zookeeperDiscovery) QueryServicesByName(serviceNames []string) ([]serv
var instancesAll []servicediscovery.ServiceInstance
for _, s := range serviceNames {
- ids, err := sd.getClient().GetChildren(sd.pathForName(s))
+ pathForName := sd.pathForName(s)
+ ids, err := sd.getClient().GetChildren(pathForName)
logger.Debugf("%s get services %s, services instanceIds %s", common.ZKLogDiscovery, s, ids)
if err != nil {
- logger.Errorf("%s get services [%s] nodes from zookeeper fail: %s", common.ZKLogDiscovery, s, err.Error())
+ // todo refactor gost zk, make it return the definite err
+ if strings.Contains(err.Error(), "none children") {
+ logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+ } else {
+ logger.Errorf("%s get services [%s] nodes from zookeeper fail: %s", common.ZKLogDiscovery, s, err.Error())
+ }
continue
}
@@ -251,12 +264,14 @@ func (sd *zookeeperDiscovery) addServiceInstance(instance *servicediscovery.Serv
return true, nil
}
-func (sd *zookeeperDiscovery) queryByServiceName() ([]string, error) {
- return sd.getClient().GetChildren(sd.basePath)
-}
-
func (sd *zookeeperDiscovery) queryForNames() ([]string, error) {
- return sd.getClient().GetChildren(sd.basePath)
+ children, err := sd.getClient().GetChildren(sd.basePath)
+ // todo refactor gost zk, make it return the definite err
+ if err != nil && strings.Contains(err.Error(), "none children") {
+ logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+ return nil, nil
+ }
+ return children, err
}
func (sd *zookeeperDiscovery) pathForInstance(name, id string) string {
@@ -267,15 +282,6 @@ func (sd *zookeeperDiscovery) pathForName(name string) string {
return path.Join(sd.basePath, name)
}
-// ZkServiceInstance unused!
-type ZkServiceInstance struct {
- servicediscovery.ServiceInstance
-}
-
-func (i *ZkServiceInstance) GetUniqKey() string {
- return i.ID
-}
-
type SpringCloudZKInstance struct {
Name string `json:"name"`
ID string `json:"id"`
diff --git a/pkg/model/remote.go b/pkg/model/remote.go
index 7553fe9..c022c78 100644
--- a/pkg/model/remote.go
+++ b/pkg/model/remote.go
@@ -25,4 +25,5 @@ type RemoteConfig struct {
Username string `yaml:"username" json:"username"`
Password string `yaml:"password" json:"password"`
Group string `yaml:"group" json:"group"`
+ Root string `yaml:"root" json:"root" default:"/services"`
}
diff --git a/samples/springcloud/zookeeper/pixiu/conf.yaml b/samples/springcloud/zookeeper/pixiu/conf.yaml
index 26fad40..7797e92 100644
--- a/samples/springcloud/zookeeper/pixiu/conf.yaml
+++ b/samples/springcloud/zookeeper/pixiu/conf.yaml
@@ -62,3 +62,4 @@ static_resources:
protocol: zookeeper
address: 127.0.0.1:2181
timeout: 100000000s
+ root: "/services"