You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2023/04/18 02:54:12 UTC
[dubbo-go] branch main updated: Add wildcard subscription support for zookeeper registry (#2267)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/main by this push:
new 0ea60750f Add wildcard subscription support for zookeeper registry (#2267)
0ea60750f is described below
commit 0ea60750f724cdf57847e06af35a153deb763f23
Author: wudong5 <63...@users.noreply.github.com>
AuthorDate: Tue Apr 18 10:54:04 2023 +0800
Add wildcard subscription support for zookeeper registry (#2267)
---
common/url.go | 31 ++++++++++
common/url_test.go | 131 +++++++++++++++++++++++++++++++++++++++++
registry/zookeeper/listener.go | 8 ++-
remoting/zookeeper/listener.go | 94 +++++++++++++++++++++++++++--
4 files changed, 257 insertions(+), 7 deletions(-)
diff --git a/common/url.go b/common/url.go
index 058f76a89..2824ec68d 100644
--- a/common/url.go
+++ b/common/url.go
@@ -413,6 +413,37 @@ func ServiceKey(intf string, group string, version string) string {
return buf.String()
}
+// ParseServiceKey splits a service key of the form "[group/]interface[:version]" and returns interface, group and version, in that order
+func ParseServiceKey(serviceKey string) (string, string, string) {
+ var (
+ group string
+ version string
+ )
+ if serviceKey == "" {
+ return "", "", ""
+ }
+ // get group if it exists
+ sepIndex := strings.Index(serviceKey, constant.PathSeparator)
+ if sepIndex != -1 {
+ group = serviceKey[:sepIndex]
+ serviceKey = serviceKey[sepIndex+1:]
+ }
+ // get version if it exists
+ sepIndex = strings.LastIndex(serviceKey, constant.KeySeparator)
+ if sepIndex != -1 {
+ version = serviceKey[sepIndex+1:]
+ serviceKey = serviceKey[:sepIndex]
+ }
+
+ return serviceKey, group, version
+}
+
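+// IsAnyCondition reports whether intf is constant.AnyValue and group and version are each either constant.AnyValue or equal to those of serviceURL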
+func IsAnyCondition(intf, group, version string, serviceURL *URL) bool {
+ return intf == constant.AnyValue && (group == constant.AnyValue ||
+ group == serviceURL.Group()) && (version == constant.AnyValue || version == serviceURL.Version())
+}
+
// ColonSeparatedKey
// The format is "{interface}:[version]:[group]"
func (c *URL) ColonSeparatedKey() string {
diff --git a/common/url_test.go b/common/url_test.go
index 89953c3ab..2971e6c45 100644
--- a/common/url_test.go
+++ b/common/url_test.go
@@ -421,3 +421,134 @@ func TestCompareURLEqualFunc(t *testing.T) {
func CustomCompareURLEqual(l *URL, r *URL, execludeParam ...string) bool {
return l.PrimitiveURL == r.PrimitiveURL
}
+
+func TestParseServiceKey(t *testing.T) {
+ type args struct {
+ serviceKey string
+ }
+ tests := []struct {
+ name string
+ args args
+ want string
+ want1 string
+ want2 string
+ }{
+ {
+ name: "test1",
+ args: args{
+ serviceKey: "group/interface:version",
+ },
+ want: "interface",
+ want1: "group",
+ want2: "version",
+ },
+ {
+ name: "test2",
+ args: args{
+ serviceKey: "*/*:*",
+ },
+ want: "*",
+ want1: "*",
+ want2: "*",
+ },
+ {
+ name: "test3",
+ args: args{
+ serviceKey: "group/org.apache.dubbo.mock.api.MockService",
+ },
+ want: "org.apache.dubbo.mock.api.MockService",
+ want1: "group",
+ want2: "",
+ },
+ {
+ name: "test4",
+ args: args{
+ serviceKey: "org.apache.dubbo.mock.api.MockService",
+ },
+ want: "org.apache.dubbo.mock.api.MockService",
+ want1: "",
+ want2: "",
+ },
+ {
+ name: "test5",
+ args: args{
+ serviceKey: "group/",
+ },
+ want: "",
+ want1: "group",
+ want2: "",
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ got, got1, got2 := ParseServiceKey(tt.args.serviceKey)
+ assert.Equalf(t, tt.want, got, "ParseServiceKey(%v)", tt.args.serviceKey)
+ assert.Equalf(t, tt.want1, got1, "ParseServiceKey(%v)", tt.args.serviceKey)
+ assert.Equalf(t, tt.want2, got2, "ParseServiceKey(%v)", tt.args.serviceKey)
+ })
+ }
+}
+
+func TestIsAnyCondition(t *testing.T) {
+ type args struct {
+ intf string
+ group string
+ version string
+ serviceURL *URL
+ }
+ serviceURL, _ := NewURL(GetLocalIp()+":0", WithProtocol("admin"), WithParams(url.Values{
+ constant.GroupKey: {"group"},
+ constant.VersionKey: {"version"},
+ }))
+ tests := []struct {
+ name string
+ args args
+ want bool
+ }{
+ {
+ name: "test1",
+ args: args{
+ intf: constant.AnyValue,
+ group: constant.AnyValue,
+ version: constant.AnyValue,
+ serviceURL: serviceURL,
+ },
+ want: true,
+ },
+ {
+ name: "test2",
+ args: args{
+ intf: constant.AnyValue,
+ group: "group",
+ version: "version",
+ serviceURL: serviceURL,
+ },
+ want: true,
+ },
+ {
+ name: "test3",
+ args: args{
+ intf: "intf",
+ group: constant.AnyValue,
+ version: constant.AnyValue,
+ serviceURL: serviceURL,
+ },
+ want: false,
+ },
+ {
+ name: "test4",
+ args: args{
+ intf: constant.AnyValue,
+ group: "group1",
+ version: constant.AnyValue,
+ serviceURL: serviceURL,
+ },
+ want: false,
+ },
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ assert.Equalf(t, tt.want, IsAnyCondition(tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL), "IsAnyCondition(%v, %v, %v, %v)", tt.args.intf, tt.args.group, tt.args.version, tt.args.serviceURL)
+ })
+ }
+}
diff --git a/registry/zookeeper/listener.go b/registry/zookeeper/listener.go
index 65871adb9..c8f0f26b3 100644
--- a/registry/zookeeper/listener.go
+++ b/registry/zookeeper/listener.go
@@ -92,8 +92,10 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
if l.closed {
return false
}
+ match := false
for serviceKey, listener := range l.subscribed {
- if serviceURL.ServiceKey() == serviceKey {
+ intf, group, version := common.ParseServiceKey(serviceKey)
+ if serviceURL.ServiceKey() == serviceKey || common.IsAnyCondition(intf, group, version, serviceURL) {
listener.Process(
&config_center.ConfigChangeEvent{
Key: event.Path,
@@ -101,10 +103,10 @@ func (l *RegistryDataListener) DataChange(event remoting.Event) bool {
ConfigType: event.Action,
},
)
- return true
+ match = true
}
}
- return false
+ return match
}
// Close all RegistryConfigurationListener in subscribed
diff --git a/remoting/zookeeper/listener.go b/remoting/zookeeper/listener.go
index 498fc3309..2e9abe3c8 100644
--- a/remoting/zookeeper/listener.go
+++ b/remoting/zookeeper/listener.go
@@ -18,6 +18,7 @@
package zookeeper
import (
+ "net/url"
"path"
"strings"
"sync"
@@ -238,8 +239,89 @@ func (l *ZkEventListener) handleZkNodeEvent(zkPath string, children []string, li
listener.DataChange(remoting.Event{Path: oldNode, Action: remoting.EventTypeDel})
}
}
-func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener) {
+
+// listenAllDirEvents listens to all services when conf.InterfaceKey = "*"
+func (l *ZkEventListener) listenAllDirEvents(conf *common.URL, listener remoting.DataListener) {
+ var (
+ failTimes int
+ ttl time.Duration
+ )
+ ttl = defaultTTL
+ if conf != nil {
+ if timeout, err := time.ParseDuration(conf.GetParam(constant.RegistryTTLKey, constant.DefaultRegTTL)); err == nil {
+ ttl = timeout
+ } else {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Wrong configuration for registry.ttl, error=%+v, using default value %v instead", err, defaultTTL)
+ }
+ }
+ if ttl > 20e9 {
+ ttl = 20e9
+ }
+
+ rootPath := path.Join(constant.PathSeparator, constant.Dubbo)
+ for {
+ // get all interfaces
+ children, childEventCh, err := l.Client.GetChildrenW(rootPath)
+ if err != nil {
+ failTimes++
+ if MaxFailTimes <= failTimes {
+ failTimes = MaxFailTimes
+ }
+ logger.Errorf("[Zookeeper EventListener][listenDirEvent] Get children of path {%s} with watcher failed, the error is %+v", rootPath, err)
+ // Zookeeper may not be ready yet; sleep failTimes * ConnDelay seconds before retrying
+ after := time.After(timeSecondDuration(failTimes * ConnDelay))
+ select {
+ case <-after:
+ continue
+ case <-l.exit:
+ return
+ }
+ }
+ failTimes = 0
+ if len(children) == 0 {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] Can not get any children for the path \"%s\", please check if the provider does ready.", rootPath)
+ }
+ for _, c := range children {
+ // Build the child path
+ zkRootPath := path.Join(rootPath, constant.PathSeparator, url.QueryEscape(c), constant.PathSeparator, constant.ProvidersCategory)
+ // Record the path to avoid listening to it repeatedly
+ l.pathMapLock.Lock()
+ if _, ok := l.pathMap[zkRootPath]; ok {
+ logger.Warnf("[Zookeeper EventListener][listenDirEvent] The child with zk path {%s} has already been listened.", zkRootPath)
+ l.pathMapLock.Unlock()
+ continue
+ } else {
+ l.pathMap[zkRootPath] = uatomic.NewInt32(0)
+ }
+ l.pathMapLock.Unlock()
+ logger.Debugf("[Zookeeper EventListener][listenDirEvent] listen dubbo interface key{%s}", zkRootPath)
+ l.wg.Add(1)
+ // listen every interface
+ go l.listenDirEvent(conf, zkRootPath, listener, c)
+ }
+
+ ticker := time.NewTicker(ttl)
+ select {
+ case <-ticker.C:
+ ticker.Stop()
+ case zkEvent := <-childEventCh:
+ logger.Debugf("Get a zookeeper childEventCh{type:%s, server:%s, path:%s, state:%d-%s, err:%v}",
+ zkEvent.Type.String(), zkEvent.Server, zkEvent.Path, zkEvent.State, gxzookeeper.StateToString(zkEvent.State), zkEvent.Err)
+ ticker.Stop()
+ case <-l.exit:
+ logger.Warnf("listen(path{%s}) goroutine exit now...", rootPath)
+ ticker.Stop()
+ return
+ }
+ }
+}
+
+func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, listener remoting.DataListener, intf string) {
defer l.wg.Done()
+ if intf == constant.AnyValue {
+ l.listenAllDirEvents(conf, listener)
+ return
+ }
var (
failTimes int
ttl time.Duration
@@ -279,7 +361,7 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
// Only need to compare Path when subscribing to provider
if strings.LastIndex(zkRootPath, constant.ProviderCategory) != -1 {
provider, _ := common.NewURL(c)
- if provider.ServiceKey() != conf.ServiceKey() {
+ if provider.Interface() != intf || !common.IsAnyCondition(constant.AnyValue, conf.Group(), conf.Version(), provider) {
continue
}
}
@@ -326,7 +408,6 @@ func (l *ZkEventListener) listenDirEvent(conf *common.URL, zkRootPath string, li
}
if l.startScheduleWatchTask(zkRootPath, children, ttl, listener, childEventCh) {
return
-
}
}
}
@@ -367,6 +448,7 @@ func (l *ZkEventListener) startScheduleWatchTask(
}
}
}
+
func timeSecondDuration(sec int) time.Duration {
return time.Duration(sec) * time.Second
}
@@ -378,7 +460,11 @@ func (l *ZkEventListener) ListenServiceEvent(conf *common.URL, zkPath string, li
logger.Infof("[Zookeeper Listener] listen dubbo path{%s}", zkPath)
l.wg.Add(1)
go func(zkPath string, listener remoting.DataListener) {
- l.listenDirEvent(conf, zkPath, listener)
+ intf := ""
+ if conf != nil {
+ intf = conf.Interface()
+ }
+ l.listenDirEvent(conf, zkPath, listener, intf)
logger.Warnf("ListenServiceEvent->listenDirEvent(zkPath{%s}) goroutine exit now", zkPath)
}(zkPath, listener)
}