You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/07/11 03:57:46 UTC

[dubbo-go] branch 3.0 updated: Support Key generate Func in ServiceEvent (#1286)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 103aa72  Support Key generate Func in ServiceEvent (#1286)
103aa72 is described below

commit 103aa72faee6e826cf5c1cc3f3969668bf768595
Author: cvictory <sh...@gmail.com>
AuthorDate: Sun Jul 11 11:57:34 2021 +0800

    Support Key generate Func in ServiceEvent (#1286)
    
    * support Key Func in ServiceEvent
    
    * FIX review issue and add unit test
    
    * fix : make all getCacheKey from ServiceEvent
    
    * fix override url notify bug
---
 registry/directory/directory.go | 24 +++++++++++----------
 registry/event.go               |  9 +++++++-
 registry/event_test.go          | 47 +++++++++++++++++++++++++++++++++++++++++
 3 files changed, 68 insertions(+), 12 deletions(-)

diff --git a/registry/directory/directory.go b/registry/directory/directory.go
index 2b0ff05..7974ad4 100644
--- a/registry/directory/directory.go
+++ b/registry/directory/directory.go
@@ -193,7 +193,7 @@ func (dir *RegistryDirectory) refreshAllInvokers(events []*registry.ServiceEvent
 			if event != nil && event.Service != nil && constant.ROUTER_PROTOCOL == event.Service.Protocol {
 				dir.configRouters()
 			}
-			if oldInvoker, _ := dir.doCacheInvoker(event.Service); oldInvoker != nil {
+			if oldInvoker, _ := dir.doCacheInvoker(event.Service, event); oldInvoker != nil {
 				oldInvokers = append(oldInvokers, oldInvoker)
 			}
 		}
@@ -224,7 +224,7 @@ func (dir *RegistryDirectory) invokerCacheKey(event *registry.ServiceEvent) stri
 	referenceUrl := dir.GetDirectoryUrl().SubURL
 	newUrl := common.MergeURL(event.Service, referenceUrl)
 	event.Update(newUrl)
-	return newUrl.GetCacheInvokerMapKey()
+	return event.Key()
 }
 
 // setNewInvokers groups the invokers from the cache first, then set the result to both directory and router chain.
@@ -240,17 +240,18 @@ func (dir *RegistryDirectory) setNewInvokers() {
 func (dir *RegistryDirectory) cacheInvokerByEvent(event *registry.ServiceEvent) (protocol.Invoker, error) {
 	// judge is override or others
 	if event != nil {
-		u := dir.convertUrl(event)
+
 		switch event.Action {
 		case remoting.EventTypeAdd, remoting.EventTypeUpdate:
+			u := dir.convertUrl(event)
 			logger.Infof("selector add service url{%s}", event.Service)
 			if u != nil && constant.ROUTER_PROTOCOL == u.Protocol {
 				dir.configRouters()
 			}
-			return dir.cacheInvoker(u), nil
+			return dir.cacheInvoker(u, event), nil
 		case remoting.EventTypeDel:
 			logger.Infof("selector delete service url{%s}", event.Service)
-			return dir.uncacheInvoker(u), nil
+			return dir.uncacheInvoker(event), nil
 		default:
 			return nil, fmt.Errorf("illegal event type: %v", event.Action)
 		}
@@ -316,8 +317,8 @@ func (dir *RegistryDirectory) toGroupInvokers() []protocol.Invoker {
 }
 
 // uncacheInvoker will return abandoned Invoker, if no Invoker to be abandoned, return nil
-func (dir *RegistryDirectory) uncacheInvoker(url *common.URL) protocol.Invoker {
-	return dir.uncacheInvokerWithKey(url.GetCacheInvokerMapKey())
+func (dir *RegistryDirectory) uncacheInvoker(event *registry.ServiceEvent) protocol.Invoker {
+	return dir.uncacheInvokerWithKey(event.Key())
 }
 
 func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker {
@@ -331,7 +332,7 @@ func (dir *RegistryDirectory) uncacheInvokerWithKey(key string) protocol.Invoker
 }
 
 // cacheInvoker will return abandoned Invoker,if no Invoker to be abandoned,return nil
-func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
+func (dir *RegistryDirectory) cacheInvoker(url *common.URL, event *registry.ServiceEvent) protocol.Invoker {
 	dir.overrideUrl(dir.GetDirectoryUrl())
 	referenceUrl := dir.GetDirectoryUrl().SubURL
 
@@ -348,15 +349,16 @@ func (dir *RegistryDirectory) cacheInvoker(url *common.URL) protocol.Invoker {
 	if url.Protocol == referenceUrl.Protocol || referenceUrl.Protocol == "" {
 		newUrl := common.MergeURL(url, referenceUrl)
 		dir.overrideUrl(newUrl)
-		if v, ok := dir.doCacheInvoker(newUrl); ok {
+		event.Update(newUrl)
+		if v, ok := dir.doCacheInvoker(newUrl, event); ok {
 			return v
 		}
 	}
 	return nil
 }
 
-func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL) (protocol.Invoker, bool) {
-	key := newUrl.GetCacheInvokerMapKey()
+func (dir *RegistryDirectory) doCacheInvoker(newUrl *common.URL, event *registry.ServiceEvent) (protocol.Invoker, bool) {
+	key := event.Key()
 	if cacheInvoker, ok := dir.cacheInvokersMap.Load(key); !ok {
 		logger.Debugf("service will be added in cache invokers: invokers url is  %s!", newUrl)
 		newInvoker := extension.GetProtocol(protocolwrapper.FILTER).Refer(newUrl)
diff --git a/registry/event.go b/registry/event.go
index 41a8995..76a9600 100644
--- a/registry/event.go
+++ b/registry/event.go
@@ -29,6 +29,8 @@ import (
 	"dubbo.apache.org/dubbo-go/v3/remoting"
 )
 
+type KeyFunc func(*common.URL) string
+
 func init() {
 	rand.Seed(time.Now().UnixNano())
 }
@@ -45,6 +47,7 @@ type ServiceEvent struct {
 	key string
 	// If the url is updated, such as Merged.
 	updated bool
+	KeyFunc KeyFunc
 }
 
 // String return the description of event
@@ -69,7 +72,11 @@ func (e *ServiceEvent) Key() string {
 	if len(e.key) > 0 {
 		return e.key
 	}
-	e.key = e.Service.GetCacheInvokerMapKey()
+	if e.KeyFunc == nil {
+		e.key = e.Service.GetCacheInvokerMapKey()
+	} else {
+		e.key = e.KeyFunc(e.Service)
+	}
 	return e.key
 }
 
diff --git a/registry/event_test.go b/registry/event_test.go
new file mode 100644
index 0000000..349dc3d
--- /dev/null
+++ b/registry/event_test.go
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package registry
+
+import (
+	"testing"
+)
+
+import (
+	"github.com/stretchr/testify/assert"
+)
+
+import (
+	"dubbo.apache.org/dubbo-go/v3/common"
+)
+
+func TestKey(t *testing.T) {
+	u1, _ := common.NewURL("dubbo://127.0.0.1:20000/com.ikurento.user.UserProvider?interface=com.ikurento.user.UserProvider&group=&version=2.0")
+	se := ServiceEvent{
+		Service: u1,
+	}
+	assert.Equal(t, se.Key(), "dubbo://:@127.0.0.1:20000/?interface=com.ikurento.user.UserProvider&group=&version=2.0&timestamp=")
+
+	se2 := ServiceEvent{
+		Service: u1,
+		KeyFunc: defineKey,
+	}
+	assert.Equal(t, se2.Key(), "Hello Key")
+}
+
+func defineKey(url *common.URL) string {
+	return "Hello Key"
+}