You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/02/13 04:02:35 UTC
[dubbo-go] branch 3.0 updated: fix tps filter panic bug (pre-check the configuration of tps at the start phase) (#1604)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 8aea261 fix tps filter panic bug (pre-check the configuration of tps at the start phase) (#1604)
8aea261 is described below
commit 8aea261fa57206dd6e80a057717beddcd18f32c4
Author: Mulavar <97...@qq.com>
AuthorDate: Sun Feb 13 12:02:28 2022 +0800
fix tps filter panic bug (pre-check the configuration of tps at the start phase) (#1604)
* fix tps filter panic bug (pre-check the configuration of tps at the start phase)
* fix tps limit unit test
* add exists flag to tps filter extension
Co-authored-by: dongjianhui03 <do...@meituan.com>
---
common/constant/key.go | 4 +-
common/extension/tps_limit.go | 16 +-
config/graceful_shutdown.go | 2 +-
config/method_config.go | 52 +++++--
config/service_config.go | 237 ++++++++++++++++++------------
filter/tps/filter.go | 7 +-
filter/tps/limiter/method_service.go | 21 ++-
filter/tps/limiter/method_service_test.go | 3 +-
8 files changed, 221 insertions(+), 121 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index fdd9359..6480841 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -112,9 +112,9 @@ const (
TPSLimiterKey = "tps.limiter"
TPSRejectedExecutionHandlerKey = "tps.limit.rejected.handler"
TPSLimitRateKey = "tps.limit.rate"
- DefaultTPSLimitRate = "-1"
+ DefaultTPSLimitRate = -1
TPSLimitIntervalKey = "tps.limit.interval"
- DefaultTPSLimitInterval = "60000"
+ DefaultTPSLimitInterval = -1
TPSLimitStrategyKey = "tps.limit.strategy"
ExecuteLimitKey = "execute.limit"
DefaultExecuteLimit = "-1"
diff --git a/common/extension/tps_limit.go b/common/extension/tps_limit.go
index e5ff81e..8f678d1 100644
--- a/common/extension/tps_limit.go
+++ b/common/extension/tps_limit.go
@@ -18,6 +18,10 @@
package extension
import (
+ "errors"
+)
+
+import (
"dubbo.apache.org/dubbo-go/v3/filter"
)
@@ -32,13 +36,13 @@ func SetTpsLimiter(name string, creator func() filter.TpsLimiter) {
}
// GetTpsLimiter finds the TpsLimiter with @name
-func GetTpsLimiter(name string) filter.TpsLimiter {
+func GetTpsLimiter(name string) (filter.TpsLimiter, error) {
creator, ok := tpsLimiter[name]
if !ok {
- panic("TpsLimiter for " + name + " is not existing, make sure you have import the package " +
+ return nil, errors.New("TpsLimiter for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsLimiter.")
}
- return creator()
+ return creator(), nil
}
// SetTpsLimitStrategy sets the TpsLimitStrategyCreator with @name
@@ -47,11 +51,11 @@ func SetTpsLimitStrategy(name string, creator filter.TpsLimitStrategyCreator) {
}
// GetTpsLimitStrategyCreator finds the TpsLimitStrategyCreator with @name
-func GetTpsLimitStrategyCreator(name string) filter.TpsLimitStrategyCreator {
+func GetTpsLimitStrategyCreator(name string) (filter.TpsLimitStrategyCreator, error) {
creator, ok := tpsLimitStrategy[name]
if !ok {
- panic("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " +
+ return nil, errors.New("TpsLimitStrategy for " + name + " is not existing, make sure you have import the package " +
"and you have register it by invoking extension.SetTpsLimitStrategy.")
}
- return creator
+ return creator, nil
}
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 98d5ac6..01ebbbc 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -195,7 +195,7 @@ func waitForSendingAndReceivingRequests() {
// ignore this step
return
}
- rootConfig.Shutdown.RejectRequest = true
+ rootConfig.Shutdown.RejectRequest.Store(true)
waitingConsumerProcessedTimeout(rootConfig.Shutdown)
}
diff --git a/config/method_config.go b/config/method_config.go
index 4e45987..093df5d 100644
--- a/config/method_config.go
+++ b/config/method_config.go
@@ -18,11 +18,17 @@
package config
import (
+ "fmt"
+ "strconv"
+)
+
+import (
"github.com/creasty/defaults"
)
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
+ "dubbo.apache.org/dubbo-go/v3/common/extension"
)
// MethodConfig defines method config
@@ -43,16 +49,16 @@ type MethodConfig struct {
}
// nolint
-func (mc *MethodConfig) Prefix() string {
- if len(mc.InterfaceId) != 0 {
- return constant.Dubbo + "." + mc.InterfaceName + "." + mc.InterfaceId + "." + mc.Name + "."
+func (m *MethodConfig) Prefix() string {
+ if len(m.InterfaceId) != 0 {
+ return constant.Dubbo + "." + m.InterfaceName + "." + m.InterfaceId + "." + m.Name + "."
}
- return constant.Dubbo + "." + mc.InterfaceName + "." + mc.Name + "."
+ return constant.Dubbo + "." + m.InterfaceName + "." + m.Name + "."
}
-func (mc *MethodConfig) Init() error {
- return mc.check()
+func (m *MethodConfig) Init() error {
+ return m.check()
}
func initProviderMethodConfig(sc *ServiceConfig) error {
@@ -70,9 +76,37 @@ func initProviderMethodConfig(sc *ServiceConfig) error {
}
// check set default value and verify
-func (mc *MethodConfig) check() error {
- if err := defaults.Set(mc); err != nil {
+func (m *MethodConfig) check() error {
+ qualifieldMethodName := m.InterfaceName + "#" + m.Name
+ if m.TpsLimitStrategy != "" {
+ _, err := extension.GetTpsLimitStrategyCreator(m.TpsLimitStrategy)
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ if m.TpsLimitInterval != "" {
+ tpsLimitInterval, err := strconv.ParseInt(m.TpsLimitInterval, 0, 0)
+ if err != nil {
+ return fmt.Errorf("[MethodConfig] Cannot parse the configuration tps.limit.interval for method %s, please check your configuration", qualifieldMethodName)
+ }
+ if tpsLimitInterval < 0 {
+ return fmt.Errorf("[MethodConfig] The configuration tps.limit.interval for %s must be positive, please check your configuration", qualifieldMethodName)
+ }
+ }
+
+ if m.TpsLimitRate != "" {
+ tpsLimitRate, err := strconv.ParseInt(m.TpsLimitRate, 0, 0)
+ if err != nil {
+ return fmt.Errorf("[MethodConfig] Cannot parse the configuration tps.limit.rate for method %s, please check your configuration", qualifieldMethodName)
+ }
+ if tpsLimitRate < 0 {
+ return fmt.Errorf("[MethodConfig] The configuration tps.limit.rate for method %s must be positive, please check your configuration", qualifieldMethodName)
+ }
+ }
+
+ if err := defaults.Set(m); err != nil {
return err
}
- return verify(mc)
+ return verify(m)
}
diff --git a/config/service_config.go b/config/service_config.go
index ff60946..a550812 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -95,49 +95,96 @@ type ServiceConfig struct {
}
// Prefix returns dubbo.service.${InterfaceName}.
-func (svc *ServiceConfig) Prefix() string {
- return strings.Join([]string{constant.ServiceConfigPrefix, svc.id}, ".")
+func (s *ServiceConfig) Prefix() string {
+ return strings.Join([]string{constant.ServiceConfigPrefix, s.id}, ".")
}
-func (svc *ServiceConfig) Init(rc *RootConfig) error {
- if err := initProviderMethodConfig(svc); err != nil {
+func (s *ServiceConfig) Init(rc *RootConfig) error {
+ if err := initProviderMethodConfig(s); err != nil {
return err
}
- if err := defaults.Set(svc); err != nil {
+ if err := defaults.Set(s); err != nil {
return err
}
- svc.exported = atomic.NewBool(false)
- svc.metadataType = rc.Application.MetadataType
- svc.unexported = atomic.NewBool(false)
- svc.RCRegistriesMap = rc.Registries
- svc.RCProtocolsMap = rc.Protocols
+ s.exported = atomic.NewBool(false)
+ s.metadataType = rc.Application.MetadataType
+ s.unexported = atomic.NewBool(false)
+ s.RCRegistriesMap = rc.Registries
+ s.RCProtocolsMap = rc.Protocols
if rc.Provider != nil {
- svc.ProxyFactoryKey = rc.Provider.ProxyFactory
+ s.ProxyFactoryKey = rc.Provider.ProxyFactory
}
- svc.RegistryIDs = translateRegistryIds(svc.RegistryIDs)
- if len(svc.RegistryIDs) <= 0 {
- svc.RegistryIDs = rc.Provider.RegistryIDs
+ s.RegistryIDs = translateRegistryIds(s.RegistryIDs)
+ if len(s.RegistryIDs) <= 0 {
+ s.RegistryIDs = rc.Provider.RegistryIDs
}
- if len(svc.ProtocolIDs) <= 0 {
+ if len(s.ProtocolIDs) <= 0 {
for k, _ := range rc.Protocols {
- svc.ProtocolIDs = append(svc.ProtocolIDs, k)
+ s.ProtocolIDs = append(s.ProtocolIDs, k)
}
}
- if svc.TracingKey == "" {
- svc.TracingKey = rc.Provider.TracingKey
+ if s.TracingKey == "" {
+ s.TracingKey = rc.Provider.TracingKey
}
- svc.export = true
- return verify(svc)
+ err := s.check()
+ if err != nil {
+ panic(err)
+ }
+ s.export = true
+ return verify(s)
+}
+
+func (s *ServiceConfig) check() error {
+ // check if the limiter has been imported
+ if s.TpsLimiter != "" {
+ _, err := extension.GetTpsLimiter(s.TpsLimiter)
+ if err != nil {
+ panic(err)
+ }
+ }
+ if s.TpsLimitStrategy != "" {
+ _, err := extension.GetTpsLimitStrategyCreator(s.TpsLimitStrategy)
+ if err != nil {
+ panic(err)
+ }
+ }
+ if s.TpsLimitRejectedHandler != "" {
+ _, err := extension.GetRejectedExecutionHandler(s.TpsLimitRejectedHandler)
+ if err != nil {
+ panic(err)
+ }
+ }
+
+ if s.TpsLimitInterval != "" {
+ tpsLimitInterval, err := strconv.ParseInt(s.TpsLimitInterval, 0, 0)
+ if err != nil {
+ return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.interval for service %s, please check your configuration", s.Interface)
+ }
+ if tpsLimitInterval < 0 {
+ return fmt.Errorf("[ServiceConfig] The configuration tps.limit.interval for service %s must be positive, please check your configuration", s.Interface)
+ }
+ }
+
+ if s.TpsLimitRate != "" {
+ tpsLimitRate, err := strconv.ParseInt(s.TpsLimitRate, 0, 0)
+ if err != nil {
+ return fmt.Errorf("[ServiceConfig] Cannot parse the configuration tps.limit.rate for service %s, please check your configuration", s.Interface)
+ }
+ if tpsLimitRate < 0 {
+ return fmt.Errorf("[ServiceConfig] The configuration tps.limit.rate for service %s must be positive, please check your configuration", s.Interface)
+ }
+ }
+ return nil
}
// InitExported will set exported as false atom bool
-func (svc *ServiceConfig) InitExported() {
- svc.exported = atomic.NewBool(false)
+func (s *ServiceConfig) InitExported() {
+ s.exported = atomic.NewBool(false)
}
// IsExport will return whether the service config is exported or not
-func (svc *ServiceConfig) IsExport() bool {
- return svc.exported.Load()
+func (s *ServiceConfig) IsExport() bool {
+ return s.exported.Load()
}
// Get Random Port
@@ -159,35 +206,35 @@ func getRandomPort(protocolConfigs []*ProtocolConfig) *list.List {
}
// Export exports the service
-func (svc *ServiceConfig) Export() error {
+func (s *ServiceConfig) Export() error {
// TODO: delay export
- if svc.unexported != nil && svc.unexported.Load() {
- err := perrors.Errorf("The service %v has already unexported!", svc.Interface)
+ if s.unexported != nil && s.unexported.Load() {
+ err := perrors.Errorf("The service %v has already unexported!", s.Interface)
logger.Errorf(err.Error())
return err
}
- if svc.exported != nil && svc.exported.Load() {
- logger.Warnf("The service %v has already exported!", svc.Interface)
+ if s.exported != nil && s.exported.Load() {
+ logger.Warnf("The service %v has already exported!", s.Interface)
return nil
}
- regUrls := loadRegistries(svc.RegistryIDs, svc.RCRegistriesMap, common.PROVIDER)
- urlMap := svc.getUrlMap()
- protocolConfigs := loadProtocol(svc.ProtocolIDs, svc.RCProtocolsMap)
+ regUrls := loadRegistries(s.RegistryIDs, s.RCRegistriesMap, common.PROVIDER)
+ urlMap := s.getUrlMap()
+ protocolConfigs := loadProtocol(s.ProtocolIDs, s.RCProtocolsMap)
if len(protocolConfigs) == 0 {
- logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", svc.Interface, svc.ProtocolIDs)
+ logger.Warnf("The service %v's '%v' protocols don't has right protocolConfigs, Please check your configuration center and transfer protocol ", s.Interface, s.ProtocolIDs)
return nil
}
ports := getRandomPort(protocolConfigs)
nextPort := ports.Front()
- proxyFactory := extension.GetProxyFactory(svc.ProxyFactoryKey)
+ proxyFactory := extension.GetProxyFactory(s.ProxyFactoryKey)
for _, proto := range protocolConfigs {
// registry the service reflect
- methods, err := common.ServiceMap.Register(svc.Interface, proto.Name, svc.Group, svc.Version, svc.rpcService)
+ methods, err := common.ServiceMap.Register(s.Interface, proto.Name, s.Group, s.Version, s.rpcService)
if err != nil {
formatErr := perrors.Errorf("The service %v export the protocol %v error! Error message is %v.",
- svc.Interface, proto.Name, err.Error())
+ s.Interface, proto.Name, err.Error())
logger.Errorf(formatErr.Error())
return formatErr
}
@@ -198,44 +245,44 @@ func (svc *ServiceConfig) Export() error {
nextPort = nextPort.Next()
}
ivkURL := common.NewURLWithOptions(
- common.WithPath(svc.Interface),
+ common.WithPath(s.Interface),
common.WithProtocol(proto.Name),
common.WithIp(proto.Ip),
common.WithPort(port),
common.WithParams(urlMap),
- common.WithParamsValue(constant.BeanNameKey, svc.id),
+ common.WithParamsValue(constant.BeanNameKey, s.id),
//common.WithParamsValue(constant.SslEnabledKey, strconv.FormatBool(config.GetSslEnabled())),
common.WithMethods(strings.Split(methods, ",")),
- common.WithToken(svc.Token),
- common.WithParamsValue(constant.MetadataTypeKey, svc.metadataType),
+ common.WithToken(s.Token),
+ common.WithParamsValue(constant.MetadataTypeKey, s.metadataType),
)
- if len(svc.Tag) > 0 {
- ivkURL.AddParam(constant.Tagkey, svc.Tag)
+ if len(s.Tag) > 0 {
+ ivkURL.AddParam(constant.Tagkey, s.Tag)
}
// post process the URL to be exported
- svc.postProcessConfig(ivkURL)
+ s.postProcessConfig(ivkURL)
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.ExportKey, true) {
return nil
}
if len(regUrls) > 0 {
- svc.cacheMutex.Lock()
- if svc.cacheProtocol == nil {
+ s.cacheMutex.Lock()
+ if s.cacheProtocol == nil {
logger.Debugf(fmt.Sprintf("First load the registry protocol, url is {%v}!", ivkURL))
- svc.cacheProtocol = extension.GetProtocol("registry")
+ s.cacheProtocol = extension.GetProtocol("registry")
}
- svc.cacheMutex.Unlock()
+ s.cacheMutex.Unlock()
for _, regUrl := range regUrls {
setRegistrySubURL(ivkURL, regUrl)
invoker := proxyFactory.GetInvoker(regUrl)
- exporter := svc.cacheProtocol.Export(invoker)
+ exporter := s.cacheProtocol.Export(invoker)
if exporter == nil {
return perrors.New(fmt.Sprintf("Registry protocol new exporter error, registry is {%v}, url is {%v}", regUrl, ivkURL))
}
- svc.exporters = append(svc.exporters, exporter)
+ s.exporters = append(s.exporters, exporter)
}
} else {
if ivkURL.GetParam(constant.InterfaceKey, "") == constant.MetadataServiceName {
@@ -253,11 +300,11 @@ func (svc *ServiceConfig) Export() error {
if exporter == nil {
return perrors.New(fmt.Sprintf("Filter protocol without registry new exporter error, url is {%v}", ivkURL))
}
- svc.exporters = append(svc.exporters, exporter)
+ s.exporters = append(s.exporters, exporter)
}
publishServiceDefinition(ivkURL)
}
- svc.exported.Store(true)
+ s.exported.Store(true)
return nil
}
@@ -318,56 +365,56 @@ func loadRegistries(registryIds []string, registries map[string]*RegistryConfig,
}
// Unexport will call unexport of all exporters service config exported
-func (svc *ServiceConfig) Unexport() {
- if !svc.exported.Load() {
+func (s *ServiceConfig) Unexport() {
+ if !s.exported.Load() {
return
}
- if svc.unexported.Load() {
+ if s.unexported.Load() {
return
}
func() {
- svc.exportersLock.Lock()
- defer svc.exportersLock.Unlock()
- for _, exporter := range svc.exporters {
+ s.exportersLock.Lock()
+ defer s.exportersLock.Unlock()
+ for _, exporter := range s.exporters {
exporter.Unexport()
}
- svc.exporters = nil
+ s.exporters = nil
}()
- svc.exported.Store(false)
- svc.unexported.Store(true)
+ s.exported.Store(false)
+ s.unexported.Store(true)
}
// Implement only store the @s and return
-func (svc *ServiceConfig) Implement(s common.RPCService) {
- svc.rpcService = s
+func (s *ServiceConfig) Implement(rpcService common.RPCService) {
+ s.rpcService = rpcService
}
-func (svc *ServiceConfig) getUrlMap() url.Values {
+func (s *ServiceConfig) getUrlMap() url.Values {
urlMap := url.Values{}
// first set user params
- for k, v := range svc.Params {
+ for k, v := range s.Params {
urlMap.Set(k, v)
}
- urlMap.Set(constant.InterfaceKey, svc.Interface)
+ urlMap.Set(constant.InterfaceKey, s.Interface)
urlMap.Set(constant.TimestampKey, strconv.FormatInt(time.Now().Unix(), 10))
- urlMap.Set(constant.ClusterKey, svc.Cluster)
- urlMap.Set(constant.LoadbalanceKey, svc.Loadbalance)
- urlMap.Set(constant.WarmupKey, svc.Warmup)
- urlMap.Set(constant.RetriesKey, svc.Retries)
- if svc.Group != "" {
- urlMap.Set(constant.GroupKey, svc.Group)
+ urlMap.Set(constant.ClusterKey, s.Cluster)
+ urlMap.Set(constant.LoadbalanceKey, s.Loadbalance)
+ urlMap.Set(constant.WarmupKey, s.Warmup)
+ urlMap.Set(constant.RetriesKey, s.Retries)
+ if s.Group != "" {
+ urlMap.Set(constant.GroupKey, s.Group)
}
- if svc.Version != "" {
- urlMap.Set(constant.VersionKey, svc.Version)
+ if s.Version != "" {
+ urlMap.Set(constant.VersionKey, s.Version)
}
urlMap.Set(constant.RegistryRoleKey, strconv.Itoa(common.PROVIDER))
urlMap.Set(constant.ReleaseKey, "dubbo-golang-"+constant.Version)
urlMap.Set(constant.SideKey, (common.RoleType(common.PROVIDER)).Role())
- urlMap.Set(constant.MessageSizeKey, strconv.Itoa(svc.GrpcMaxMessageSize))
+ urlMap.Set(constant.MessageSizeKey, strconv.Itoa(s.GrpcMaxMessageSize))
// todo: move
- urlMap.Set(constant.SerializationKey, svc.Serialization)
+ urlMap.Set(constant.SerializationKey, s.Serialization)
// application config info
ac := GetApplicationConfig()
urlMap.Set(constant.ApplicationKey, ac.Name)
@@ -380,39 +427,39 @@ func (svc *ServiceConfig) getUrlMap() url.Values {
// filter
var filters string
- if svc.Filter == "" {
+ if s.Filter == "" {
filters = constant.DefaultServiceFilters
} else {
- filters = svc.Filter
+ filters = s.Filter
}
- if svc.adaptiveService {
+ if s.adaptiveService {
filters += fmt.Sprintf(",%s", constant.AdaptiveServiceProviderFilterKey)
}
urlMap.Set(constant.ServiceFilterKey, filters)
// filter special config
- urlMap.Set(constant.AccessLogFilterKey, svc.AccessLog)
+ urlMap.Set(constant.AccessLogFilterKey, s.AccessLog)
// tps limiter
- urlMap.Set(constant.TPSLimitStrategyKey, svc.TpsLimitStrategy)
- urlMap.Set(constant.TPSLimitIntervalKey, svc.TpsLimitInterval)
- urlMap.Set(constant.TPSLimitRateKey, svc.TpsLimitRate)
- urlMap.Set(constant.TPSLimiterKey, svc.TpsLimiter)
- urlMap.Set(constant.TPSRejectedExecutionHandlerKey, svc.TpsLimitRejectedHandler)
- urlMap.Set(constant.TracingConfigKey, svc.TracingKey)
+ urlMap.Set(constant.TPSLimitStrategyKey, s.TpsLimitStrategy)
+ urlMap.Set(constant.TPSLimitIntervalKey, s.TpsLimitInterval)
+ urlMap.Set(constant.TPSLimitRateKey, s.TpsLimitRate)
+ urlMap.Set(constant.TPSLimiterKey, s.TpsLimiter)
+ urlMap.Set(constant.TPSRejectedExecutionHandlerKey, s.TpsLimitRejectedHandler)
+ urlMap.Set(constant.TracingConfigKey, s.TracingKey)
// execute limit filter
- urlMap.Set(constant.ExecuteLimitKey, svc.ExecuteLimit)
- urlMap.Set(constant.ExecuteRejectedExecutionHandlerKey, svc.ExecuteLimitRejectedHandler)
+ urlMap.Set(constant.ExecuteLimitKey, s.ExecuteLimit)
+ urlMap.Set(constant.ExecuteRejectedExecutionHandlerKey, s.ExecuteLimitRejectedHandler)
// auth filter
- urlMap.Set(constant.ServiceAuthKey, svc.Auth)
- urlMap.Set(constant.ParameterSignatureEnableKey, svc.ParamSign)
+ urlMap.Set(constant.ServiceAuthKey, s.Auth)
+ urlMap.Set(constant.ParameterSignatureEnableKey, s.ParamSign)
// whether to export or not
- urlMap.Set(constant.ExportKey, strconv.FormatBool(svc.export))
+ urlMap.Set(constant.ExportKey, strconv.FormatBool(s.export))
urlMap.Set(constant.PIDKey, fmt.Sprintf("%d", os.Getpid()))
- for _, v := range svc.Methods {
+ for _, v := range s.Methods {
prefix := "methods." + v.Name + "."
urlMap.Set(prefix+constant.LoadbalanceKey, v.LoadBalance)
urlMap.Set(prefix+constant.RetriesKey, v.Retries)
@@ -430,10 +477,10 @@ func (svc *ServiceConfig) getUrlMap() url.Values {
}
// GetExportedUrls will return the url in service config's exporter
-func (svc *ServiceConfig) GetExportedUrls() []*common.URL {
- if svc.exported.Load() {
+func (s *ServiceConfig) GetExportedUrls() []*common.URL {
+ if s.exported.Load() {
var urls []*common.URL
- for _, exporter := range svc.exporters {
+ for _, exporter := range s.exporters {
urls = append(urls, exporter.GetInvoker().GetURL())
}
return urls
@@ -442,7 +489,7 @@ func (svc *ServiceConfig) GetExportedUrls() []*common.URL {
}
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
-func (svc *ServiceConfig) postProcessConfig(url *common.URL) {
+func (s *ServiceConfig) postProcessConfig(url *common.URL) {
for _, p := range extension.GetConfigPostProcessors() {
p.PostProcessServiceConfig(url)
}
diff --git a/filter/tps/filter.go b/filter/tps/filter.go
index 6577059..f57e909 100644
--- a/filter/tps/filter.go
+++ b/filter/tps/filter.go
@@ -72,7 +72,12 @@ func (t *tpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i
tpsLimiter := url.GetParam(constant.TPSLimiterKey, "")
rejectedExeHandler := url.GetParam(constant.TPSRejectedExecutionHandlerKey, constant.DefaultKey)
if len(tpsLimiter) > 0 {
- allow := extension.GetTpsLimiter(tpsLimiter).IsAllowable(invoker.GetURL(), invocation)
+ limiter, err := extension.GetTpsLimiter(tpsLimiter)
+ if err != nil {
+ logger.Warn(err)
+ return invoker.Invoke(ctx, invocation)
+ }
+ allow := limiter.IsAllowable(invoker.GetURL(), invocation)
if allow {
return invoker.Invoke(ctx, invocation)
}
diff --git a/filter/tps/limiter/method_service.go b/filter/tps/limiter/method_service.go
index 41c16f8..8925e04 100644
--- a/filter/tps/limiter/method_service.go
+++ b/filter/tps/limiter/method_service.go
@@ -31,6 +31,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)
@@ -150,6 +151,7 @@ func (limiter MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation p
if limitRate < 0 {
// the limitTarget is not necessary to be limited.
+ logger.Errorf("Found error configuration value of tps.limit.rate for the invocation %s, ignores TPS Limiter", url.ServiceKey()+"#"+invocation.MethodName())
return true
}
@@ -157,13 +159,18 @@ func (limiter MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation p
constant.TPSLimitIntervalKey,
constant.DefaultTPSLimitInterval)
if limitInterval <= 0 {
- panic(fmt.Sprintf("The interval must be positive, please check your configuration! url: %s", url.String()))
+ logger.Errorf(fmt.Sprintf("Found error configuration value of tps.limit.interval for the invocation %s, ignores TPS Limiter", url.ServiceKey()+"#"+invocation.MethodName()))
+ return true
}
// find the strategy config and then create one
limitStrategyConfig := url.GetParam(methodConfigPrefix+constant.TPSLimitStrategyKey,
url.GetParam(constant.TPSLimitStrategyKey, constant.DefaultKey))
- limitStateCreator := extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
+ limitStateCreator, err := extension.GetTpsLimitStrategyCreator(limitStrategyConfig)
+ if err != nil {
+ logger.Warn(err)
+ return true
+ }
// we using loadOrStore to ensure thread-safe
limitState, _ = limiter.tpsState.LoadOrStore(limitTarget, limitStateCreator.Create(int(limitRate), int(limitInterval)))
@@ -178,22 +185,24 @@ func getLimitConfig(methodLevelConfig string,
url *common.URL,
invocation protocol.Invocation,
configKey string,
- defaultVal string) int64 {
+ defaultVal int64) int64 {
if len(methodLevelConfig) > 0 {
result, err := strconv.ParseInt(methodLevelConfig, 0, 0)
if err != nil {
- panic(fmt.Sprintf("The %s for invocation %s # %s must be positive, please check your configuration!",
+ logger.Error(fmt.Sprintf("The %s for invocation %s # %s must be positive, please check your configuration!",
configKey, url.ServiceKey(), invocation.MethodName()))
+ return defaultVal
}
return result
}
// actually there is no method-level configuration, so we use the service-level configuration
- result, err := strconv.ParseInt(url.GetParam(configKey, defaultVal), 0, 0)
+ result, err := strconv.ParseInt(url.GetParam(configKey, ""), 0, 0)
if err != nil {
- panic(fmt.Sprintf("Cannot parse the configuration %s, please check your configuration!", configKey))
+ logger.Errorf(fmt.Sprintf("Cannot parse the configuration %s, please check your configuration!", configKey))
+ return defaultVal
}
return result
}
diff --git a/filter/tps/limiter/method_service_test.go b/filter/tps/limiter/method_service_test.go
index 21886fe..e6a3e93 100644
--- a/filter/tps/limiter/method_service_test.go
+++ b/filter/tps/limiter/method_service_test.go
@@ -47,7 +47,8 @@ func TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) {
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.InterfaceKey, methodName),
- common.WithParamsValue(constant.TPSLimitRateKey, "20"))
+ common.WithParamsValue(constant.TPSLimitRateKey, "20"),
+ common.WithParamsValue(constant.TPSLimitIntervalKey, "60000"))
mockStrategyImpl := strategy.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)