You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/05/26 13:36:30 UTC
[dubbo-go] branch 3.0 updated: Enhancements: Make 'group' and 'version' flag of app level works. (#1880)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 1e872372f Enhancements: Make 'group' and 'version' flag of app level works. (#1880)
1e872372f is described below
commit 1e872372f844e5833cfda51e2aa3c89dfd5d2a90
Author: Laurence <45...@users.noreply.github.com>
AuthorDate: Thu May 26 21:36:25 2022 +0800
Enhancements: Make 'group' and 'version' flag of app level works. (#1880)
* Fix: reflection and auto config service
* fix: add xds sniffing logs
* fix: add xds timeout to 10s
* fix: add debug info
* Fix: large xds timeout
* fix: add xds logs
* Fix: add xds wrapper client api
* fix: update get metadata logic
* Fix: enhance xds client api
* Fix: add license
* fix: typo
* Fix: not register yaml tag
* Fix: add default protocol
* fix: logs
---
common/constant/key.go | 6 +
config/application_config.go | 5 +-
config/config_utils.go | 4 +-
config/consumer_config.go | 41 ++++---
config/custom_config_test.go | 2 +-
config/provider_config.go | 54 ++++++---
config/reference_config.go | 15 ++-
config/service.go | 10 ++
config/service_config.go | 25 ++++-
config_center/nacos/impl_test.go | 10 +-
.../triple_reflection_v1alpha/reflection.pb.go | 8 +-
.../reflection_triple.pb.go | 19 +++-
registry/xds/registry.go | 19 +++-
remoting/xds/client.go | 63 +++++++++--
remoting/xds/client_test.go | 57 ++++++++--
remoting/xds/{error.go => config.go} | 17 ++-
remoting/xds/error.go | 4 +-
remoting/xds/mapping/handler.go | 31 +++++-
remoting/xds/mapping/handler_test.go | 6 +-
remoting/xds/mapping/mocks/InterfaceMapHandler.go | 21 ++++
remoting/xds/mocks/client.go | 5 +
test/xds/main.go | 121 ---------------------
22 files changed, 326 insertions(+), 217 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index 6bb6365df..7cea915e7 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -367,3 +367,9 @@ const (
AdaptiveServiceEnabledKey = "adaptive-service.enabled"
AdaptiveServiceIsEnabled = "1"
)
+
+// reflection service
+const (
+ ReflectionServiceTypeName = "XXX_serverReflectionServer"
+ ReflectionServiceInterface = "grpc.reflection.v1alpha.ServerReflection"
+)
diff --git a/config/application_config.go b/config/application_config.go
index 1520e8e4f..13e00d5d1 100644
--- a/config/application_config.go
+++ b/config/application_config.go
@@ -32,9 +32,10 @@ type ApplicationConfig struct {
Organization string `default:"dubbo-go" yaml:"organization" json:"organization,omitempty" property:"organization"`
Name string `default:"dubbo.io" yaml:"name" json:"name,omitempty" property:"name"`
Module string `default:"sample" yaml:"module" json:"module,omitempty" property:"module"`
- Version string `default:"3.0.0" yaml:"version" json:"version,omitempty" property:"version"`
+ Group string `yaml:"group" json:"group,omitempty" property:"module"`
+ Version string `yaml:"version" json:"version,omitempty" property:"version"`
Owner string `default:"dubbo-go" yaml:"owner" json:"owner,omitempty" property:"owner"`
- Environment string `default:"dev" yaml:"environment" json:"environment,omitempty" property:"environment"`
+ Environment string `yaml:"environment" json:"environment,omitempty" property:"environment"`
// the metadata type. remote or local
MetadataType string `default:"local" yaml:"metadata-type" json:"metadataType,omitempty" property:"metadataType"`
}
diff --git a/config/config_utils.go b/config/config_utils.go
index be1cf9be5..f08d1b901 100644
--- a/config/config_utils.go
+++ b/config/config_utils.go
@@ -90,8 +90,8 @@ func removeDuplicateElement(items []string) []string {
return result
}
-// translateRegistryIds string "nacos,zk" => ["nacos","zk"]
-func translateRegistryIds(registryIds []string) []string {
+// translateIds string "nacos,zk" => ["nacos","zk"]
+func translateIds(registryIds []string) []string {
ids := make([]string, 0)
for _, id := range registryIds {
diff --git a/config/consumer_config.go b/config/consumer_config.go
index 4badfcd25..af04775d7 100644
--- a/config/consumer_config.go
+++ b/config/consumer_config.go
@@ -24,13 +24,14 @@ import (
import (
"github.com/creasty/defaults"
+
+ tripleConstant "github.com/dubbogo/triple/pkg/common/constant"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/logger"
- "dubbo.apache.org/dubbo-go/v3/config/generic"
)
const (
@@ -41,6 +42,7 @@ const (
type ConsumerConfig struct {
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"`
+ Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
RequestTimeout string `default:"3s" yaml:"request-timeout" json:"request-timeout,omitempty" property:"request-timeout"`
ProxyFactory string `default:"default" yaml:"proxy" json:"proxy,omitempty" property:"proxy"`
Check bool `yaml:"check" json:"check,omitempty" property:"check"`
@@ -61,7 +63,7 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error {
if cc == nil {
return nil
}
- cc.RegistryIDs = translateRegistryIds(cc.RegistryIDs)
+ cc.RegistryIDs = translateIds(cc.RegistryIDs)
if len(cc.RegistryIDs) <= 0 {
cc.RegistryIDs = rc.getRegistryIds()
}
@@ -103,19 +105,30 @@ func (cc *ConsumerConfig) Init(rc *RootConfig) error {
}
func (cc *ConsumerConfig) Load() {
- for key, ref := range cc.References {
- if ref.Generic != "" {
- genericService := generic.NewGenericService(key)
- SetConsumerService(genericService)
- }
- rpcService := GetConsumerService(key)
- if rpcService == nil {
- logger.Warnf("%s does not exist!", key)
- continue
+ for registeredTypeName, refRPCService := range GetConsumerServiceMap() {
+ refConfig, ok := cc.References[registeredTypeName]
+ if !ok {
+ // not found configuration, now new a configuration with default.
+ refConfig = NewReferenceConfigBuilder().SetProtocol(tripleConstant.TRIPLE).Build()
+ triplePBService, ok := refRPCService.(common.TriplePBService)
+ if !ok {
+ logger.Errorf("Dubbo-go cannot get interface name with registeredTypeName = %s."+
+ "Please run the command 'go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest' to get the latest "+
+ "protoc-gen-go-triple, and then re-generate your pb file again by this tool."+
+ "If you are not using pb serialization, please set 'interfaceName' field in reference config to let dubbogo get the interface name.", registeredTypeName)
+ continue
+ } else {
+ // use interface name defined by pb
+ refConfig.InterfaceName = triplePBService.XXX_InterfaceName()
+ }
+ if err := refConfig.Init(rootConfig); err != nil {
+ logger.Errorf(fmt.Sprintf("reference with registeredTypeName = %s init failed! err: %#v", registeredTypeName, err))
+ continue
+ }
}
- ref.id = key
- ref.Refer(rpcService)
- ref.Implement(rpcService)
+ refConfig.id = registeredTypeName
+ refConfig.Refer(refRPCService)
+ refConfig.Implement(refRPCService)
}
var maxWait int
diff --git a/config/custom_config_test.go b/config/custom_config_test.go
index 72193a244..ccf3c58eb 100644
--- a/config/custom_config_test.go
+++ b/config/custom_config_test.go
@@ -79,7 +79,7 @@ func TestConfigUtils(t *testing.T) {
assert.Equal(t, id, strings.Join([]string{constant.RegistryConfigPrefix, "nacos", "127.0.0.1:8848"}, "-"))
- ids := translateRegistryIds([]string{"nacos,zk"})
+ ids := translateIds([]string{"nacos,zk"})
assert.Equal(t, ids[0], "nacos")
assert.Equal(t, ids[1], "zk")
diff --git a/config/provider_config.go b/config/provider_config.go
index cc7212042..0c024947c 100644
--- a/config/provider_config.go
+++ b/config/provider_config.go
@@ -43,6 +43,8 @@ type ProviderConfig struct {
Register bool `yaml:"register" json:"register" property:"register"`
// RegistryIDs is registry ids list
RegistryIDs []string `yaml:"registry-ids" json:"registry-ids" property:"registry-ids"`
+ // protocol
+ ProtocolIDs []string `yaml:"protocol-ids" json:"protocol-ids" property:"protocol-ids"`
// TracingKey is tracing ids list
TracingKey string `yaml:"tracing-key" json:"tracing-key" property:"tracing-key"`
// Services services
@@ -72,10 +74,12 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
if c == nil {
return nil
}
- c.RegistryIDs = translateRegistryIds(c.RegistryIDs)
+ c.RegistryIDs = translateIds(c.RegistryIDs)
if len(c.RegistryIDs) <= 0 {
c.RegistryIDs = rc.getRegistryIds()
}
+ c.ProtocolIDs = translateIds(c.ProtocolIDs)
+
if c.TracingKey == "" && len(rc.Tracing) > 0 {
for k, _ := range rc.Tracing {
c.TracingKey = k
@@ -107,14 +111,16 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
for k, v := range rc.Protocols {
if v.Name == tripleConstant.TRIPLE {
+ // Auto create reflection service configure only when provider with triple service is configured.
tripleReflectionService := NewServiceConfigBuilder().
SetProtocolIDs(k).
- SetInterface("grpc.reflection.v1alpha.ServerReflection").
+ SetNotRegister(true).
+ SetInterface(constant.ReflectionServiceInterface).
Build()
if err := tripleReflectionService.Init(rc); err != nil {
return err
}
- c.Services["XXX_serverReflectionServer"] = tripleReflectionService
+ c.Services[constant.ReflectionServiceTypeName] = tripleReflectionService
}
}
@@ -135,21 +141,38 @@ func (c *ProviderConfig) Init(rc *RootConfig) error {
}
func (c *ProviderConfig) Load() {
- for key, svs := range c.Services {
- rpcService := GetProviderService(key)
- if rpcService == nil {
- logger.Warnf("Service reference key %s does not exist, please check if this key "+
- "matches your provider struct type name, or matches the returned valued of your provider struct's Reference() function."+
- "View https://www.yuque.com/u772707/eqpff0/pqfgz3#zxdw0 for details", key)
- continue
+ for registeredTypeName, service := range GetProviderServiceMap() {
+ serviceConfig, ok := c.Services[registeredTypeName]
+ if !ok {
+ if registeredTypeName == constant.ReflectionServiceTypeName {
+ // do not auto generate reflection server's configuration.
+ continue
+ }
+ // service doesn't config in config file, create one with default
+ logger.Warnf("Dubbogo can not find service with registeredTypeName %s in configuration. Use the default configuration instead.", registeredTypeName)
+ supportPBPackagerNameSerivce, ok := service.(common.TriplePBService)
+ serviceConfig = NewServiceConfigBuilder().Build()
+ if !ok {
+ logger.Errorf("Dubbogo do not read service interface name with registeredTypeName = %s."+
+ "Please run go install github.com/dubbogo/tools/cmd/protoc-gen-go-triple@latest to update your "+
+ "protoc-gen-go-triple and re-generate your pb file again."+
+ "If you are not using pb serialization, please set 'interface' field in service config.", registeredTypeName)
+ continue
+ } else {
+ // use interface name defined by pb
+ serviceConfig.Interface = supportPBPackagerNameSerivce.XXX_InterfaceName()
+ }
+ if err := serviceConfig.Init(rootConfig); err != nil {
+ logger.Errorf("Service with refKey = %s init failed with error = %s")
+ }
+ serviceConfig.adaptiveService = c.AdaptiveService
}
- svs.id = key
- svs.Implement(rpcService)
- if err := svs.Export(); err != nil {
- logger.Errorf(fmt.Sprintf("service %s export failed! err: %#v", key, err))
+ serviceConfig.id = registeredTypeName
+ serviceConfig.Implement(service)
+ if err := serviceConfig.Export(); err != nil {
+ logger.Errorf(fmt.Sprintf("service with registeredTypeName = %s export failed! err: %#v", registeredTypeName, err))
}
}
-
}
// newEmptyProviderConfig returns ProviderConfig with default ApplicationConfig
@@ -157,6 +180,7 @@ func newEmptyProviderConfig() *ProviderConfig {
newProviderConfig := &ProviderConfig{
Services: make(map[string]*ServiceConfig),
RegistryIDs: make([]string, 8),
+ ProtocolIDs: make([]string, 8),
}
return newProviderConfig
}
diff --git a/config/reference_config.go b/config/reference_config.go
index af8f48d8d..4c443f7e4 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -49,7 +49,7 @@ type ReferenceConfig struct {
Check *bool `yaml:"check" json:"check,omitempty" property:"check"`
URL string `yaml:"url" json:"url,omitempty" property:"url"`
Filter string `yaml:"filter" json:"filter,omitempty" property:"filter"`
- Protocol string `default:"tri" yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
+ Protocol string `yaml:"protocol" json:"protocol,omitempty" property:"protocol"`
RegistryIDs []string `yaml:"registry-ids" json:"registry-ids,omitempty" property:"registry-ids"`
Cluster string `yaml:"cluster" json:"cluster,omitempty" property:"cluster"`
Loadbalance string `yaml:"loadbalance" json:"loadbalance,omitempty" property:"loadbalance"`
@@ -89,14 +89,25 @@ func (rc *ReferenceConfig) Init(root *RootConfig) error {
rc.rootConfig = root
if root.Application != nil {
rc.metaDataType = root.Application.MetadataType
+ if rc.Group == "" {
+ rc.Group = root.Application.Group
+ }
+ if rc.Version == "" {
+ rc.Version = root.Application.Version
+ }
}
if rc.Cluster == "" {
rc.Cluster = "failover"
}
- rc.RegistryIDs = translateRegistryIds(rc.RegistryIDs)
+ rc.RegistryIDs = translateIds(rc.RegistryIDs)
if len(rc.RegistryIDs) <= 0 {
rc.RegistryIDs = root.Consumer.RegistryIDs
}
+
+ if rc.Protocol == "" {
+ rc.Protocol = root.Consumer.Protocol
+ }
+
if rc.TracingKey == "" {
rc.TracingKey = root.Consumer.TracingKey
}
diff --git a/config/service.go b/config/service.go
index 03aa99c5c..723847fca 100644
--- a/config/service.go
+++ b/config/service.go
@@ -64,6 +64,16 @@ func GetProviderService(name string) common.RPCService {
return proServices[name]
}
+// GetProviderServiceMap gets ProviderServiceMap
+func GetProviderServiceMap() map[string]common.RPCService {
+ return proServices
+}
+
+// GetConsumerServiceMap gets ProviderServiceMap
+func GetConsumerServiceMap() map[string]common.RPCService {
+ return conServices
+}
+
// SetConsumerServiceByInterfaceName is used by pb serialization
func SetConsumerServiceByInterfaceName(interfaceName string, srv common.RPCService) {
interfaceNameConServicesLock.Lock()
diff --git a/config/service_config.go b/config/service_config.go
index 516736fe4..9627f2b91 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -73,6 +73,7 @@ type ServiceConfig struct {
ExecuteLimit string `yaml:"execute.limit" json:"execute.limit,omitempty" property:"execute.limit"`
ExecuteLimitRejectedHandler string `yaml:"execute.limit.rejected.handler" json:"execute.limit.rejected.handler,omitempty" property:"execute.limit.rejected.handler"`
Auth string `yaml:"auth" json:"auth,omitempty" property:"auth"`
+ NotRegister bool `yaml:"not_register" json:"not_register,omitempty" property:"not_register"`
ParamSign string `yaml:"param.sign" json:"param.sign,omitempty" property:"param.sign"`
Tag string `yaml:"tag" json:"tag,omitempty" property:"tag"`
GrpcMaxMessageSize int `default:"4" yaml:"max_message_size" json:"max_message_size,omitempty"`
@@ -108,6 +109,12 @@ func (s *ServiceConfig) Init(rc *RootConfig) error {
}
s.exported = atomic.NewBool(false)
s.metadataType = rc.Application.MetadataType
+ if s.Version == "" {
+ s.Version = rc.Application.Version
+ }
+ if s.Group == "" {
+ s.Group = rc.Application.Group
+ }
s.unexported = atomic.NewBool(false)
if len(s.RCRegistriesMap) == 0 {
s.RCRegistriesMap = rc.Registries
@@ -118,10 +125,15 @@ func (s *ServiceConfig) Init(rc *RootConfig) error {
if rc.Provider != nil {
s.ProxyFactoryKey = rc.Provider.ProxyFactory
}
- s.RegistryIDs = translateRegistryIds(s.RegistryIDs)
+ s.RegistryIDs = translateIds(s.RegistryIDs)
if len(s.RegistryIDs) <= 0 {
s.RegistryIDs = rc.Provider.RegistryIDs
}
+
+ s.ProtocolIDs = translateIds(s.ProtocolIDs)
+ if len(s.ProtocolIDs) <= 0 {
+ s.ProtocolIDs = rc.Provider.ProtocolIDs
+ }
if len(s.ProtocolIDs) <= 0 {
for k, _ := range rc.Protocols {
s.ProtocolIDs = append(s.ProtocolIDs, k)
@@ -222,7 +234,11 @@ func (s *ServiceConfig) Export() error {
return nil
}
- regUrls := loadRegistries(s.RegistryIDs, s.RCRegistriesMap, common.PROVIDER)
+ regUrls := make([]*common.URL, 0)
+ if !s.NotRegister {
+ regUrls = loadRegistries(s.RegistryIDs, s.RCRegistriesMap, common.PROVIDER)
+ }
+
urlMap := s.getUrlMap()
protocolConfigs := loadProtocol(s.ProtocolIDs, s.RCProtocolsMap)
if len(protocolConfigs) == 0 {
@@ -595,6 +611,11 @@ func (pcb *ServiceConfigBuilder) SetServiceID(id string) *ServiceConfigBuilder {
return pcb
}
+func (pcb *ServiceConfigBuilder) SetNotRegister(notRegister bool) *ServiceConfigBuilder {
+ pcb.serviceConfig.NotRegister = notRegister
+ return pcb
+}
+
func (pcb *ServiceConfigBuilder) Build() *ServiceConfig {
return pcb.serviceConfig
}
diff --git a/config_center/nacos/impl_test.go b/config_center/nacos/impl_test.go
index 209fbaf45..8d7a08aaa 100644
--- a/config_center/nacos/impl_test.go
+++ b/config_center/nacos/impl_test.go
@@ -20,7 +20,6 @@ package nacos
import (
"reflect"
- "sync"
"testing"
)
@@ -156,11 +155,8 @@ type fields struct {
BaseDynamicConfiguration config_center.BaseDynamicConfiguration
url *common.URL
rootPath string
- wg sync.WaitGroup
- cltLock sync.Mutex
done chan struct{}
client *nacosClient.NacosConfigClient
- keyListeners sync.Map
parser parser.ConfigurationParser
}
type args struct {
@@ -169,7 +165,7 @@ type args struct {
value string
}
-func newnNacosDynamicConfiguration(f fields) *nacosDynamicConfiguration {
+func newnNacosDynamicConfiguration(f *fields) *nacosDynamicConfiguration {
return &nacosDynamicConfiguration{
BaseDynamicConfiguration: f.BaseDynamicConfiguration,
url: f.url,
@@ -208,7 +204,7 @@ func Test_nacosDynamicConfiguration_PublishConfig(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- n := newnNacosDynamicConfiguration(tt.fields)
+ n := newnNacosDynamicConfiguration(&tt.fields)
if err := n.PublishConfig(tt.args.key, tt.args.group, tt.args.value); (err != nil) != tt.wantErr {
t.Errorf("PublishConfig() error = %v, wantErr %v", err, tt.wantErr)
}
@@ -253,7 +249,7 @@ func Test_nacosDynamicConfiguration_GetConfigKeysByGroup(t *testing.T) {
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- n := newnNacosDynamicConfiguration(tt.fields)
+ n := newnNacosDynamicConfiguration(&tt.fields)
got, err := n.GetConfigKeysByGroup(tt.args.group)
if (err != nil) != tt.wantErr {
t.Errorf("GetConfigKeysByGroup() error = %v, wantErr %v", err, tt.wantErr)
diff --git a/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection.pb.go b/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection.pb.go
index 334a88b17..7ad066379 100644
--- a/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection.pb.go
+++ b/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection.pb.go
@@ -19,7 +19,7 @@
// Code generated by protoc-gen-go. DO NOT EDIT.
// versions:
-// protoc-gen-go v1.25.0
+// protoc-gen-go v1.26.0
// protoc v3.14.0
// source: reflection.proto
@@ -31,8 +31,6 @@ import (
)
import (
- proto "github.com/golang/protobuf/proto"
-
protoreflect "google.golang.org/protobuf/reflect/protoreflect"
protoimpl "google.golang.org/protobuf/runtime/protoimpl"
@@ -45,10 +43,6 @@ const (
_ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20)
)
-// This is a compile-time assertion that a sufficiently up-to-date version
-// of the legacy proto package is being used.
-const _ = proto.ProtoPackageIsVersion4
-
// The message sent by the client when calling ServerReflectionInfo method.
type ServerReflectionRequest struct {
state protoimpl.MessageState
diff --git a/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection_triple.pb.go b/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection_triple.pb.go
index 97edf8f1d..d85ad7661 100644
--- a/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection_triple.pb.go
+++ b/protocol/dubbo3/reflection/triple_reflection_v1alpha/reflection_triple.pb.go
@@ -17,7 +17,7 @@
// Code generated by protoc-gen-go-triple. DO NOT EDIT.
// versions:
-// - protoc-gen-go-triple v1.0.2
+// - protoc-gen-go-triple v1.0.8
// - protoc v3.14.0
// source: reflection.proto
@@ -31,6 +31,7 @@ import (
import (
grpc_go "github.com/dubbogo/grpc-go"
codes "github.com/dubbogo/grpc-go/codes"
+ metadata "github.com/dubbogo/grpc-go/metadata"
status "github.com/dubbogo/grpc-go/status"
constant "github.com/dubbogo/triple/pkg/common/constant"
@@ -38,6 +39,7 @@ import (
)
import (
+ constant1 "dubbo.apache.org/dubbo-go/v3/common/constant"
protocol "dubbo.apache.org/dubbo-go/v3/protocol"
dubbo3 "dubbo.apache.org/dubbo-go/v3/protocol/dubbo3"
invocation "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
@@ -68,6 +70,10 @@ func (c *ServerReflectionClientImpl) GetDubboStub(cc *triple.TripleConn) ServerR
return NewServerReflectionClient(cc)
}
+func (c *ServerReflectionClientImpl) XXX_InterfaceName() string {
+ return "triple.reflection.v1alpha.ServerReflection"
+}
+
func NewServerReflectionClient(cc *triple.TripleConn) ServerReflectionClient {
return &serverReflectionClient{cc}
}
@@ -133,6 +139,10 @@ func (s *UnimplementedServerReflectionServer) XXX_GetProxyImpl() protocol.Invoke
func (s *UnimplementedServerReflectionServer) XXX_ServiceDesc() *grpc_go.ServiceDesc {
return &ServerReflection_ServiceDesc
}
+func (s *UnimplementedServerReflectionServer) XXX_InterfaceName() string {
+ return "triple.reflection.v1alpha.ServerReflection"
+}
+
func (UnimplementedServerReflectionServer) mustEmbedUnimplementedServerReflectionServer() {}
// UnsafeServerReflectionServer may be embedded to opt out of forward compatibility for this service.
@@ -148,6 +158,13 @@ func RegisterServerReflectionServer(s grpc_go.ServiceRegistrar, srv ServerReflec
func _ServerReflection_ServerReflectionInfo_Handler(srv interface{}, stream grpc_go.ServerStream) error {
_, ok := srv.(dubbo3.Dubbo3GrpcService)
+ ctx := stream.Context()
+ md, _ := metadata.FromIncomingContext(ctx)
+ invAttachment := make(map[string]interface{}, len(md))
+ for k, v := range md {
+ invAttachment[k] = v
+ }
+ stream.(grpc_go.CtxSetterStream).SetContext(context.WithValue(ctx, constant1.AttachmentKey, invAttachment))
invo := invocation.NewRPCInvocation("ServerReflectionInfo", nil, nil)
if !ok {
fmt.Println(invo)
diff --git a/registry/xds/registry.go b/registry/xds/registry.go
index f6d91dbb3..b3f71b04f 100644
--- a/registry/xds/registry.go
+++ b/registry/xds/registry.go
@@ -39,6 +39,7 @@ import (
)
var localIP = ""
+var DefaultXDSSniffingTimeoutStr = "5s"
func init() {
localIP = common.GetLocalIp()
@@ -60,24 +61,25 @@ func getCategory(url *common.URL) string {
return category
}
-// getServiceName return serviceName $(providers_or_consumers):$(interfaceName)
+// getServiceName return serviceName $(providers_or_consumers):$(interfaceName):$(version):$(group)
func getServiceName(url *common.URL) string {
var buffer bytes.Buffer
buffer.Write([]byte(getCategory(url)))
appendParam(&buffer, url, constant.InterfaceKey)
+ appendParam(&buffer, url, constant.VersionKey)
+ appendParam(&buffer, url, constant.GroupKey)
return buffer.String()
}
-// getSubscribeName returns subscribeName is providers:$(interfaceName)
+// getSubscribeName returns subscribeName is providers:$(interfaceName):$(version):$(group)
func getSubscribeName(url *common.URL) string {
var buffer bytes.Buffer
buffer.Write([]byte(common.DubboNodes[common.PROVIDER]))
appendParam(&buffer, url, constant.InterfaceKey)
- // We would not append group or version to this name, as istio ecosystem only cares about 'hostname' during cds procedure.
- // The subscribe name is used to find the real hostName.
- // Group or version are managed by traffic policy, not dubbo-go.
+ appendParam(&buffer, url, constant.VersionKey)
+ appendParam(&buffer, url, constant.GroupKey)
return buffer.String()
}
@@ -164,7 +166,12 @@ func newXDSRegistry(url *common.URL) (registry.Registry, error) {
constant.PodNameEnvKey, constant.PodNamespaceEnvKey)
}
- wrappedXDSClient, err := xds.NewXDSWrappedClient(pn, ns, localIP, common2.NewHostNameOrIPAddr(url.Ip+":"+url.Port))
+ wrappedXDSClient, err := xds.NewXDSWrappedClient(xds.Config{
+ PodName: pn,
+ Namespace: ns,
+ LocalIP: localIP,
+ IstioAddr: common2.NewHostNameOrIPAddr(url.Ip + ":" + url.Port),
+ })
if err != nil {
return nil, err
}
diff --git a/remoting/xds/client.go b/remoting/xds/client.go
index 7c23f5743..3cdffc10d 100644
--- a/remoting/xds/client.go
+++ b/remoting/xds/client.go
@@ -29,6 +29,7 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/registry"
xdsCommon "dubbo.apache.org/dubbo-go/v3/remoting/xds/common"
@@ -127,6 +128,11 @@ type WrappedClientImpl struct {
subscribeStopChMap stores subscription stop chan
*/
subscribeStopChMap sync.Map
+
+ /*
+ xdsSniffingTimeout stores xds sniffing timeout duration
+ */
+ xdsSniffingTimeout time.Duration
}
func GetXDSWrappedClient() *WrappedClientImpl {
@@ -134,18 +140,24 @@ func GetXDSWrappedClient() *WrappedClientImpl {
}
// NewXDSWrappedClient create or get singleton xdsWrappedClient
-func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon.HostAddr) (XDSWrapperClient, error) {
+func NewXDSWrappedClient(config Config) (XDSWrapperClient, error) {
// todo @(laurence) safety problem? what if to concurrent 'new' both create new client?
if xdsWrappedClient != nil {
return xdsWrappedClient, nil
}
+ if config.SniffingTimeout == 0 {
+ config.SniffingTimeout, _ = time.ParseDuration(constant.DefaultRegTimeout)
+ }
+ if config.DebugPort == "" {
+ config.DebugPort = "8080"
+ }
// write param
newClient := &WrappedClientImpl{
- podName: podName,
- namespace: namespace,
- localIP: localIP,
- istiodAddr: istioAddr,
+ podName: config.PodName,
+ namespace: config.Namespace,
+ localIP: config.LocalIP,
+ istiodAddr: config.IstioAddr,
rdsMap: make(map[string]resource.RouteConfigUpdate),
cdsMap: make(map[string]resource.ClusterUpdate),
@@ -155,6 +167,8 @@ func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon
cdsUpdateEventChan: make(chan struct{}),
cdsUpdateEventHandlers: make([]func(), 0),
+
+ xdsSniffingTimeout: config.SniffingTimeout,
}
// 1. init xdsclient
@@ -166,7 +180,7 @@ func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon
go newClient.runWatchingCdsUpdateEvent()
// 3. load basic info from istiod and start listening cds
- if err := newClient.startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP(); err != nil {
+ if err := newClient.startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP(config.LocalDebugMode); err != nil {
return nil, err
}
@@ -174,8 +188,8 @@ func NewXDSWrappedClient(podName, namespace, localIP string, istioAddr xdsCommon
newClient.interfaceMapHandler = mapping.NewInterfaceMapHandlerImpl(
newClient.xdsClient,
defaultIstiodTokenPath,
- xdsCommon.NewHostNameOrIPAddr(newClient.istiodPodIP+":"+defaultIstiodDebugPort),
- newClient.hostAddr)
+ xdsCommon.NewHostNameOrIPAddr(newClient.istiodPodIP+":"+config.DebugPort),
+ newClient.hostAddr, config.LocalDebugMode)
xdsWrappedClient = newClient
return newClient, nil
@@ -186,6 +200,11 @@ func (w *WrappedClientImpl) GetHostAddrByServiceUniqueKey(serviceUniqueKey strin
return w.interfaceMapHandler.GetHostAddrMap(serviceUniqueKey)
}
+// GetDubboGoMetadata get all registered metadata of dubbogo
+func (w *WrappedClientImpl) GetDubboGoMetadata() (map[string]string, error) {
+ return w.interfaceMapHandler.GetDubboGoMetadata()
+}
+
// ChangeInterfaceMap change the map of serviceUniqueKey -> appname, if add is true, register, else unregister
func (w *WrappedClientImpl) ChangeInterfaceMap(serviceUniqueKey string, add bool) error {
if add {
@@ -371,17 +390,19 @@ func (w *WrappedClientImpl) initXDSClient() error {
// 1. start watching all cluster by cds
// 2. discovery local pod's hostAddr by cds and eds
// 3. discovery istiod pod ip by cds and eds
-func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP() error {
+func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioPodIP(localDebugMode bool) error {
// call watch and refresh istiod debug interface
foundLocalStopCh := make(chan struct{})
foundIstiodStopCh := make(chan struct{})
discoveryFinishedStopCh := make(chan struct{})
// todo timeout configure
- timeoutCh := time.After(time.Second * 3)
+ timeoutCh := time.After(w.xdsSniffingTimeout)
foundLocal := false
foundIstiod := false
var cancel1 func()
var cancel2 func()
+ logger.Infof("[XDS Wrapped Client] Start sniffing with istio hostname = %s, localIp = %s",
+ w.istiodAddr.HostnameOrIP, w.localIP)
// todo @(laurence) here, if istiod is unhealthy, here should be timeout and tell developer.
_ = w.xdsClient.WatchCluster("*", func(update resource.ClusterUpdate, err error) {
@@ -393,6 +414,7 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
w.cdsMapLock.Lock()
defer w.cdsMapLock.Unlock()
delete(w.cdsMap, update.ClusterName[1:])
+ logger.Infof("[XDS Wrapped Client] Delete cluster %s", update.ClusterName)
w.cdsUpdateEventChan <- struct{}{} // send update event
return
}
@@ -404,18 +426,22 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
if foundLocal && foundIstiod {
return
}
+ logger.Infof("[XDS Wrapped Client] Sniffing with cluster name = %s", update.ClusterName)
// only into here during start sniffing istiod/service prcedure
cluster := xdsCommon.NewCluster(update.ClusterName)
if cluster.Addr.HostnameOrIP == w.istiodAddr.HostnameOrIP {
// 1. find istiod podIP
// todo: When would eds level watch be canceled?
+ logger.Info("[XDS Wrapped Client] Sniffing get istiod cluster")
cancel1 = w.xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
if foundIstiod {
return
}
+ logger.Infof("[XDS Wrapped Client] Sniffing get istiod endpoint = %+v, localities = %+v", endpoint, endpoint.Localities)
for _, v := range endpoint.Localities {
for _, e := range v.Endpoints {
w.istiodPodIP = xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP
+ logger.Infof("[XDS Wrapped Client] Sniffing found istiod podIP = %s", w.istiodPodIP)
foundIstiod = true
close(foundIstiodStopCh)
}
@@ -431,6 +457,7 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
}
for _, v := range endpoint.Localities {
for _, e := range v.Endpoints {
+ logger.Infof("[XDS Wrapped Client] Sniffing Found eds endpoint = %+v", e)
if xdsCommon.NewHostNameOrIPAddr(e.Address).HostnameOrIP == w.localIP {
cluster := xdsCommon.NewCluster(update.ClusterName)
w.hostAddr = cluster.Addr
@@ -441,6 +468,17 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
}
})
})
+
+ if localDebugMode {
+ go func() {
+ <-foundIstiodStopCh
+ <-foundLocalStopCh
+ cancel1()
+ cancel2()
+ }()
+ return nil
+ }
+
go func() {
<-foundIstiodStopCh
<-foundLocalStopCh
@@ -454,8 +492,10 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
time.Sleep(time.Second)
cancel1()
cancel2()
+ logger.Infof("[XDS Wrapper Client] Sniffing Finished with host addr = %s, istiod pod ip = %s", w.hostAddr, w.istiodPodIP)
return nil
case <-timeoutCh:
+ logger.Warnf("[XDS Wrapper Client] Sniffing timeout with duration = %v", w.xdsSniffingTimeout)
if cancel1 != nil {
cancel1()
}
@@ -466,7 +506,7 @@ func (w *WrappedClientImpl) startWatchingAllClusterAndLoadLocalHostAddrAndIstioP
case <-foundIstiodStopCh:
return DiscoverLocalError
default:
- return DiscoverIstiodPodError
+ return DiscoverIstiodPodIpError
}
}
}
@@ -528,6 +568,7 @@ type XDSWrapperClient interface {
UnSubscribe(svcUniqueName string)
GetRouterConfig(hostAddr string) resource.RouteConfigUpdate
GetHostAddrByServiceUniqueKey(serviceUniqueKey string) (string, error)
+ GetDubboGoMetadata() (map[string]string, error)
ChangeInterfaceMap(serviceUniqueKey string, add bool) error
GetClusterUpdateIgnoreVersion(hostAddr string) resource.ClusterUpdate
GetHostAddress() xdsCommon.HostAddr
diff --git a/remoting/xds/client_test.go b/remoting/xds/client_test.go
index dea3a9f43..23bd845e8 100644
--- a/remoting/xds/client_test.go
+++ b/remoting/xds/client_test.go
@@ -168,13 +168,18 @@ func testWithDiscoverySuccess(t *testing.T) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
assert.Nil(t, err)
assert.NotNil(t, xdsWrappedClient)
// assert eds cancel is called
assert.Equal(t, int32(2), cancelCalledCounter.Load())
- // discovery p
+ // discovery istiod pod ip
assert.Equal(t, istioIPFoo, xdsWrappedClient.GetIstioPodIP())
address := xdsWrappedClient.GetHostAddress()
assert.Equal(t, localHostAddrFoo, address.String())
@@ -249,8 +254,13 @@ func testFailedWithIstioCDS(t *testing.T) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
- assert.Equal(t, DiscoverIstiodPodError, err)
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
+ assert.Equal(t, DiscoverIstiodPodIpError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(1), cancelCalledCounter.Load())
}
@@ -324,7 +334,12 @@ func testFailedWithLocalCDS(t *testing.T) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
assert.Equal(t, DiscoverLocalError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(1), cancelCalledCounter.Load())
@@ -399,8 +414,13 @@ func testFailedWithNoneCDS(t *testing.T) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
- assert.Equal(t, DiscoverIstiodPodError, err)
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
+ assert.Equal(t, DiscoverIstiodPodIpError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(0), cancelCalledCounter.Load())
}
@@ -474,7 +494,12 @@ func testFailedWithLocalEDSFailed(t *testing.T) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
assert.Equal(t, DiscoverLocalError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(2), cancelCalledCounter.Load())
@@ -549,8 +574,13 @@ func testFailedWithIstioEDSFailed(t *testing.T) {
xdsClientFactoryFunction = func(localIP, podName, namespace string, istioAddr common.HostAddr) (client.XDSClient, error) {
return mockXDSClient, nil
}
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
- assert.Equal(t, DiscoverIstiodPodError, err)
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
+ assert.Equal(t, DiscoverIstiodPodIpError, err)
assert.Nil(t, xdsWrappedClient)
assert.Equal(t, int32(2), cancelCalledCounter.Load())
}
@@ -722,7 +752,12 @@ func testSubscribe(t *testing.T) {
}
xdsWrappedClient = nil
- xdsWrappedClient, err := NewXDSWrappedClient(podNameFoo, localNamespaceFoo, localIPFoo, common.NewHostNameOrIPAddr(istioHostAddrFoo))
+ xdsWrappedClient, err := NewXDSWrappedClient(Config{
+ PodName: podNameFoo,
+ Namespace: localNamespaceFoo,
+ LocalIP: localIPFoo,
+ IstioAddr: common.NewHostNameOrIPAddr(istioHostAddrFoo),
+ })
assert.Nil(t, err)
assert.NotNil(t, xdsWrappedClient)
diff --git a/remoting/xds/error.go b/remoting/xds/config.go
similarity index 74%
copy from remoting/xds/error.go
copy to remoting/xds/config.go
index 84325bc5f..92df233e6 100644
--- a/remoting/xds/error.go
+++ b/remoting/xds/config.go
@@ -18,10 +18,19 @@
package xds
import (
- "errors"
+ "time"
)
-var (
- DiscoverLocalError = errors.New("Discovery local Pod's host from xds, failed please register service with endpoint to k8s ")
- DiscoverIstiodPodError = errors.New("Discovery istiod Pod's host from xds failed, please register service with endpoint to k8s ")
+import (
+ xdsCommon "dubbo.apache.org/dubbo-go/v3/remoting/xds/common"
)
+
+type Config struct {
+ PodName string
+ Namespace string
+ IstioAddr xdsCommon.HostAddr
+ DebugPort string
+ LocalIP string
+ LocalDebugMode bool
+ SniffingTimeout time.Duration
+}
diff --git a/remoting/xds/error.go b/remoting/xds/error.go
index 84325bc5f..2711cf7a7 100644
--- a/remoting/xds/error.go
+++ b/remoting/xds/error.go
@@ -22,6 +22,6 @@ import (
)
var (
- DiscoverLocalError = errors.New("Discovery local Pod's host from xds, failed please register service with endpoint to k8s ")
- DiscoverIstiodPodError = errors.New("Discovery istiod Pod's host from xds failed, please register service with endpoint to k8s ")
+ DiscoverLocalError = errors.New("Discovery local Pod's host from xds, failed please register service with endpoint to k8s ")
+ DiscoverIstiodPodIpError = errors.New("Discovery istiod Pod's ip from xds failed, please register service with endpoint to k8s ")
)
diff --git a/remoting/xds/mapping/handler.go b/remoting/xds/mapping/handler.go
index b20431787..36489a91e 100644
--- a/remoting/xds/mapping/handler.go
+++ b/remoting/xds/mapping/handler.go
@@ -28,6 +28,8 @@ import (
import (
structpb "github.com/golang/protobuf/ptypes/struct"
+
+ perrors "github.com/pkg/errors"
)
import (
@@ -62,6 +64,7 @@ type InterfaceMapHandlerImpl struct {
*/
interfaceNameHostAddrMap map[string]string
interfaceNameHostAddrMapLock sync.RWMutex
+ localDebugMode bool
}
func (i *InterfaceMapHandlerImpl) UnRegister(serviceUniqueKey string) error {
@@ -78,6 +81,10 @@ func (i *InterfaceMapHandlerImpl) Register(serviceUniqueKey string) error {
return i.xdsClient.SetMetadata(i.interfaceAppNameMap2DubboGoMetadata())
}
+func (i *InterfaceMapHandlerImpl) GetDubboGoMetadata() (map[string]string, error) {
+ return i.getServiceUniqueKeyHostAddrMapFromPilot()
+}
+
func (i *InterfaceMapHandlerImpl) GetHostAddrMap(serviceUniqueKey string) (string, error) {
i.interfaceNameHostAddrMapLock.RLock()
if hostAddr, ok := i.interfaceNameHostAddrMap[serviceUniqueKey]; ok {
@@ -86,6 +93,8 @@ func (i *InterfaceMapHandlerImpl) GetHostAddrMap(serviceUniqueKey string) (strin
}
i.interfaceNameHostAddrMapLock.RUnlock()
+ retryCount := 0
+ maxRetries := 30
for {
if interfaceHostAddrMap, err := i.getServiceUniqueKeyHostAddrMapFromPilot(); err != nil {
return "", err
@@ -95,8 +104,14 @@ func (i *InterfaceMapHandlerImpl) GetHostAddrMap(serviceUniqueKey string) (strin
i.interfaceNameHostAddrMapLock.Unlock()
hostName, ok := interfaceHostAddrMap[serviceUniqueKey]
if !ok {
- logger.Infof("[XDS Wrapped Client] Try getting interface %s 's host from istio %d:8080\n", serviceUniqueKey, i.istioDebugAddr)
+ logger.Infof("[XDS Wrapped Client] Try getting interface %s 's host from istio %s:8080\n", serviceUniqueKey, i.istioDebugAddr)
time.Sleep(time.Millisecond * 100)
+ retryCount++
+ if retryCount > maxRetries {
+ err := perrors.Errorf("[XDS Wrapped Client] Try getting interface %s 's host from istio %s:8080 failed. Please check if provider's service resource is deployed correctly.\n", serviceUniqueKey, i.istioDebugAddr)
+ logger.Error(err)
+ return "", err
+ }
continue
}
return hostName, nil
@@ -108,11 +123,13 @@ func (i *InterfaceMapHandlerImpl) GetHostAddrMap(serviceUniqueKey string) (strin
// 'dubbo-go-app.default.svc.cluster.local:20000'
func (i *InterfaceMapHandlerImpl) getServiceUniqueKeyHostAddrMapFromPilot() (map[string]string, error) {
req, _ := http.NewRequest(http.MethodGet, fmt.Sprintf("http://%s/debug/adsz", i.istioDebugAddr.String()), nil)
- token, err := ioutil.ReadFile(i.istioTokenPath)
- if err != nil {
- return nil, err
+ if !i.localDebugMode {
+ token, err := ioutil.ReadFile(i.istioTokenPath)
+ if err != nil {
+ return nil, err
+ }
+ req.Header.Add(authorizationHeader, istiodTokenPrefix+string(token))
}
- req.Header.Add(authorizationHeader, istiodTokenPrefix+string(token))
rsp, err := http.DefaultClient.Do(req)
if err != nil {
logger.Infof("[XDS Wrapped Client] Try getting interface host map from istio IP %s with error %s\n",
@@ -138,7 +155,7 @@ func (i *InterfaceMapHandlerImpl) interfaceAppNameMap2DubboGoMetadata() *structp
return GetDubboGoMetadata(string(data))
}
-func NewInterfaceMapHandlerImpl(xdsClient client.XDSClient, istioTokenPath string, istioDebugAddr, hostAddr common.HostAddr) InterfaceMapHandler {
+func NewInterfaceMapHandlerImpl(xdsClient client.XDSClient, istioTokenPath string, istioDebugAddr, hostAddr common.HostAddr, localDebugMode bool) InterfaceMapHandler {
return &InterfaceMapHandlerImpl{
xdsClient: xdsClient,
interfaceAppNameMap: map[string]string{},
@@ -146,6 +163,7 @@ func NewInterfaceMapHandlerImpl(xdsClient client.XDSClient, istioTokenPath strin
istioDebugAddr: istioDebugAddr,
hostAddr: hostAddr,
istioTokenPath: istioTokenPath,
+ localDebugMode: localDebugMode,
}
}
@@ -153,4 +171,5 @@ type InterfaceMapHandler interface {
Register(string) error
UnRegister(string) error
GetHostAddrMap(string) (string, error)
+ GetDubboGoMetadata() (map[string]string, error)
}
diff --git a/remoting/xds/mapping/handler_test.go b/remoting/xds/mapping/handler_test.go
index 2db03511f..9e1a0d8c5 100644
--- a/remoting/xds/mapping/handler_test.go
+++ b/remoting/xds/mapping/handler_test.go
@@ -53,7 +53,7 @@ const (
func TestNewInterfaceMapHandler(t *testing.T) {
mockXDSClient := &mocks.XDSClient{}
- interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr))
+ interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr), false)
assert.NotNil(t, interfaceMapHandler)
}
@@ -61,7 +61,7 @@ func TestInterfaceMapHandlerRegisterAndUnregister(t *testing.T) {
mockXDSClient := &mocks.XDSClient{}
mockXDSClient.On("SetMetadata", mock.AnythingOfType("*structpb.Struct")).Return(nil)
- interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr))
+ interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr), false)
assert.Nil(t, interfaceMapHandler.Register(serviceKey1))
assert.Nil(t, interfaceMapHandler.Register(serviceKey2))
@@ -76,7 +76,7 @@ func TestInterfaceMapHandlerRegisterAndUnregister(t *testing.T) {
func TestGetServiceUniqueKeyHostAddrMapFromPilot(t *testing.T) {
mockXDSClient := &mocks.XDSClient{}
- interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr))
+ interfaceMapHandler := NewInterfaceMapHandlerImpl(mockXDSClient, istioTokenPathFoo, common.NewHostNameOrIPAddr(istiodDebugAddrStrFoo), common.NewHostNameOrIPAddr(localPodServiceAddr), false)
assert.Nil(t, generateMockToken())
// 1. start mock http server
diff --git a/remoting/xds/mapping/mocks/InterfaceMapHandler.go b/remoting/xds/mapping/mocks/InterfaceMapHandler.go
index fd2247550..c98a89b6b 100644
--- a/remoting/xds/mapping/mocks/InterfaceMapHandler.go
+++ b/remoting/xds/mapping/mocks/InterfaceMapHandler.go
@@ -76,3 +76,24 @@ func (_m *InterfaceMapHandler) UnRegister(_a0 string) error {
return r0
}
+
+// GetDubboGoMetadata provides a mock function
+func (_m *InterfaceMapHandler) GetDubboGoMetadata() (map[string]string, error) {
+ ret := _m.Called()
+
+ var r0 map[string]string
+ if rf, ok := ret.Get(0).(func() map[string]string); ok {
+ r0 = rf()
+ } else {
+ r0 = ret.Get(0).(map[string]string)
+ }
+
+ var r1 error
+ if rf, ok := ret.Get(1).(func() error); ok {
+ r1 = rf()
+ } else {
+ r1 = ret.Error(1)
+ }
+
+ return r0, r1
+}
diff --git a/remoting/xds/mocks/client.go b/remoting/xds/mocks/client.go
index 02c7f93b9..65f4b305a 100644
--- a/remoting/xds/mocks/client.go
+++ b/remoting/xds/mocks/client.go
@@ -33,6 +33,11 @@ type WrappedClientMock struct {
mock.Mock
}
+func (m *WrappedClientMock) GetDubboGoMetadata() (map[string]string, error) {
+ args := m.Called()
+ return args.Get(0).(map[string]string), args.Error(1)
+}
+
func (m *WrappedClientMock) Subscribe(svcUniqueName, interfaceName, hostAddr string, lst registry.NotifyListener) error {
args := m.Called(svcUniqueName, interfaceName, hostAddr, lst)
return args.Error(0)
diff --git a/test/xds/main.go b/test/xds/main.go
deleted file mode 100644
index 85b661f32..000000000
--- a/test/xds/main.go
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package main
-
-import (
- "fmt"
-)
-
-import (
- v3corepb "github.com/envoyproxy/go-control-plane/envoy/config/core/v3"
-
- structpb "github.com/golang/protobuf/ptypes/struct"
-
- "google.golang.org/grpc"
- "google.golang.org/grpc/credentials/insecure"
-)
-
-import (
- "dubbo.apache.org/dubbo-go/v3/xds/client"
- "dubbo.apache.org/dubbo-go/v3/xds/client/bootstrap"
- _ "dubbo.apache.org/dubbo-go/v3/xds/client/controller/version/v2"
- _ "dubbo.apache.org/dubbo-go/v3/xds/client/controller/version/v3"
- "dubbo.apache.org/dubbo-go/v3/xds/client/resource"
- "dubbo.apache.org/dubbo-go/v3/xds/client/resource/version"
-)
-
-const (
- gRPCUserAgentName = "gRPC Go"
- clientFeatureNoOverprovisioning = "envoy.lb.does_not_support_overprovisioning"
-)
-
-// ATTENTION! export GRPC_XDS_EXPERIMENTAL_SECURITY_SUPPORT=false
-func main() {
- v3NodeProto := &v3corepb.Node{
- Id: "sidecar~172.1.1.1~sleep-55b5877479-rwcct.default~default.svc.cluster.local",
- UserAgentName: gRPCUserAgentName,
- Cluster: "testCluster",
- UserAgentVersionType: &v3corepb.Node_UserAgentVersion{UserAgentVersion: "1.45.0"},
- ClientFeatures: []string{clientFeatureNoOverprovisioning},
- Metadata: &structpb.Struct{
- Fields: map[string]*structpb.Value{
- "CLUSTER_ID": {
- Kind: &structpb.Value_StringValue{StringValue: "Kubernetes"},
- },
- "LABELS": {
- Kind: &structpb.Value_StructValue{StructValue: &structpb.Struct{
- Fields: map[string]*structpb.Value{
- "label1": {
- Kind: &structpb.Value_StringValue{StringValue: "val1"},
- },
- "label2": {
- Kind: &structpb.Value_StringValue{StringValue: "val2"},
- },
- },
- }},
- },
- },
- },
- }
-
- nonNilCredsConfigV2 := &bootstrap.Config{
- XDSServer: &bootstrap.ServerConfig{
- ServerURI: "localhost:15010",
- Creds: grpc.WithTransportCredentials(insecure.NewCredentials()),
- //CredsType: "google_default",
- TransportAPI: version.TransportV3,
- NodeProto: v3NodeProto,
- },
- ClientDefaultListenerResourceNameTemplate: "%s",
- }
-
- xdsClient, err := client.NewWithConfig(nonNilCredsConfigV2)
- if err != nil {
- panic(err)
- }
-
- //clusterName := "outbound|20000||dubbo-go-app.default.svc.cluster.local" //
- //clusterName := "outbound|8848||nacos.default.svc.cluster.local"
- //endpointClusterMap := sync.Map{}
- //xdsClient.WatchCluster("*", func(update resource.ClusterUpdate, err error) {
- // xdsClient.WatchEndpoints(update.ClusterName, func(endpoint resource.EndpointsUpdate, err error) {
- // for _, v := range endpoint.Localities {
- // for _, e := range v.Endpoints {
- // endpointClusterMap.Store(e.Address, update.ClusterName)
- // }
- // }
- // })
- //})
-
- //
- //xdsClient.WatchEndpoints("outbound|15010||istiod.istio-system.svc.cluster.local", func(update resource.EndpointsUpdate, err error) {
- // fmt.Printf("%+v\n err = %s", update, err)
- //})
-
- xdsClient.WatchCluster("*", func(update resource.ClusterUpdate, err error) {
- fmt.Println(update)
- })
-
- //xdsClient.WatchCluster("*", func(update resource.ClusterUpdate, err error) {
- // fmt.Printf("%+v\n err = %s", update, err)
- //
- //})
- //
-
- select {}
-}