You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/01 14:41:01 UTC

[dubbo-go] branch fix/1166-2 created (now 838094b)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a change to branch fix/1166-2
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git.


      at 838094b  defeat too much node listen goroutine

This branch includes the following new commits:

     new 838094b  defeat too much node listen goroutine

The 1 revision listed above as "new" is entirely new to this
repository and will be described in a separate email.  Revisions
listed as "add" were already present in the repository and have only
been added to this reference.


[dubbo-go] 01/01: defeat too much node listen goroutine

Posted by al...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch fix/1166-2
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git

commit 838094bea5483a233981ead6e2a9e8f4fe30f448
Author: AlexStocks <al...@foxmail.com>
AuthorDate: Tue Jun 1 22:40:48 2021 +0800

    defeat too much node listen goroutine
---
 remoting/zookeeper/listener.go | 48 ++++++++++++++++++++++++++++++++++--------
 1 file changed, 39 insertions(+), 9 deletions(-)

diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 66885e9..7a863ab 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -29,6 +29,8 @@ import (
 	"github.com/dubbogo/go-zookeeper/zk"
 	gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
 	perrors "github.com/pkg/errors"
+
+	uatomic "go.uber.org/atomic"
 )
 
 import (
@@ -46,7 +48,7 @@ var (
 type ZkEventListener struct {
 	client      *gxzookeeper.ZookeeperClient
 	pathMapLock sync.Mutex
-	pathMap     map[string]struct{}
+	pathMap     map[string]*uatomic.Int32
 	wg          sync.WaitGroup
 	exit        chan struct{}
 }
@@ -55,7 +57,7 @@ type ZkEventListener struct {
 func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
 	return &ZkEventListener{
 		client:  client,
-		pathMap: make(map[string]struct{}),
+		pathMap: make(map[string]*uatomic.Int32),
 		exit:    make(chan struct{}),
 	}
 }
@@ -70,21 +72,44 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
 	// listen l service node
 	l.wg.Add(1)
 	go func(zkPath string, listener remoting.DataListener) {
+		var delFlag bool
 		if l.listenServiceNodeEvent(zkPath, listener) {
 			listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
 			l.pathMapLock.Lock()
-			delete(l.pathMap, zkPath)
+			if a, ok := l.pathMap[zkPath]; ok && a.Load() <= 0 {
+				delFlag = true
+				delete(l.pathMap, zkPath)
+			}
 			l.pathMapLock.Unlock()
 		}
-		logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
+		logger.Warnf("ListenServiceNodeEvent->listenSelf(zk path{%s}) goroutine exit now, delFlag:%v", zkPath, delFlag)
 	}(zkPath, listener)
 }
 
 // nolint
 func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
 	defer l.wg.Done()
+
+	l.pathMapLock.Lock()
+	a, ok := l.pathMap[zkPath]
+	if ok {
+		a.Inc()
+	}
+	l.pathMapLock.Unlock()
+
 	var zkEvent zk.Event
+	var num int32
 	for {
+		l.pathMapLock.Lock()
+		a, ok := l.pathMap[zkPath]
+		if ok {
+			num = a.Load()
+		}
+		l.pathMapLock.Unlock()
+		if num > 1 {
+			return false
+		}
+
 		keyEventCh, err := l.client.ExistW(zkPath)
 		if err != nil {
 			logger.Warnf("existW{key:%s} = error{%v}", zkPath, err)
@@ -174,6 +199,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		// listen l service node
 		l.wg.Add(1)
 		go func(node string, listener remoting.DataListener) {
+			// l.wg.Done() is invoked inside l.listenServiceNodeEvent
 			if l.listenServiceNodeEvent(node, listener) {
 				logger.Warnf("delete zkNode{%s}", node)
 				listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
@@ -271,15 +297,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
 			// Save the path to avoid listen repeatedly
 			l.pathMapLock.Lock()
 			_, ok := l.pathMap[dubboPath]
+			if !ok {
+				l.pathMap[dubboPath] = uatomic.NewInt32(0)
+			}
 			l.pathMapLock.Unlock()
 			if ok {
 				logger.Warnf("@zkPath %s has already been listened.", dubboPath)
 				continue
 			}
 
-			l.pathMapLock.Lock()
-			l.pathMap[dubboPath] = struct{}{}
-			l.pathMapLock.Unlock()
 			// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
 			l.client.RLock()
 			if l.client.Conn == nil {
@@ -298,13 +324,17 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
 			logger.Infof("listen dubbo service key{%s}", dubboPath)
 			l.wg.Add(1)
 			go func(zkPath string, listener remoting.DataListener) {
+				var delFlag bool
 				if l.listenServiceNodeEvent(zkPath, listener) {
 					listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
 					l.pathMapLock.Lock()
-					delete(l.pathMap, zkPath)
+					if a, ok := l.pathMap[zkPath]; ok && a.Load() <= 0 {
+						delFlag = true
+						delete(l.pathMap, zkPath)
+					}
 					l.pathMapLock.Unlock()
 				}
-				logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now", zkPath)
+				logger.Warnf("listenDirEvent->listenSelf(zk path{%s}) goroutine exit now, delFlag:%v", zkPath, delFlag)
 			}(dubboPath, listener)
 
 			// listen sub path recursive