You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/08 10:19:47 UTC
[dubbo-go] branch 1.5 updated: Fix: as a path for pr 1225 (#1233)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.5 by this push:
new 37229e3 Fix: as a path for pr 1225 (#1233)
37229e3 is described below
commit 37229e30c658aecd5a10353b90c5c0a95f3d9890
Author: Xin.Zh <dr...@foxmail.com>
AuthorDate: Tue Jun 8 18:19:38 2021 +0800
Fix: as a path for pr 1225 (#1233)
* defeat too much node listen goroutine
* fix dead lock (#1247)
* Fix: assertion bug for asynchronous test for getty (#1248)
* fix dead lock
* Temporarily fix error occurred in getty unit test
* fix testClient_AsyncCall assertion bug
Co-authored-by: XavierNiu <a...@nxw.name>
---
registry/zookeeper/service_discovery_test.go | 26 ++++++++++++++++----------
remoting/getty/getty_client_test.go | 10 ++++++----
remoting/zookeeper/listener.go | 27 +++++++++++++++++++++------
3 files changed, 43 insertions(+), 20 deletions(-)
diff --git a/registry/zookeeper/service_discovery_test.go b/registry/zookeeper/service_discovery_test.go
index a73ecc9..d53b798 100644
--- a/registry/zookeeper/service_discovery_test.go
+++ b/registry/zookeeper/service_discovery_test.go
@@ -35,16 +35,15 @@ import (
"github.com/apache/dubbo-go/registry"
)
-var testName = "test"
-
-var tc *zk.TestCluster
+const testName = "test"
func prepareData(t *testing.T) *zk.TestCluster {
var err error
- tc, err = zk.StartTestCluster(1, nil, nil)
+ tc, err := zk.StartTestCluster(1, nil, nil)
assert.NoError(t, err)
assert.NotNil(t, tc.Servers[0])
address := "127.0.0.1:" + strconv.Itoa(tc.Servers[0].Port)
+ //address := "127.0.0.1:2181"
config.GetBaseConfig().ServiceDiscoveries[testName] = &config.ServiceDiscoveryConfig{
Protocol: "zookeeper",
@@ -63,6 +62,7 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err := newZookeeperServiceDiscovery(name)
// the ServiceDiscoveryConfig not found
+ // err: could not init the instance because the config is invalid
assert.NotNil(t, err)
sdc := &config.ServiceDiscoveryConfig{
@@ -73,11 +73,20 @@ func TestNewZookeeperServiceDiscovery(t *testing.T) {
_, err = newZookeeperServiceDiscovery(name)
// RemoteConfig not found
+ // err: could not find the remote config for name: mock
assert.NotNil(t, err)
}
-func TestCURDZookeeperServiceDiscovery(t *testing.T) {
- prepareData(t)
+func TestZookeeperServiceDiscovery_CURDAndListener(t *testing.T) {
+ tc := prepareData(t)
+ defer func() {
+ _ = tc.Stop()
+ }()
+ t.Run("testCURDZookeeperServiceDiscovery", testCURDZookeeperServiceDiscovery)
+ t.Run("testAddListenerZookeeperServiceDiscovery", testAddListenerZookeeperServiceDiscovery)
+}
+
+func testCURDZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
@@ -142,10 +151,7 @@ func TestCURDZookeeperServiceDiscovery(t *testing.T) {
assert.Nil(t, err)
}
-func TestAddListenerZookeeperServiceDiscovery(t *testing.T) {
- defer func() {
- _ = tc.Stop()
- }()
+func testAddListenerZookeeperServiceDiscovery(t *testing.T) {
sd, err := newZookeeperServiceDiscovery(testName)
assert.Nil(t, err)
defer func() {
diff --git a/remoting/getty/getty_client_test.go b/remoting/getty/getty_client_test.go
index eb72b1a..a03fd52 100644
--- a/remoting/getty/getty_client_test.go
+++ b/remoting/getty/getty_client_test.go
@@ -310,7 +310,7 @@ func testGetUser61(t *testing.T, c *Client) {
func testClient_AsyncCall(t *testing.T, client *Client) {
user := &User{}
- lock := sync.Mutex{}
+ wg := sync.WaitGroup{}
request := remoting.NewRequest("2.0.2")
invocation := createInvocation("GetUser0", nil, nil, []interface{}{"4", nil, "username"},
[]reflect.Value{reflect.ValueOf("4"), reflect.ValueOf(nil), reflect.ValueOf("username")})
@@ -327,13 +327,13 @@ func testClient_AsyncCall(t *testing.T, client *Client) {
r := response.(remoting.AsyncCallbackResponse)
rst := *r.Reply.(*remoting.Response).Result.(*protocol.RPCResult)
assert.Equal(t, User{Id: "4", Name: "username"}, *(rst.Rest.(*User)))
- lock.Unlock()
+ wg.Done()
}
- lock.Lock()
+ wg.Add(1)
err := client.Request(request, 3*time.Second, rsp)
assert.NoError(t, err)
assert.Equal(t, User{}, *user)
- time.Sleep(1 * time.Second)
+ wg.Wait()
}
func InitTest(t *testing.T) (*Server, *common.URL) {
@@ -450,6 +450,8 @@ func (u *UserProvider) GetUser(ctx context.Context, req []interface{}, rsp *User
}
func (u *UserProvider) GetUser0(id string, k *User, name string) (User, error) {
+ // fix testClient_AsyncCall assertion
+ time.Sleep(1 * time.Second)
return User{Id: id, Name: name}, nil
}
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 66885e9..a708b83 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -29,6 +29,8 @@ import (
"github.com/dubbogo/go-zookeeper/zk"
gxzookeeper "github.com/dubbogo/gost/database/kv/zk"
perrors "github.com/pkg/errors"
+
+ uatomic "go.uber.org/atomic"
)
import (
@@ -39,14 +41,14 @@ import (
)
var (
- defaultTTL = 15 * time.Minute
+ defaultTTL = 10 * time.Minute
)
// nolint
type ZkEventListener struct {
client *gxzookeeper.ZookeeperClient
pathMapLock sync.Mutex
- pathMap map[string]struct{}
+ pathMap map[string]*uatomic.Int32
wg sync.WaitGroup
exit chan struct{}
}
@@ -55,7 +57,7 @@ type ZkEventListener struct {
func NewZkEventListener(client *gxzookeeper.ZookeeperClient) *ZkEventListener {
return &ZkEventListener{
client: client,
- pathMap: make(map[string]struct{}),
+ pathMap: make(map[string]*uatomic.Int32),
exit: make(chan struct{}),
}
}
@@ -83,6 +85,17 @@ func (l *ZkEventListener) ListenServiceNodeEvent(zkPath string, listener remotin
// nolint
func (l *ZkEventListener) listenServiceNodeEvent(zkPath string, listener ...remoting.DataListener) bool {
defer l.wg.Done()
+
+ l.pathMapLock.Lock()
+ a, ok := l.pathMap[zkPath]
+ if !ok || a.Load() > 1 {
+ l.pathMapLock.Unlock()
+ return false
+ }
+ a.Inc()
+ l.pathMapLock.Unlock()
+ defer a.Dec()
+
var zkEvent zk.Event
for {
keyEventCh, err := l.client.ExistW(zkPath)
@@ -174,6 +187,7 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
// listen l service node
l.wg.Add(1)
go func(node string, listener remoting.DataListener) {
+ // invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(node, listener) {
logger.Warnf("delete zkNode{%s}", node)
listener.DataChange(remoting.Event{Path: node, Action: remoting.EventTypeDel})
@@ -271,15 +285,15 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
// Save the path to avoid listen repeatedly
l.pathMapLock.Lock()
_, ok := l.pathMap[dubboPath]
+ if !ok {
+ l.pathMap[dubboPath] = uatomic.NewInt32(0)
+ }
l.pathMapLock.Unlock()
if ok {
logger.Warnf("@zkPath %s has already been listened.", dubboPath)
continue
}
- l.pathMapLock.Lock()
- l.pathMap[dubboPath] = struct{}{}
- l.pathMapLock.Unlock()
// When Zk disconnected, the Conn will be set to nil, so here need check the value of Conn
l.client.RLock()
if l.client.Conn == nil {
@@ -298,6 +312,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkPath string, listen
logger.Infof("listen dubbo service key{%s}", dubboPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
+ // invoker l.wg.Done() in l.listenServiceNodeEvent
if l.listenServiceNodeEvent(zkPath, listener) {
listener.DataChange(remoting.Event{Path: zkPath, Action: remoting.EventTypeDel})
l.pathMapLock.Lock()