You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/09/08 03:16:49 UTC

[dubbo-go] branch config-enhance updated: Add get consumer service by InterfaceName api. (#1443)

This is an automated email from the ASF dual-hosted git repository.

laurence pushed a commit to branch config-enhance
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/config-enhance by this push:
     new f556336  Add get consumer service by InterfaceName api. (#1443)
f556336 is described below

commit f556336c4ba6cb21c4140437073219f444badf1a
Author: Laurence <45...@users.noreply.github.com>
AuthorDate: Wed Sep 8 11:15:51 2021 +0800

    Add get consumer service by InterfaceName api. (#1443)
    
    * fix: add get dynamic configuration api
    
    * fix: change zk config center data to base64
    
    * fix: add ConsumerServiceByInterfaceName
    
    * fix: ut
    
    * fix: add lock
    
    * fix: fix grpc ut
    
    * fix: add comment
    
    * fix: change comment
---
 config/reference_config.go                  |  3 ++-
 config/service.go                           | 38 +++++++++++++++++++++++------
 protocol/dubbo3/dubbo3_invoker.go           |  6 +++--
 protocol/dubbo3/internal/client.go          |  2 ++
 protocol/grpc/client.go                     |  4 +--
 protocol/grpc/grpc_invoker_test.go          |  2 +-
 protocol/grpc/internal/helloworld/client.go |  1 +
 protocol/grpc/internal/routeguide/client.go |  1 +
 8 files changed, 43 insertions(+), 14 deletions(-)

diff --git a/config/reference_config.go b/config/reference_config.go
index a4142bc..58d860d 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -94,7 +94,7 @@ func (cc *ReferenceConfig) Init(rc *RootConfig) error {
 }
 
 // Refer ...
-func (rc *ReferenceConfig) Refer(_ interface{}) {
+func (rc *ReferenceConfig) Refer(srv interface{}) {
 	cfgURL := common.NewURLWithOptions(
 		common.WithPath(rc.InterfaceName),
 		common.WithProtocol(rc.Protocol),
@@ -102,6 +102,7 @@ func (rc *ReferenceConfig) Refer(_ interface{}) {
 		common.WithParamsValue(constant.BEAN_NAME_KEY, rc.id),
 		common.WithParamsValue(constant.METADATATYPE_KEY, rc.metaDataType),
 	)
+	SetConsumerServiceByInterfaceName(rc.InterfaceName, srv)
 	if rc.ForceTag {
 		cfgURL.AddParam(constant.ForceUseTag, "true")
 	}
diff --git a/config/service.go b/config/service.go
index 8786867..0f039fd 100644
--- a/config/service.go
+++ b/config/service.go
@@ -19,43 +19,65 @@ package config
 
 import (
 	"dubbo.apache.org/dubbo-go/v3/common"
+	"sync"
 )
 
 var (
-	conServices = map[string]common.RPCService{} // service name -> service
-	proServices = map[string]common.RPCService{} // service name -> service
+	// conServicesLock is used to guard conServices map.
+	conServicesLock = sync.Mutex{}
+	conServices     = map[string]common.RPCService{} // service name -> service
+
+	// proServicesLock is used to guard proServices map
+	proServicesLock = sync.Mutex{}
+	proServices     = map[string]common.RPCService{} // service name -> service
+
+	// interfaceNameConServicesLock is used to guard interfaceNameConServices map
+	interfaceNameConServicesLock = sync.Mutex{}
+	interfaceNameConServices     = map[string]common.RPCService{} // interfaceName -> service
 )
 
 // SetConsumerService is called by init() of implement of RPCService
 func SetConsumerService(service common.RPCService) {
 	ref := common.GetReference(service)
+	conServicesLock.Lock()
+	defer conServicesLock.Unlock()
 	conServices[ref] = service
 }
 
 // SetProviderService is called by init() of implement of RPCService
 func SetProviderService(service common.RPCService) {
 	ref := common.GetReference(service)
+	proServicesLock.Lock()
+	defer proServicesLock.Unlock()
 	proServices[ref] = service
 }
 
 // GetConsumerService gets ConsumerService by @name
 func GetConsumerService(name string) common.RPCService {
+	conServicesLock.Lock()
+	defer conServicesLock.Unlock()
 	return conServices[name]
 }
 
 // GetProviderService gets ProviderService by @name
 func GetProviderService(name string) common.RPCService {
+	proServicesLock.Lock()
+	defer proServicesLock.Unlock()
 	return proServices[name]
 }
 
-// GetAllProviderService gets all ProviderService
-func GetAllProviderService() map[string]common.RPCService {
-	return proServices
+// SetConsumerServiceByInterfaceName is used by pb serialization
+func SetConsumerServiceByInterfaceName(interfaceName string, srv common.RPCService) {
+	interfaceNameConServicesLock.Lock()
+	defer interfaceNameConServicesLock.Unlock()
+	interfaceNameConServices[interfaceName] = srv
 }
 
-// GetAllConsumerService gets all ConsumerService
-func GetAllConsumerService() map[string]common.RPCService {
-	return conServices
+// GetConsumerServiceByInterfaceName is used by pb serialization
+func GetConsumerServiceByInterfaceName(interfaceName string) common.RPCService {
+	interfaceNameConServicesLock.Lock()
+	defer interfaceNameConServicesLock.Unlock()
+	return interfaceNameConServices[interfaceName]
 }
 
 // GetCallback gets CallbackResponse by @name
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index e2eb80d..a55ee54 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -59,8 +59,10 @@ func NewDubboInvoker(url *common.URL) (*DubboInvoker, error) {
 	rt := config.GetConsumerConfig().RequestTimeout
 
 	timeout := url.GetParamDuration(constant.TIMEOUT_KEY, rt)
-	key := url.GetParam(constant.BEAN_NAME_KEY, "")
-	consumerService := config.GetConsumerService(key)
+	// for triple pb serialization. The bean name from provider is the provider reference key,
+	// which can't locate the target consumer stub, so we use the interface key.
+	interfaceKey := url.GetParam(constant.INTERFACE_KEY, "")
+	consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey)
 
 	dubboSerializaerType := url.GetParam(constant.SERIALIZATION_KEY, constant.PROTOBUF_SERIALIZATION)
 	triCodecType := tripleConstant.CodecType(dubboSerializaerType)
diff --git a/protocol/dubbo3/internal/client.go b/protocol/dubbo3/internal/client.go
index a182054..06f9f04 100644
--- a/protocol/dubbo3/internal/client.go
+++ b/protocol/dubbo3/internal/client.go
@@ -22,5 +22,7 @@ import (
 )
 
 func init() {
+	// for pb client
+	config.SetConsumerServiceByInterfaceName("org.apache.dubbo.DubboGreeterImpl", &GreeterClientImpl{})
 	config.SetConsumerService(&GreeterClientImpl{})
 }
diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go
index 60a51dd..ab8c0db 100644
--- a/protocol/grpc/client.go
+++ b/protocol/grpc/client.go
@@ -83,8 +83,8 @@ func NewClient(url *common.URL) (*Client, error) {
 		return nil, err
 	}
 
-	key := url.GetParam(constant.BEAN_NAME_KEY, "")
-	impl := config.GetConsumerService(key)
+	key := url.GetParam(constant.INTERFACE_KEY, "")
+	impl := config.GetConsumerServiceByInterfaceName(key)
 	invoker := getInvoker(impl, conn)
 
 	return &Client{
diff --git a/protocol/grpc/grpc_invoker_test.go b/protocol/grpc/grpc_invoker_test.go
index 273f702..b936f3f 100644
--- a/protocol/grpc/grpc_invoker_test.go
+++ b/protocol/grpc/grpc_invoker_test.go
@@ -42,7 +42,7 @@ const (
 		"&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!"
 
 	routeguideURL = "grpc://127.0.0.1:30000/routeGuideImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=routeGuideImpl" +
-		"&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter" +
+		"&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24RouteGuide" +
 		"&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=" +
 		"&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=" +
 		"&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!"
diff --git a/protocol/grpc/internal/helloworld/client.go b/protocol/grpc/internal/helloworld/client.go
index ec45dd5..444424d 100644
--- a/protocol/grpc/internal/helloworld/client.go
+++ b/protocol/grpc/internal/helloworld/client.go
@@ -30,6 +30,7 @@ import (
 )
 
 func init() {
+	config.SetConsumerServiceByInterfaceName("io.grpc.examples.helloworld.GreeterGrpc$IGreeter", &GrpcGreeterImpl{})
 	config.SetConsumerService(&GrpcGreeterImpl{})
 }
 
diff --git a/protocol/grpc/internal/routeguide/client.go b/protocol/grpc/internal/routeguide/client.go
index 5f79ef0..dd870b7 100644
--- a/protocol/grpc/internal/routeguide/client.go
+++ b/protocol/grpc/internal/routeguide/client.go
@@ -29,6 +29,7 @@ import (
 )
 
 func init() {
+	config.SetConsumerServiceByInterfaceName("io.grpc.examples.helloworld.GreeterGrpc$RouteGuide", &RouteGuideClientImpl{})
 	config.SetConsumerService(&RouteGuideClientImpl{})
 }