You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/07/11 03:57:46 UTC
[dubbo-go] branch 3.0 updated: Support Key generate Func in
ServiceEvent (#1286)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 103aa72 Support Key generate Func in ServiceEvent (#1286)
103aa72 is described below
commit 103aa72faee6e826cf5c1cc3f3969668bf768595
Author: cvictory <sh...@gmail.com>
AuthorDate: Sun Jul 11 11:57:34 2021 +0800
Support Key generate Func in ServiceEvent (#1286)
* support Key Func in ServiceEvent
* FIX review issue and add unit test
* fix : make all getCacheKey from ServiceEvent
* fix override url notify bug
---
registry/directory/directory.go | 24 +++++++++++----------
registry/event.go | 9 +++++++-
registry/event_test.go | 47 +++++++++++++++++++++++++++++++++++++++++
3 files changed, 68 insertions(+), 12 deletions(-)
diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 2b0ff05..7974ad4 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -193,7 +193,7 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent
if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol {
dir.configRouters()
}
- if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
+ if oldInvoker, _ := dir.doCacheInvoker(event.Service, event); oldInvoker != nil {
oldInvokers = append(oldInvokers, oldInvoker)
}
}
@@ -224,7 +224,7 @@ func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) stri
referenceUrl := dir.GetDirectoryUrl().SubURL
newUrl := common.MergeURL(event.Service, referenceUrl)
event.Update(newUrl)
- return newUrl.GetCacheInvokerMapKey()
+ return event.Key()
}
// setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
@@ -240,17 +240,18 @@ func (dir *RegistryDirectory) setNewInvokers() {
func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
// judge is override or others
if event != nil {
- u := dir.convertUrl(event)
+
switch event.Action {
case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+ u := dir.convertUrl(event)
logger.Infof("selector add service url{%s}", event.Service)
if u != nil && constant.ROUTER_PROTOCOL == u.Protocol {
dir.configRouters()
}
- return dir.cacheInvoker(u), nil
+ return dir.cacheInvoker(u, event), nil
case remoting.EventTypeDel:
logger.Infof("selector delete service url{%s}", event.Service)
- return dir.uncacheInvoker(u), nil
+ return dir.uncacheInvoker(event), nil
default:
return nil, fmt.Errorf("illegal event type: %v", event.Action)
}
@@ -316,8 +317,8 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
}
// uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
-func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
- return dir.uncacheInvokerWithKey(url.GetCacheInvokerMapKey())
+func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) protocol.Invoker {
+ return dir.uncacheInvokerWithKey(event.Key())
}
func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
@@ -331,7 +332,7 @@ func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker
}
// cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
-func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
+func (dir *RegistryDirectory) cacheInvoker(url *common.URL, event *registry.ServiceEvent) protocol.Invoker {
dir.overrideUrl(dir.GetDirectoryUrl())
referenceUrl := dir.GetDirectoryUrl().SubURL
@@ -348,15 +349,16 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
newUrl := common.MergeURL(url, referenceUrl)
dir.overrideUrl(newUrl)
- if v, ok := dir.doCacheInvoker(newUrl); ok {
+ event.Update(newUrl)
+ if v, ok := dir.doCacheInvoker(newUrl, event); ok {
return v
}
}
return nil
}
-func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
- key := newUrl.GetCacheInvokerMapKey()
+func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL, event *registry.ServiceEvent) (protocol.Invoker, bool) {
+ key := event.Key()
if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
logger.Debugf("service will be added in cache invokers: invokers url is %s!", newUrl)
newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
diff --git a/registry/event.go b/registry/event.go
index 41a8995..76a9600 100644
--- a/registry/event.go
+++ b/registry/event.go
@@ -29,6 +29,8 @@ import (
"dubbo.apache.org/dubbo-go/v3/remoting"
)
+type KeyFunc func(*common.URL) string
+
func init() {
rand.Seed(time.Now().UnixNano())
}
@@ -45,6 +47,7 @@ type ServiceEvent struct {
key string
// If the url is updated, such as Merged.
updated bool
+ KeyFunc KeyFunc
}
// String return the description of event
@@ -69,7 +72,11 @@ func (e *ServiceEvent) Key() string {
if len(e.key) > 0 {
return e.key
}
- e.key = e.Service.GetCacheInvokerMapKey()
+ if e.KeyFunc == nil {
+ e.key = e.Service.GetCacheInvokerMapKey()
+ } else {
+ e.key = e.KeyFunc(e.Service)
+ }
return e.key
}
diff --git a/registry/event_test.go b/registry/event_test.go
new file mode 100644
index 0000000..349dc3d
--- /dev/null
+++ b/registry/event_test.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package registry
+
+import (
+ "testing"
+)
+
+import (
+ "github.com/stretchr/testify/assert"
+)
+
+import (
+ "dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func TestKey(t *testing.T) {
+ u1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.0")
+ se := ServiceEvent{
+ Service: u1,
+ }
+ assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0&timestamp=")
+
+ se2 := ServiceEvent{
+ Service: u1,
+ KeyFunc: defineKey,
+ }
+ assert.Equal(t, se2.Key(), "Hello Key")
+}
+
+func defineKey(url *common.URL) string {
+ return "Hello Key"
+}