You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by zt...@apache.org on 2022/02/26 02:09:55 UTC

[dubbo-go-pixiu] branch develop-0.5.0 updated: Fix: spring cloud error and refactor event callback (#367)

This is an automated email from the ASF dual-hosted git repository.

ztelur pushed a commit to branch develop-0.5.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go-pixiu.git


The following commit(s) were added to refs/heads/develop-0.5.0 by this push:
     new dcb7ae1  Fix: spring cloud error and refactor event callback (#367)
dcb7ae1 is described below

commit dcb7ae1834e0bc6961123fb20f645bcb0741de75
Author: phil <ph...@yeah.net>
AuthorDate: Sat Feb 26 10:07:04 2022 +0800

    Fix: spring cloud error and refactor event callback (#367)
    
    * rm all output files when running make clean
    
    * 1. fix error log and event callback
    
    2. add zookeeper root path config
    
    * fmt...
    
    * close the service instance listener when it is not working and after retry
    
    * fix the CI
    
    * clean
    
    Co-authored-by: Randy <zt...@gmail.com>
    Co-authored-by: Xin.Zh <dr...@foxmail.com>
---
 igt/Makefile                                       |  2 +-
 pkg/adapter/springcloud/cloud.go                   |  2 +
 .../zookeeper/application_listener.go              | 64 ++++++++++++++--------
 .../servicediscovery/zookeeper/service_listener.go | 62 +++++++++++----------
 .../servicediscovery/zookeeper/zk_discovery.go     | 44 ++++++++-------
 pkg/model/remote.go                                |  1 +
 samples/springcloud/zookeeper/pixiu/conf.yaml      |  1 +
 7 files changed, 105 insertions(+), 71 deletions(-)

diff --git a/igt/Makefile b/igt/Makefile
index 2a8dfa3..72e9465 100644
--- a/igt/Makefile
+++ b/igt/Makefile
@@ -119,7 +119,7 @@ docker-down:
 .PHONY: clean
 clean: stop
 	$(info   >  Cleanning up $(OUT_DIR))
-# 	@-rm -rf $(OUT_DIR)
+	@-rm -rf $(OUT_DIR)
 	@-cat $(PID) | awk '{print $1}' | xargs kill -9
 	@-cat $(PIXIU_PID) | awk '{print $1}' | xargs kill -9
 
diff --git a/pkg/adapter/springcloud/cloud.go b/pkg/adapter/springcloud/cloud.go
index 52860b3..f88208e 100644
--- a/pkg/adapter/springcloud/cloud.go
+++ b/pkg/adapter/springcloud/cloud.go
@@ -73,6 +73,8 @@ type (
 		Registry      *model.RemoteConfig `yaml:"registry" json:"registry" default:"registry"`
 		FreshInterval time.Duration       `yaml:"freshInterval" json:"freshInterval" default:"freshInterval"`
 		Services      []string            `yaml:"services" json:"services" default:"services"`
+		// todo configuration the discovery config, like `zookeeper.discovery.root = "/services"`
+		//Discovery	  *model.DiscoveryConfig `yaml:"discovery" json:"discovery" default:"discovery"`
 	}
 
 	Service struct {
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
index 4d89d0b..e3f4949 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/application_listener.go
@@ -29,6 +29,7 @@ import (
 )
 
 import (
+	"github.com/apache/dubbo-go-pixiu/pkg/adapter/springcloud/common"
 	"github.com/apache/dubbo-go-pixiu/pkg/common/constant"
 	"github.com/apache/dubbo-go-pixiu/pkg/logger"
 	"github.com/apache/dubbo-go-pixiu/pkg/remote/zookeeper"
@@ -78,16 +79,13 @@ func (z *zkAppListener) watch() {
 		children, e, err := z.ds.getClient().GetChildrenW(z.servicesPath)
 		if err != nil {
 			failTimes++
-			logger.Infof("watching (path{%s}) = error{%v}", z.servicesPath, err)
-			if err == gzk.ErrNilNode {
-				logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", z.servicesPath)
-				return
-			}
-			if failTimes > MaxFailTimes {
-				logger.Errorf("Error happens on (path{%s}) exceed max fail times: %s,so exit listen",
-					z.servicesPath, MaxFailTimes)
-				return
+			logger.Debugf("watching (path{%s}) = error{%v}", z.servicesPath, err)
+
+			if failTimes >= MaxFailTimes {
+				logger.Debugf("Error happens on (path{%s}) exceed max fail times: %s,so exit listen", z.servicesPath, MaxFailTimes)
+				failTimes = MaxFailTimes
 			}
+
 			delayTimer.Reset(ConnDelay * time.Duration(failTimes))
 			<-delayTimer.C
 			continue
@@ -111,9 +109,6 @@ func (z *zkAppListener) watchEventHandle(children []string, e <-chan zk.Event) b
 		case zkEvent := <-e:
 			logger.Debugf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
 				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gzk.StateToString(zkEvent.State), zkEvent.Err)
-			if zkEvent.Type != zk.EventNodeChildrenChanged {
-				return true
-			}
 			z.handleEvent(children)
 			return true
 		case <-z.exit:
@@ -124,29 +119,43 @@ func (z *zkAppListener) watchEventHandle(children []string, e <-chan zk.Event) b
 }
 
 func (z *zkAppListener) handleEvent(children []string) {
+
 	fetchChildren, err := z.ds.getClient().GetChildren(z.servicesPath)
+
 	if err != nil {
-		logger.Warnf("Error when retrieving newChildren in path: %s, Error:%s", z.servicesPath, err.Error())
+		// todo refactor gost zk, make it return the definite err
+		if strings.Contains(err.Error(), "none children") {
+			logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+		} else {
+			logger.Warnf("Error when retrieving newChildren in path: %s, Error:%s", z.servicesPath, err.Error())
+		}
 	}
 
 	discovery := z.ds
+	serviceMap := discovery.getServiceMap()
+
+	// del services
+	for sn := range serviceMap {
 
-	del := func() {
-		keys := Keys(discovery.getServiceMap())
-		diff := Diff(keys, fetchChildren)
-		if diff != nil {
-			logger.Debugf("Del the service %s", diff)
-			for _, sn := range diff {
-				for _, instance := range discovery.getServiceMap()[sn] {
-					discovery.delServiceInstance(instance)
-				}
+		if !contains(fetchChildren, sn) {
+
+			// service zk event listener
+			serviceNodePath := strings.Join([]string{z.servicesPath, sn}, constant.PathSlash)
+			z.svcListeners.RemoveListener(serviceNodePath)
+
+			// service cluster
+			for _, instance := range serviceMap[sn] {
+				_, _ = discovery.delServiceInstance(instance)
 			}
 		}
 	}
-	del()
 
 	for _, serviceName := range fetchChildren {
 		serviceNodePath := strings.Join([]string{z.servicesPath, serviceName}, constant.PathSlash)
+		instances := serviceMap[serviceName]
+		if len(instances) == 0 {
+			z.svcListeners.RemoveListener(serviceNodePath)
+		}
 		if z.svcListeners.GetListener(serviceNodePath) != nil {
 			continue
 		}
@@ -189,3 +198,12 @@ func (s *SvcListeners) GetAllListener() map[string]zookeeper.Listener {
 	defer s.listenerLock.Unlock()
 	return s.listeners
 }
+
+func contains(elems []string, v string) bool {
+	for _, s := range elems {
+		if v == s {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
index 086f3be..e7ebc79 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/service_listener.go
@@ -18,6 +18,7 @@
 package zookeeper
 
 import (
+	"strings"
 	"sync"
 	"time"
 )
@@ -62,16 +63,9 @@ func (asl *applicationServiceListener) WatchAndHandle() {
 		children, e, err := asl.ds.getClient().GetChildrenW(asl.servicePath)
 		if err != nil {
 			failTimes++
-			logger.Infof("watching (path{%s}) = error{%v}", asl.servicePath, err)
-			if err == gzk.ErrNilChildren {
-				return
-			}
-			if err == gzk.ErrNilNode {
-				logger.Errorf("watching (path{%s}) got errNilNode,so exit listen", asl.servicePath)
-				return
-			}
-			if failTimes > MaxFailTimes {
-				logger.Errorf("Error happens on (path{%s}) exceed max fail times: %v,so exit listen", asl.servicePath, MaxFailTimes)
+			logger.Debugf("watching (path{%s}) = error{%v}", asl.servicePath, err)
+			if failTimes >= MaxFailTimes {
+				logger.Warnf("Error happens on (path{%s}) exceed max fail times: %v,so exit listen", asl.servicePath, MaxFailTimes)
 				return
 			}
 			delayTimer.Reset(ConnDelay * time.Duration(failTimes))
@@ -82,7 +76,6 @@ func (asl *applicationServiceListener) WatchAndHandle() {
 		if continueLoop := asl.watchEventHandle(children, e); !continueLoop {
 			return
 		}
-
 	}
 }
 
@@ -96,11 +89,8 @@ func (asl *applicationServiceListener) watchEventHandle(children []string, e <-c
 		case <-ticker.C:
 			asl.handleEvent(children)
 		case zkEvent := <-e:
-			logger.Warnf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
+			logger.Debugf("get a zookeeper e{type:%s, server:%s, path:%s, state:%d-%s, err:%s}",
 				zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gzk.StateToString(zkEvent.State), zkEvent.Err)
-			if zkEvent.Type != zk.EventNodeChildrenChanged {
-				return true
-			}
 			asl.handleEvent(children)
 			return true
 		case <-asl.exit:
@@ -114,28 +104,44 @@ func (asl *applicationServiceListener) handleEvent(children []string) {
 
 	fetchChildren, err := asl.ds.getClient().GetChildren(asl.servicePath)
 	if err != nil {
-		logger.Warnf("%s Error when retrieving service node [%s] in path: %s, Error:%s", common.ZKLogDiscovery, asl.serviceName, asl.servicePath, err.Error())
-		return
+		// todo refactor gost zk, make it return the definite err
+		if strings.Contains(err.Error(), "none children") {
+			logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+		} else {
+			logger.Warnf("%s Error when retrieving service node [%s] in path: %s, Error:%s", common.ZKLogDiscovery, asl.serviceName, asl.servicePath, err.Error())
+		}
 	}
 	discovery := asl.ds
+	serviceMap := discovery.getServiceMap()
 	instanceMap := discovery.instanceMap
-	addf := func() {
-		if addInstanceIds := Diff(fetchChildren, children); addInstanceIds != nil {
-			for _, id := range addInstanceIds {
-				discovery.addServiceInstance(instanceMap[id])
+
+	func() {
+		for _, id := range fetchChildren {
+			serviceInstance, err := discovery.queryForInstance(asl.serviceName, id)
+			if err != nil {
+				logger.Errorf("add service: %s, instance: %s has error: ", asl.serviceName, id, err.Error())
+				continue
+			}
+			if instanceMap[id] == nil {
+				_, _ = discovery.addServiceInstance(serviceInstance)
+			} else {
+				_, _ = discovery.updateServiceInstance(serviceInstance)
 			}
 		}
-	}
-	addf()
+	}()
 
-	delf := func() {
-		if delInstanceIds := Diff(children, fetchChildren); delInstanceIds != nil {
+	func() {
+		serviceInstances := serviceMap[asl.serviceName]
+		oldInsId := []string{}
+		for _, instance := range serviceInstances {
+			oldInsId = append(oldInsId, instance.ID)
+		}
+		if delInstanceIds := Diff(oldInsId, fetchChildren); delInstanceIds != nil {
 			for _, id := range delInstanceIds {
-				discovery.delServiceInstance(instanceMap[id])
+				_, _ = discovery.delServiceInstance(instanceMap[id])
 			}
 		}
-	}
-	delf()
+	}()
 }
 
 // Close closes this listener
diff --git a/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go b/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go
index a2429c4..844fac0 100644
--- a/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go
+++ b/pkg/adapter/springcloud/servicediscovery/zookeeper/zk_discovery.go
@@ -20,6 +20,7 @@ package zookeeper
 import (
 	"encoding/json"
 	"path"
+	"strings"
 	"sync"
 	"time"
 )
@@ -40,10 +41,10 @@ const (
 	ZKRootPath     = "/services"
 	ZKName         = "SpringCloud-Zookeeper"
 	StatUP         = "UP"
-	MaxFailTimes   = 2
+	MaxFailTimes   = 3
 	DefaultTimeout = "3s"
 	ConnDelay      = 3 * time.Second
-	defaultTTL     = 30 * time.Second
+	defaultTTL     = 3 * time.Second
 )
 
 type zookeeperDiscovery struct {
@@ -70,8 +71,13 @@ func NewZKServiceDiscovery(targetService []string, config *model.RemoteConfig, l
 		return nil, err
 	}
 
+	rootPath := ZKRootPath
+	if len(strings.TrimSpace(config.Root)) > 0 {
+		rootPath = strings.TrimSpace(config.Root)
+	}
+
 	z := &zookeeperDiscovery{
-		basePath:      ZKRootPath,
+		basePath:      rootPath,
 		listener:      listener,
 		targetService: targetService,
 		instanceMap:   make(map[string]*servicediscovery.ServiceInstance),
@@ -93,6 +99,7 @@ func (sd *zookeeperDiscovery) QueryAllServices() ([]servicediscovery.ServiceInst
 	serviceNames, err := sd.queryForNames()
 	logger.Debugf("%s get all services by root path %s, services %v", common.ZKLogDiscovery, sd.basePath, serviceNames)
 	if err != nil {
+		logger.Errorf("get all services error: %v", err.Error())
 		return nil, err
 	}
 	return sd.QueryServicesByName(serviceNames)
@@ -103,10 +110,16 @@ func (sd *zookeeperDiscovery) QueryServicesByName(serviceNames []string) ([]serv
 	var instancesAll []servicediscovery.ServiceInstance
 	for _, s := range serviceNames {
 
-		ids, err := sd.getClient().GetChildren(sd.pathForName(s))
+		pathForName := sd.pathForName(s)
+		ids, err := sd.getClient().GetChildren(pathForName)
 		logger.Debugf("%s get services %s, services instanceIds %s", common.ZKLogDiscovery, s, ids)
 		if err != nil {
-			logger.Errorf("%s get services [%s] nodes from zookeeper fail: %s", common.ZKLogDiscovery, s, err.Error())
+			// todo refactor gost zk, make it return the definite err
+			if strings.Contains(err.Error(), "none children") {
+				logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+			} else {
+				logger.Errorf("%s get services [%s] nodes from zookeeper fail: %s", common.ZKLogDiscovery, s, err.Error())
+			}
 			continue
 		}
 
@@ -251,12 +264,14 @@ func (sd *zookeeperDiscovery) addServiceInstance(instance *servicediscovery.Serv
 	return true, nil
 }
 
-func (sd *zookeeperDiscovery) queryByServiceName() ([]string, error) {
-	return sd.getClient().GetChildren(sd.basePath)
-}
-
 func (sd *zookeeperDiscovery) queryForNames() ([]string, error) {
-	return sd.getClient().GetChildren(sd.basePath)
+	children, err := sd.getClient().GetChildren(sd.basePath)
+	// todo refactor gost zk, make it return the definite err
+	if err != nil && strings.Contains(err.Error(), "none children") {
+		logger.Debugf("%s get nodes from zookeeper fail: %s", common.ZKLogDiscovery, err.Error())
+		return nil, nil
+	}
+	return children, err
 }
 
 func (sd *zookeeperDiscovery) pathForInstance(name, id string) string {
@@ -267,15 +282,6 @@ func (sd *zookeeperDiscovery) pathForName(name string) string {
 	return path.Join(sd.basePath, name)
 }
 
-// ZkServiceInstance unused!
-type ZkServiceInstance struct {
-	servicediscovery.ServiceInstance
-}
-
-func (i *ZkServiceInstance) GetUniqKey() string {
-	return i.ID
-}
-
 type SpringCloudZKInstance struct {
 	Name    string      `json:"name"`
 	ID      string      `json:"id"`
diff --git a/pkg/model/remote.go b/pkg/model/remote.go
index 7553fe9..c022c78 100644
--- a/pkg/model/remote.go
+++ b/pkg/model/remote.go
@@ -25,4 +25,5 @@ type RemoteConfig struct {
 	Username string `yaml:"username" json:"username"`
 	Password string `yaml:"password" json:"password"`
 	Group    string `yaml:"group" json:"group"`
+	Root     string `yaml:"root" json:"root" default:"/services"`
 }
diff --git a/samples/springcloud/zookeeper/pixiu/conf.yaml b/samples/springcloud/zookeeper/pixiu/conf.yaml
index 26fad40..7797e92 100644
--- a/samples/springcloud/zookeeper/pixiu/conf.yaml
+++ b/samples/springcloud/zookeeper/pixiu/conf.yaml
@@ -62,3 +62,4 @@ static_resources:
           protocol: zookeeper
           address: 127.0.0.1:2181
           timeout: 100000000s
+          root: "/services"