You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/08 10:19:47 UTC

[dubbo-go] branch 1.5 updated: Fix: as a path for pr 1225 (#1233)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 37229e3  Fix: as a path for pr 1225 (#1233)
37229e3 is described below

commit 37229e30c658aecd5a10353b90c5c0a95f3d9890
Author: Xin.Zh <dr...@foxmail.com>
AuthorDate: Tue Jun 8 18:19:38 2021 +0800

    Fix: as a path for pr 1225 (#1233)
    
    * defeat too much node listen goroutine
    
    * fix dead lock (#1247)
    
    * Fix: assertion bug for asynchronous test for getty (#1248)
    
    * fix dead lock
    
    * Temporarily fix error occurred in getty unit test
    
    * fix testClient_AsyncCall assertion bug
    
    Co-authored-by: XavierNiu <a...@nxw.name>
---
 registry/zookeeper/service_discovery_test.go | 26 ++++++++++++++++----------
 remoting/getty/getty_client_test.go          | 10 ++++++----
 remoting/zookeeper/listener.go               | 27 +++++++++++++++++++++------
 3 files changed, 43 insertions(+), 20 deletions(-)

diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go
index a73ecc9..d53b798 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -35,16 +35,15 @@ import (
 	"github.com/apache/dubbo-go/registry"
 )
 
-var testName = "test"
-
-var tc *zk.TestCluster
+const testName = "test"
 
 func prepareData(t *testing.T) *zk.TestCluster {
 	var err error
-	tc, err = zk.StartTestCluster(1, nil, nil)
+	tc, err := zk.StartTestCluster(1, nil, nil)
 	assert.NoError(t, err)
 	assert.NotNil(t, tc.Servers[0])
 	address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
+	//address := "127.0.0.1:2181"
 
 	config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
 		Protocol:  "zookeeper",
@@ -63,6 +62,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
 	_, err := newZookeeperServiceDiscovery(name)
 
 	// the ServiceDiscoveryConfig not found
+	// err: could not init the instance because the config is invalid
 	assert.NotNil(t, err)
 
 	sdc := &config.ServiceDiscoveryConfig{
@@ -73,11 +73,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
 	_, err = newZookeeperServiceDiscovery(name)
 
 	// RemoteConfig not found
+	// err: could not find the remote config for name: mock
 	assert.NotNil(t, err)
 }
 
-func TestCURDZookeeperServiceDiscovery(t *testing.T) {
-	prepareData(t)
+func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
+	tc := prepareData(t)
+	defer func() {
+		_ = tc.Stop()
+	}()
+	t.Run("testCURDZookeeperServiceDiscovery", testCURDZookeeperServiceDiscovery)
+	t.Run("testAddListenerZookeeperServiceDiscovery", testAddListenerZookeeperServiceDiscovery)
+}
+
+func testCURDZookeeperServiceDiscovery(t *testing.T) {
 	sd, err := newZookeeperServiceDiscovery(testName)
 	assert.Nil(t, err)
 	defer func() {
@@ -142,10 +151,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
 	assert.Nil(t, err)
 }
 
-func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
-	defer func() {
-		_ = tc.Stop()
-	}()
+func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
 	sd, err := newZookeeperServiceDiscovery(testName)
 	assert.Nil(t, err)
 	defer func() {
diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go
index eb72b1a..a03fd52 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -310,7 +310,7 @@ func testGetUser61(t *testing.T, c *Client) {
 
 func testClient_AsyncCall(t *testing.T, client *Client) {
 	user := &User{}
-	lock := sync.Mutex{}
+	wg := sync.WaitGroup{}
 	request := remoting.NewRequest("2.0.2")
 	invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"},
 		[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")})
@@ -327,13 +327,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
 		r := response.(remoting.AsyncCallbackResponse)
 		rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
 		assert.Equal(t, User{Id: "4", Name: "username"}, *(rst.Rest.(*User)))
-		lock.Unlock()
+		wg.Done()
 	}
-	lock.Lock()
+	wg.Add(1)
 	err := client.Request(request, 3*time.Second, rsp)
 	assert.NoError(t, err)
 	assert.Equal(t, User{}, *user)
-	time.Sleep(1 * time.Second)
+	wg.Wait()
 }
 
 func InitTest(t *testing.T) (*Server, *common.URL) {
@@ -450,6 +450,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User
 }
 
 func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
+	// fix testClient_AsyncCall assertion
+	time.Sleep(1 * time.Second)
 	return User{Id: id, Name: name}, nil
 }
 
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 66885e9..a708b83 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -29,6 +29,8 @@ import (
 	"github.com/dubbogo/go-zookeeper/zk"
 	gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
 	perrors "github.com/pkg/errors"
+
+	uatomic "go.uber.org/atomic"
 )
 
 import (
@@ -39,14 +41,14 @@ import (
 )
 
 var (
-	defaultTTL = 15 * time.Minute
+	defaultTTL = 10 * time.Minute
 )
 
 // nolint
 type ZkEventListener struct {
 	client      *gxzookeeper.ZookeeperClient
 	pathMapLock sync.Mutex
-	pathMap     map[string]struct{}
+	pathMap     map[string]*uatomic.Int32
 	wg          sync.WaitGroup
 	exit        chan struct{}
 }
@@ -55,7 +57,7 @@ type ZkEventListener struct {
 func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
 	return &ZkEventListener{
 		client:  client,
-		pathMap: make(map[string]struct{}),
+		pathMap: make(map[string]*uatomic.Int32),
 		exit:    make(chan struct{}),
 	}
 }
@@ -83,6 +85,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
 // nolint
 func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
 	defer l.wg.Done()
+
+	l.pathMapLock.Lock()
+	a, ok := l.pathMap[zkPath]
+	if !ok || a.Load() > 1 {
+		l.pathMapLock.Unlock()
+		return false
+	}
+	a.Inc()
+	l.pathMapLock.Unlock()
+	defer a.Dec()
+
 	var zkEvent zk.Event
 	for {
 		keyEventCh, err := l.client.ExistW(zkPath)
@@ -174,6 +187,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
 		// listen l service node
 		l.wg.Add(1)
 		go func(node string, listener remoting.DataListener) {
+			// invoker l.wg.Done() in l.listenServiceNodeEvent
 			if l.listenServiceNodeEvent(node, listener) {
 				logger.Warnf("delete zkNode{%s}", node)
 				listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
@@ -271,15 +285,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
 			// Save the path to avoid listen repeatedly
 			l.pathMapLock.Lock()
 			_, ok := l.pathMap[dubboPath]
+			if !ok {
+				l.pathMap[dubboPath] = uatomic.NewInt32(0)
+			}
 			l.pathMapLock.Unlock()
 			if ok {
 				logger.Warnf("@zkPath %s has already been listened.", dubboPath)
 				continue
 			}
 
-			l.pathMapLock.Lock()
-			l.pathMap[dubboPath] = struct{}{}
-			l.pathMapLock.Unlock()
 			// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
 			l.client.RLock()
 			if l.client.Conn == nil {
@@ -298,6 +312,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
 			logger.Infof("listen dubbo service key{%s}", dubboPath)
 			l.wg.Add(1)
 			go func(zkPath string, listener remoting.DataListener) {
+				// invoker l.wg.Done() in l.listenServiceNodeEvent
 				if l.listenServiceNodeEvent(zkPath, listener) {
 					listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
 					l.pathMapLock.Lock()