You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/09/08 03:16:49 UTC
[dubbo-go] branch config-enhance updated: Add get consumer service
by InterfaceName api. (#1443)
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch config-enhance
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/config-enhance by this push:
new f556336 Add get consumer service by InterfaceName api. (#1443)
f556336 is described below
commit f556336c4ba6cb21c4140437073219f444badf1a
Author: Laurence <45...@users.noreply.github.com>
AuthorDate: Wed Sep 8 11:15:51 2021 +0800
Add get consumer service by InterfaceName api. (#1443)
* fix: add get dynamic configuration api
* fix: change zk config center data to base64
* fix: add ConsumerServiceByInterfaceName
* fix: ut
* fix: add lock
* fix: fix grpc ut
* fix: add comment
* fix: change comment
---
config/reference_config.go | 3 ++-
config/service.go | 38 +++++++++++++++++++++++------
protocol/dubbo3/dubbo3_invoker.go | 6 +++--
protocol/dubbo3/internal/client.go | 2 ++
protocol/grpc/client.go | 4 +--
protocol/grpc/grpc_invoker_test.go | 2 +-
protocol/grpc/internal/helloworld/client.go | 1 +
protocol/grpc/internal/routeguide/client.go | 1 +
8 files changed, 43 insertions(+), 14 deletions(-)
diff --git a/config/reference_config.go b/config/reference_config.go
index a4142bc..58d860d 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -94,7 +94,7 @@ func (cc *ReferenceConfig) Init(rc *RootConfig) error {
}
// Refer ...
-func (rc *ReferenceConfig) Refer(_ interface{}) {
+func (rc *ReferenceConfig) Refer(srv interface{}) {
cfgURL := common.NewURLWithOptions(
common.WithPath(rc.InterfaceName),
common.WithProtocol(rc.Protocol),
@@ -102,6 +102,7 @@ func (rc *ReferenceConfig) Refer(_ interface{}) {
common.WithParamsValue(constant.BEAN_NAME_KEY, rc.id),
common.WithParamsValue(constant.METADATATYPE_KEY, rc.metaDataType),
)
+ SetConsumerServiceByInterfaceName(rc.InterfaceName, srv)
if rc.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
diff --git a/config/service.go b/config/service.go
index 8786867..0f039fd 100644
--- a/config/service.go
+++ b/config/service.go
@@ -19,43 +19,65 @@ package config
import (
"dubbo.apache.org/dubbo-go/v3/common"
+ "sync"
)
var (
- conServices = map[string]common.RPCService{} // service name -> service
- proServices = map[string]common.RPCService{} // service name -> service
+ // conServicesLock is used to guard conServices map.
+ conServicesLock = sync.Mutex{}
+ conServices = map[string]common.RPCService{} // service name -> service
+
+ // proServicesLock is used to guard proServices map
+ proServicesLock = sync.Mutex{}
+ proServices = map[string]common.RPCService{} // service name -> service
+
+ // interfaceNameConServicesLock is used to guard interfaceNameConServices map
+ interfaceNameConServicesLock = sync.Mutex{}
+ interfaceNameConServices = map[string]common.RPCService{} // interfaceName -> service
)
// SetConsumerService is called by init() of implement of RPCService
func SetConsumerService(service common.RPCService) {
ref := common.GetReference(service)
+ conServicesLock.Lock()
+ defer conServicesLock.Unlock()
conServices[ref] = service
}
// SetProviderService is called by init() of implement of RPCService
func SetProviderService(service common.RPCService) {
ref := common.GetReference(service)
+ proServicesLock.Lock()
+ defer proServicesLock.Unlock()
proServices[ref] = service
}
// GetConsumerService gets ConsumerService by @name
func GetConsumerService(name string) common.RPCService {
+ conServicesLock.Lock()
+ defer conServicesLock.Unlock()
return conServices[name]
}
// GetProviderService gets ProviderService by @name
func GetProviderService(name string) common.RPCService {
+ proServicesLock.Lock()
+ defer proServicesLock.Unlock()
return proServices[name]
}
-// GetAllProviderService gets all ProviderService
-func GetAllProviderService() map[string]common.RPCService {
- return proServices
+// SetConsumerServiceByInterfaceName is used by pb serialization
+func SetConsumerServiceByInterfaceName(interfaceName string, srv common.RPCService) {
+ interfaceNameConServicesLock.Lock()
+ defer interfaceNameConServicesLock.Unlock()
+ interfaceNameConServices[interfaceName] = srv
}
-// GetAllConsumerService gets all ConsumerService
-func GetAllConsumerService() map[string]common.RPCService {
- return conServices
+// GetConsumerServiceByInterfaceName is used by pb serialization
+func GetConsumerServiceByInterfaceName(interfaceName string) common.RPCService {
+ interfaceNameConServicesLock.Lock()
+ defer interfaceNameConServicesLock.Unlock()
+ return interfaceNameConServices[interfaceName]
}
// GetCallback gets CallbackResponse by @name
diff --git a/protocol/dubbo3/dubbo3_invoker.go b/protocol/dubbo3/dubbo3_invoker.go
index e2eb80d..a55ee54 100644
--- a/protocol/dubbo3/dubbo3_invoker.go
+++ b/protocol/dubbo3/dubbo3_invoker.go
@@ -59,8 +59,10 @@ func NewDubboInvoker(url *common.URL) (*DubboInvoker, error) {
rt := config.GetConsumerConfig().RequestTimeout
timeout := url.GetParamDuration(constant.TIMEOUT_KEY, rt)
- key := url.GetParam(constant.BEAN_NAME_KEY, "")
- consumerService := config.GetConsumerService(key)
+ // for triple pb serialization. The bean name from provider is the provider reference key,
+ // which can't locate the target consumer stub, so we use interface key..
+ interfaceKey := url.GetParam(constant.INTERFACE_KEY, "")
+ consumerService := config.GetConsumerServiceByInterfaceName(interfaceKey)
dubboSerializaerType := url.GetParam(constant.SERIALIZATION_KEY, constant.PROTOBUF_SERIALIZATION)
triCodecType := tripleConstant.CodecType(dubboSerializaerType)
diff --git a/protocol/dubbo3/internal/client.go b/protocol/dubbo3/internal/client.go
index a182054..06f9f04 100644
--- a/protocol/dubbo3/internal/client.go
+++ b/protocol/dubbo3/internal/client.go
@@ -22,5 +22,7 @@ import (
)
func init() {
+ // for pb client
+ config.SetConsumerServiceByInterfaceName("org.apache.dubbo.DubboGreeterImpl", &GreeterClientImpl{})
config.SetConsumerService(&GreeterClientImpl{})
}
diff --git a/protocol/grpc/client.go b/protocol/grpc/client.go
index 60a51dd..ab8c0db 100644
--- a/protocol/grpc/client.go
+++ b/protocol/grpc/client.go
@@ -83,8 +83,8 @@ func NewClient(url *common.URL) (*Client, error) {
return nil, err
}
- key := url.GetParam(constant.BEAN_NAME_KEY, "")
- impl := config.GetConsumerService(key)
+ key := url.GetParam(constant.INTERFACE_KEY, "")
+ impl := config.GetConsumerServiceByInterfaceName(key)
invoker := getInvoker(impl, conn)
return &Client{
diff --git a/protocol/grpc/grpc_invoker_test.go b/protocol/grpc/grpc_invoker_test.go
index 273f702..b936f3f 100644
--- a/protocol/grpc/grpc_invoker_test.go
+++ b/protocol/grpc/grpc_invoker_test.go
@@ -42,7 +42,7 @@ const (
"&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!"
routeguideURL = "grpc://127.0.0.1:30000/routeGuideImpl?accesslog=&anyhost=true&app.version=0.0.1&application=BDTService&async=false&bean.name=routeGuideImpl" +
- "&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24IGreeter" +
+ "&category=providers&cluster=failover&dubbo=dubbo-provider-golang-2.6.0&environment=dev&execute.limit=&execute.limit.rejected.handler=&generic=false&group=&interface=io.grpc.examples.helloworld.GreeterGrpc%24RouteGuide" +
"&ip=192.168.1.106&loadbalance=random&methods.SayHello.loadbalance=random&methods.SayHello.retries=1&methods.SayHello.tps.limit.interval=&methods.SayHello.tps.limit.rate=&methods.SayHello.tps.limit.strategy=" +
"&methods.SayHello.weight=0&module=dubbogo+say-hello+client&name=BDTService&organization=ikurento.com&owner=ZX&pid=49427&reference.filter=cshutdown&registry.role=3&remote.timestamp=1576923717&retries=" +
"&service.filter=echo%2Ctoken%2Caccesslog%2Ctps%2Cexecute%2Cpshutdown&side=provider&timestamp=1576923740&tps.limit.interval=&tps.limit.rate=&tps.limit.rejected.handler=&tps.limit.strategy=&tps.limiter=&version=&warmup=100!"
diff --git a/protocol/grpc/internal/helloworld/client.go b/protocol/grpc/internal/helloworld/client.go
index ec45dd5..444424d 100644
--- a/protocol/grpc/internal/helloworld/client.go
+++ b/protocol/grpc/internal/helloworld/client.go
@@ -30,6 +30,7 @@ import (
)
func init() {
+ config.SetConsumerServiceByInterfaceName("io.grpc.examples.helloworld.GreeterGrpc$IGreeter", &GrpcGreeterImpl{})
config.SetConsumerService(&GrpcGreeterImpl{})
}
diff --git a/protocol/grpc/internal/routeguide/client.go b/protocol/grpc/internal/routeguide/client.go
index 5f79ef0..dd870b7 100644
--- a/protocol/grpc/internal/routeguide/client.go
+++ b/protocol/grpc/internal/routeguide/client.go
@@ -29,6 +29,7 @@ import (
)
func init() {
+ config.SetConsumerServiceByInterfaceName("io.grpc.examples.helloworld.GreeterGrpc$RouteGuide", &RouteGuideClientImpl{})
config.SetConsumerService(&RouteGuideClientImpl{})
}