You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/07/08 06:37:16 UTC
[dubbo-go] branch 3.0 updated: Rft: Filter (#1299)
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new af29042 Rft: Filter (#1299)
af29042 is described below
commit af290429dee484c75e8fa2079a0600b961a9f7c0
Author: XavierNiu <a...@nxw.name>
AuthorDate: Thu Jul 8 14:36:55 2021 +0800
Rft: Filter (#1299)
* refactor filter
fix config name
fix name problems
update for code style
temp
sync
sync
refactor filter done
* enhance compatibility
* apache license
* fix extension errors
* fix bugs and update docs
* enhance code style
* fix bugs
* rename EchoFilterForTest
* move filter keys to constant package
* go fmt
---
README.md | 2 +-
README_CN.md | 4 +-
common/constant/default.go | 6 +-
common/constant/key.go | 31 +++++++--
common/extension/auth.go | 18 ++---
config/config_loader_test.go | 4 +-
config/graceful_shutdown.go | 4 +-
config/graceful_shutdown_test.go | 4 +-
config/reference_config_test.go | 2 +-
config/service_config.go | 2 +-
filter/README.md | 38 +++++++++++
.../access_log_filter.go => accesslog/filter.go} | 76 +++++++++++-----------
.../filter_test.go} | 14 ++--
.../active_filter.go => active/filter.go} | 21 +++---
.../filter_test.go} | 10 +--
filter/{filter_impl => }/auth/accesskey_storage.go | 15 ++---
.../auth/accesskey_storage_test.go | 2 +-
.../consumer_sign_filter.go} | 17 ++---
.../consumer_sign_filter_test.go} | 0
.../auth/default_authenticator.go | 14 ++--
.../auth/default_authenticator_test.go | 0
.../provider_auth_filter.go} | 10 +--
.../provider_auth_filter_test.go} | 0
filter/{filter_impl => }/auth/sign_util.go | 0
filter/{filter_impl => }/auth/sign_util_test.go | 0
.../{filter_impl/echo_filter.go => echo/filter.go} | 26 +++-----
.../echo_filter_test.go => echo/filter_test.go} | 6 +-
.../execute_limit_filter.go => execlmt/filter.go} | 26 ++++----
.../filter_test.go} | 14 ++--
filter/filter_impl/import.go | 41 ++++++++++++
.../generic_filter.go => generic/filter.go} | 23 ++-----
.../filter_test.go} | 2 +-
.../service_filter.go} | 19 ++----
.../service_filter_test.go} | 8 +--
.../filter.go} | 47 +++++++------
.../filter_test.go} | 4 +-
.../handler/rejected_execution_handler_only_log.go | 3 +-
.../hystrix_filter.go => hystrix/filter.go} | 75 ++++++++++-----------
.../filter_test.go} | 28 ++++----
.../metrics_filter.go => metrics/filter.go} | 28 ++++----
.../filter_test.go} | 4 +-
.../seata_filter.go => seata/filter.go} | 20 +++---
.../seata_filter_test.go => seata/filter_test.go} | 4 +-
.../sentinel_filter.go => sentinel/filter.go} | 34 ++++------
.../filter_test.go} | 8 +--
.../token_filter.go => token/filter.go} | 24 +++----
.../token_filter_test.go => token/filter_test.go} | 10 +--
.../tps_limit_filter.go => tps/filter.go} | 30 ++++-----
.../filter_test.go} | 20 +++---
.../limiter/method_service.go} | 22 +++----
.../limiter/method_service_test.go} | 11 ++--
.../tps_limiter_mock.go => tps/limiter/mock.go} | 2 +-
.../strategy/fix_window.go} | 16 ++---
.../strategy/fix_window_test.go} | 2 +-
.../strategy/mock.go} | 4 +-
.../strategy/sliding_window.go} | 12 ++--
.../strategy/sliding_window_test.go} | 2 +-
.../strategy/thread_safe_fix_window.go} | 18 ++---
.../strategy/thread_safe_fix_window_test.go} | 2 +-
filter/{tps_limit_strategy.go => tps_strategy.go} | 2 +-
.../tracing_filter.go => tracing/filter.go} | 10 +--
.../filter_test.go} | 2 +-
.../protocol_filter_wrapper_test.go | 12 ++--
63 files changed, 467 insertions(+), 448 deletions(-)
diff --git a/README.md b/README.md
index 8662601..40e3a11 100644
--- a/README.md
+++ b/README.md
@@ -31,7 +31,7 @@ The features that are available for Dubbo-go are:
- **Dynamic Configure Center & Service Management Configurator**: Zookeeper, [Apollo](https://github.com/apache/dubbo-go/pull/250), [Nacos](https://github.com/apache/dubbo-go/pull/357)
- **Cluster Strategy**: Failover, [Failfast](https://github.com/apache/dubbo-go/pull/140), [Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136), [Available](https://github.com/apache/dubbo-go/pull/155), [Broadcast](https://github.com/apache/dubbo-go/pull/158), [Forking](https://github.com/apache/dubbo-go/pull/161)
- **Load Balance**: Random, [RoundRobin](https://github.com/apache/dubbo-go/pull/66), [LeastActive](https://github.com/apache/dubbo-go/pull/65), [ConsistentHash](https://github.com/apache/dubbo-go/pull/261)
-- **Filter**: Echo Health Check, [Circuit Break and Service Downgrade](https://github.com/apache/dubbo-go/pull/133), [TokenFilter](https://github.com/apache/dubbo-go/pull/202), [AccessLogFilter](https://github.com/apache/dubbo-go/pull/214), [TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237), [ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246), [GenericServiceFilter](https://github.com/apache/dubbo-go/pull/291), [Auth/Sign](https://github.com/apache/dubbo-go/pull/ [...]
+- [**Filter**](./filter): Echo, Hystrix, Token, AccessLog, TpsLimiter, ExecuteLimit, Generic, Auth/Sign, Metrics, Tracing, Active, Seata, Sentinel
- **Invoke**: [Generic Invoke](https://github.com/apache/dubbo-go/pull/122)
- **Monitor**: Opentracing API, [Prometheus](https://github.com/apache/dubbo-go/pull/342)
- **Tracing**: [For JsonRPC](https://github.com/apache/dubbo-go/pull/335), [For Dubbo](https://github.com/apache/dubbo-go/pull/344), [For gRPC](https://github.com/apache/dubbo-go/pull/397)
diff --git a/README_CN.md b/README_CN.md
index d6b5dcb..be43e95 100644
--- a/README_CN.md
+++ b/README_CN.md
@@ -31,12 +31,12 @@ Dubbo-go中已实现的特性:
- **动态配置中心与服务治理配置器**: Zookeeper, [Apollo](https://github.com/apache/dubbo-go/pull/250), [Nacos](https://github.com/apache/dubbo-go/pull/357)
- **集群策略**: Failover, [Failfast](https://github.com/apache/dubbo-go/pull/140), [Failsafe/Failback](https://github.com/apache/dubbo-go/pull/136), [Available](https://github.com/apache/dubbo-go/pull/155), [Broadcast](https://github.com/apache/dubbo-go/pull/158), [Forking](https://github.com/apache/dubbo-go/pull/161)
- **负载均衡策略**: Random, [RoundRobin](https://github.com/apache/dubbo-go/pull/66), [LeastActive](https://github.com/apache/dubbo-go/pull/65), [ConsistentHash](https://github.com/apache/dubbo-go/pull/261)
-- **过滤器**: Echo Health Check, [Circuit Break and Service Downgrade](https://github.com/apache/dubbo-go/pull/133), [TokenFilter](https://github.com/apache/dubbo-go/pull/202), [AccessLogFilter](https://github.com/apache/dubbo-go/pull/214), [TpsLimitFilter](https://github.com/apache/dubbo-go/pull/237), [ExecuteLimitFilter](https://github.com/apache/dubbo-go/pull/246), [GenericServiceFilter](https://github.com/apache/dubbo-go/pull/291), [Auth/Sign](https://github.com/apache/dubbo-go/pull/323 [...]
+- [**过滤器**](./filter): Echo, Hystrix, Token, AccessLog, TpsLimiter, ExecuteLimit, Generic, Auth/Sign, Metrics, Tracing, Active, Seata, Sentinel
- **调用**: [Generic Invoke](https://github.com/apache/dubbo-go/pull/122)
- **监控**: Opentracing API, [Prometheus](https://github.com/apache/dubbo-go/pull/342)
- **Tracing**: [For JsonRPC](https://github.com/apache/dubbo-go/pull/335), [For Dubbo](https://github.com/apache/dubbo-go/pull/344), [For gRPC](https://github.com/apache/dubbo-go/pull/397)
- **元数据中心**: [Nacos(Local)](https://github.com/apache/dubbo-go/pull/522), [ZooKeeper(Local)](https://github.com/apache/dubbo-go/pull/633), [etcd(Local)](https://github.com/apache/dubbo-go/blob/9a5990d9a9c3d5e6633c0d7d926c156416bcb931/metadata/report/etcd/report.go), [Consul(Local)](https://github.com/apache/dubbo-go/pull/633), [ZooKeeper(Remoting)](https://github.com/apache/dubbo-go/pull/1161)
-- **工具**: [Dubbo-go-cli](https://github.com/apache/dubbo-go/pull/818)
+- **工具**: [Dubbo-go-cli](https://github.com/dubbogo/tools)
## 开始
diff --git a/common/constant/default.go b/common/constant/default.go
index bf9a19b..037abea 100644
--- a/common/constant/default.go
+++ b/common/constant/default.go
@@ -51,9 +51,9 @@ const (
const (
DEFAULT_KEY = "default"
PREFIX_DEFAULT_KEY = "default."
- DEFAULT_SERVICE_FILTERS = "echo,token,accesslog,tps,generic_service,execute,pshutdown"
- DEFAULT_REFERENCE_FILTERS = "cshutdown"
- GENERIC_REFERENCE_FILTERS = "generic"
+ DEFAULT_SERVICE_FILTERS = EchoFilterKey + "," + TokenFilterKey + "," + AccessLogFilterKey + "," + TpsLimitFilterKey + "," + GenericServiceFilterKey + "," + ExecuteLimitFilterKey + "," + GracefulShutdownProviderFilterKey
+ DEFAULT_REFERENCE_FILTERS = GracefulShutdownConsumerFilterKey
+ GENERIC_REFERENCE_FILTERS = GenericFilterKey
GENERIC = "$invoke"
ECHO = "$echo"
)
diff --git a/common/constant/key.go b/common/constant/key.go
index 601a5dd..61d03e9 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -60,6 +60,29 @@ const (
REFERENCE_FILTER_KEY = "reference.filter"
)
+// Filter Keys
+const (
+ AccessLogFilterKey = "accesslog"
+ ActiveFilterKey = "active"
+ AuthConsumerFilterKey = "sign"
+ AuthProviderFilterKey = "auth"
+ EchoFilterKey = "echo"
+ ExecuteLimitFilterKey = "execute"
+ GenericFilterKey = "generic"
+ GenericServiceFilterKey = "generic_service"
+ GracefulShutdownProviderFilterKey = "pshutdown"
+ GracefulShutdownConsumerFilterKey = "cshutdown"
+ HystrixConsumerFilterKey = "hystrix_consumer"
+ HystrixProviderFilterKey = "hystrix_provider"
+ MetricsFilterKey = "metrics"
+ SeataFilterKey = "seata"
+ SentinelProviderFilterKey = "sentinel-provider"
+ SentinelConsumerFilterKey = "sentinel-consumer"
+ TokenFilterKey = "token"
+ TpsLimitFilterKey = "tps"
+ TracingFilterKey = "tracing"
+)
+
const (
TIMESTAMP_KEY = "timestamp"
REMOTE_TIMESTAMP_KEY = "remote.timestamp"
@@ -74,7 +97,6 @@ const (
FORKS_KEY = "forks"
DEFAULT_FORKS = 2
DEFAULT_TIMEOUT = 1000
- ACCESS_LOG_KEY = "accesslog"
TPS_LIMITER_KEY = "tps.limiter"
TPS_REJECTED_EXECUTION_HANDLER_KEY = "tps.limit.rejected.handler"
TPS_LIMIT_RATE_KEY = "tps.limit.rate"
@@ -85,8 +107,6 @@ const (
EXECUTE_LIMIT_KEY = "execute.limit"
DEFAULT_EXECUTE_LIMIT = "-1"
EXECUTE_REJECTED_EXECUTION_HANDLER_KEY = "execute.limit.rejected.handler"
- PROVIDER_SHUTDOWN_FILTER = "pshutdown"
- CONSUMER_SHUTDOWN_FILTER = "cshutdown"
SERIALIZATION_KEY = "serialization"
PID_KEY = "pid"
SYNC_REPORT_KEY = "sync.report"
@@ -251,11 +271,8 @@ const (
AttachmentKey = DubboCtxKey("attachment")
)
+// Auth filter
const (
- // name of consumer sign filter
- CONSUMER_SIGN_FILTER = "sign"
- // name of consumer sign filter
- PROVIDER_AUTH_FILTER = "auth"
// name of service filter
SERVICE_AUTH_KEY = "auth"
// key of authenticator
diff --git a/common/extension/auth.go b/common/extension/auth.go
index 31a78a6..fd92144 100644
--- a/common/extension/auth.go
+++ b/common/extension/auth.go
@@ -23,7 +23,7 @@ import (
var (
authenticators = make(map[string]func() filter.Authenticator)
- accesskeyStorages = make(map[string]func() filter.AccessKeyStorage)
+ accessKeyStorages = make(map[string]func() filter.AccessKeyStorage)
)
// SetAuthenticator puts the @fcn into map with name
@@ -40,16 +40,16 @@ func GetAuthenticator(name string) filter.Authenticator {
return authenticators[name]()
}
-// SetAccesskeyStorages will set the @fcn into map with this name
-func SetAccesskeyStorages(name string, fcn func() filter.AccessKeyStorage) {
- accesskeyStorages[name] = fcn
+// SetAccessKeyStorages will set the @fcn into map with this name
+func SetAccessKeyStorages(name string, fcn func() filter.AccessKeyStorage) {
+ accessKeyStorages[name] = fcn
}
-// GetAccesskeyStorages finds the storage with the @name.
+// GetAccessKeyStorages finds the storage with the @name.
// Panic if not found
-func GetAccesskeyStorages(name string) filter.AccessKeyStorage {
- if accesskeyStorages[name] == nil {
- panic("accesskeyStorages for " + name + " is not existing, make sure you have import the package.")
+func GetAccessKeyStorages(name string) filter.AccessKeyStorage {
+ if accessKeyStorages[name] == nil {
+ panic("accessKeyStorages for " + name + " is not existing, make sure you have import the package.")
}
- return accesskeyStorages[name]()
+ return accessKeyStorages[name]()
}
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index 59593ef..737b6f6 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -77,10 +77,10 @@ func TestConfigLoader(t *testing.T) {
}
func TestLoad(t *testing.T) {
- extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
+ extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
- extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
+ extension.SetFilter(constant.GracefulShutdownProviderFilterKey, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 51443c9..6d2b716 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -60,10 +60,10 @@ func GracefulShutdownInit() {
signal.Notify(signals, ShutdownSignals...)
// retrieve ShutdownConfig for gracefulShutdownFilter
- if filter, ok := extension.GetFilter(constant.CONSUMER_SHUTDOWN_FILTER).(Setter); ok && GetConsumerConfig().ShutdownConfig != nil {
+ if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && GetConsumerConfig().ShutdownConfig != nil {
filter.Set(GracefulShutdownFilterShutdownConfig, GetConsumerConfig().ShutdownConfig)
}
- if filter, ok := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(Setter); ok && GetProviderConfig().ShutdownConfig != nil {
+ if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(Setter); ok && GetProviderConfig().ShutdownConfig != nil {
filter.Set(GracefulShutdownFilterShutdownConfig, GetProviderConfig().ShutdownConfig)
}
diff --git a/config/graceful_shutdown_test.go b/config/graceful_shutdown_test.go
index 920a268..c30f6ea 100644
--- a/config/graceful_shutdown_test.go
+++ b/config/graceful_shutdown_test.go
@@ -29,10 +29,10 @@ import (
)
func TestGracefulShutdownInit(t *testing.T) {
- extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
+ extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
- extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
+ extension.SetFilter(constant.GracefulShutdownProviderFilterKey, func() filter.Filter {
return &mockGracefulShutdownFilter{}
})
GracefulShutdownInit()
diff --git a/config/reference_config_test.go b/config/reference_config_test.go
index 9b5335a..fc76f1d 100644
--- a/config/reference_config_test.go
+++ b/config/reference_config_test.go
@@ -385,7 +385,7 @@ func (p *mockRegistryProtocol) GetRegistries() []registry.Registry {
func mockFilter() {
consumerFiler := &mockShutdownFilter{}
- extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
+ extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter {
return consumerFiler
})
}
diff --git a/config/service_config.go b/config/service_config.go
index ea1a913..ebcac9a 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -304,7 +304,7 @@ func (c *ServiceConfig) getUrlMap() url.Values {
urlMap.Set(constant.SERVICE_FILTER_KEY, mergeValue(providerConfig.Filter, c.Filter, constant.DEFAULT_SERVICE_FILTERS))
// filter special config
- urlMap.Set(constant.ACCESS_LOG_KEY, c.AccessLog)
+ urlMap.Set(constant.AccessLogFilterKey, c.AccessLog)
// tps limiter
urlMap.Set(constant.TPS_LIMIT_STRATEGY_KEY, c.TpsLimitStrategy)
urlMap.Set(constant.TPS_LIMIT_INTERVAL_KEY, c.TpsLimitInterval)
diff --git a/filter/README.md b/filter/README.md
new file mode 100644
index 0000000..5b5647d
--- /dev/null
+++ b/filter/README.md
@@ -0,0 +1,38 @@
+# Filter
+
+## Getting Started
+
+Recommended way: import only what you need; see also [dubbo-go/imports](https://github.com/dubbogo/imports).
+
+```go
+package demo
+
+// use echo and generic filters
+import _ "dubbo.apache.org/dubbo-go/v3/filter/echo"
+import _ "dubbo.apache.org/dubbo-go/v3/filter/generic"
+```
+
+Legacy way: import all filters with a single line.
+
+```go
+package demo
+
+import _ "dubbo.apache.org/dubbo-go/v3/filter/filter_impl"
+```
+
+## Contents
+
+- accesslog: Access Log Filter(https://github.com/apache/dubbo-go/pull/214)
+- active
+- auth: Auth/Sign Filter(https://github.com/apache/dubbo-go/pull/323)
+- echo: Echo Health Check Filter
+- execlmt: Execute Limit Filter(https://github.com/apache/dubbo-go/pull/246)
+- generic: Generic Filter(https://github.com/apache/dubbo-go/pull/291)
+- gshutdown: Graceful Shutdown Filter
+- hystrix: Hystrix Filter(https://github.com/apache/dubbo-go/pull/133)
+- metrics: Metrics Filter(https://github.com/apache/dubbo-go/pull/342)
+- seata: Seata Filter
+- sentinel: Sentinel Filter
+- token: Token Filter(https://github.com/apache/dubbo-go/pull/202)
+- tps: Tps Limit Filter(https://github.com/apache/dubbo-go/pull/237)
+- tracing: Tracing Filter(https://github.com/apache/dubbo-go/pull/335)
\ No newline at end of file
diff --git a/filter/filter_impl/access_log_filter.go b/filter/accesslog/filter.go
similarity index 75%
rename from filter/filter_impl/access_log_filter.go
rename to filter/accesslog/filter.go
index 0fe95d2..981f24c 100644
--- a/filter/filter_impl/access_log_filter.go
+++ b/filter/accesslog/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package accesslog
import (
"context"
@@ -35,7 +35,6 @@ import (
const (
// used in URL.
-
// nolint
FileDateFormat = "2006-01-02"
// nolint
@@ -54,11 +53,11 @@ const (
)
func init() {
- extension.SetFilter(constant.ACCESS_LOG_KEY, GetAccessLogFilter)
+ extension.SetFilter(constant.AccessLogFilterKey, newFilter)
}
-/*
- * AccessLogFilter
+// Filter for Access Log
+/**
* Although the access log filter is a default filter,
* you should config "accesslog" in service's config to tell the filter where store the access log.
* for example:
@@ -73,27 +72,27 @@ func init() {
* If the value is one of them, the access log will be record in log file which defined in log.yml
* AccessLogFilter is designed to be singleton
*/
-type AccessLogFilter struct {
- logChan chan AccessLogData
+type Filter struct {
+ logChan chan Data
}
// Invoke will check whether user wants to use this filter.
-// If we find the value of key constant.ACCESS_LOG_KEY, we will log the invocation info
-func (ef *AccessLogFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- accessLog := invoker.GetURL().GetParam(constant.ACCESS_LOG_KEY, "")
+// If we find the value of key constant.AccessLogFilterKey, we will log the invocation info
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+ accessLog := invoker.GetURL().GetParam(constant.AccessLogFilterKey, "")
// the user do not
if len(accessLog) > 0 {
- accessLogData := AccessLogData{data: ef.buildAccessLogData(invoker, invocation), accessLog: accessLog}
- ef.logIntoChannel(accessLogData)
+ accessLogData := Data{data: f.buildAccessLogData(invoker, invocation), accessLog: accessLog}
+ f.logIntoChannel(accessLogData)
}
return invoker.Invoke(ctx, invocation)
}
// logIntoChannel won't block the invocation
-func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
+func (f *Filter) logIntoChannel(accessLogData Data) {
select {
- case ef.logChan <- accessLogData:
+ case f.logChan <- accessLogData:
return
default:
logger.Warn("The channel is full and the access logIntoChannel data will be dropped")
@@ -102,7 +101,7 @@ func (ef *AccessLogFilter) logIntoChannel(accessLogData AccessLogData) {
}
// buildAccessLogData builds the access log data
-func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
+func (f *Filter) buildAccessLogData(_ protocol.Invoker, invocation protocol.Invocation) map[string]string {
dataMap := make(map[string]string, 16)
attachments := invocation.Attachments()
itf := attachments[constant.INTERFACE_KEY]
@@ -154,19 +153,19 @@ func (ef *AccessLogFilter) buildAccessLogData(_ protocol.Invoker, invocation pro
}
// OnResponse do nothing
-func (ef *AccessLogFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
+func (f *Filter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}
// writeLogToFile actually write the logs into file
-func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
+func (f *Filter) writeLogToFile(data Data) {
accessLog := data.accessLog
if isDefault(accessLog) {
logger.Info(data.toLogMessage())
return
}
- logFile, err := ef.openLogFile(accessLog)
+ logFile, err := f.openLogFile(accessLog)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
return
@@ -186,7 +185,7 @@ func (ef *AccessLogFilter) writeLogToFile(data AccessLogData) {
// You may find out that, once we want to write access log into log file,
// we open the file again and again.
// It needs to be optimized.
-func (ef *AccessLogFilter) openLogFile(accessLog string) (*os.File, error) {
+func (f *Filter) openLogFile(accessLog string) (*os.File, error) {
logFile, err := os.OpenFile(accessLog, os.O_CREATE|os.O_APPEND|os.O_RDWR, LogFileMode)
if err != nil {
logger.Warnf("Can not open the access log file: %s, %v", accessLog, err)
@@ -221,9 +220,8 @@ func isDefault(accessLog string) bool {
return strings.EqualFold("true", accessLog) || strings.EqualFold("default", accessLog)
}
-// GetAccessLogFilter return the instance of AccessLogFilter
-func GetAccessLogFilter() filter.Filter {
- accessLogFilter := &AccessLogFilter{logChan: make(chan AccessLogData, LogMaxBuffer)}
+func newFilter() filter.Filter {
+ accessLogFilter := &Filter{logChan: make(chan Data, LogMaxBuffer)}
go func() {
for accessLogData := range accessLogFilter.logChan {
accessLogFilter.writeLogToFile(accessLogData)
@@ -232,44 +230,44 @@ func GetAccessLogFilter() filter.Filter {
return accessLogFilter
}
-// AccessLogData defines the data that will be log into file
-type AccessLogData struct {
+// Data defines the data that will be logged into the file
+type Data struct {
accessLog string
data map[string]string
}
-// toLogMessage convert the AccessLogData to String
-func (ef *AccessLogData) toLogMessage() string {
+// toLogMessage convert the Data to String
+func (d *Data) toLogMessage() string {
builder := strings.Builder{}
builder.WriteString("[")
- builder.WriteString(ef.data[constant.TIMESTAMP_KEY])
+ builder.WriteString(d.data[constant.TIMESTAMP_KEY])
builder.WriteString("] ")
- builder.WriteString(ef.data[constant.REMOTE_ADDR])
+ builder.WriteString(d.data[constant.REMOTE_ADDR])
builder.WriteString(" -> ")
- builder.WriteString(ef.data[constant.LOCAL_ADDR])
+ builder.WriteString(d.data[constant.LOCAL_ADDR])
builder.WriteString(" - ")
- if len(ef.data[constant.GROUP_KEY]) > 0 {
- builder.WriteString(ef.data[constant.GROUP_KEY])
+ if len(d.data[constant.GROUP_KEY]) > 0 {
+ builder.WriteString(d.data[constant.GROUP_KEY])
builder.WriteString("/")
}
- builder.WriteString(ef.data[constant.INTERFACE_KEY])
+ builder.WriteString(d.data[constant.INTERFACE_KEY])
- if len(ef.data[constant.VERSION_KEY]) > 0 {
+ if len(d.data[constant.VERSION_KEY]) > 0 {
builder.WriteString(":")
- builder.WriteString(ef.data[constant.VERSION_KEY])
+ builder.WriteString(d.data[constant.VERSION_KEY])
}
builder.WriteString(" ")
- builder.WriteString(ef.data[constant.METHOD_KEY])
+ builder.WriteString(d.data[constant.METHOD_KEY])
builder.WriteString("(")
- if len(ef.data[Types]) > 0 {
- builder.WriteString(ef.data[Types])
+ if len(d.data[Types]) > 0 {
+ builder.WriteString(d.data[Types])
}
builder.WriteString(") ")
- if len(ef.data[Arguments]) > 0 {
- builder.WriteString(ef.data[Arguments])
+ if len(d.data[Arguments]) > 0 {
+ builder.WriteString(d.data[Arguments])
}
return builder.String()
}
diff --git a/filter/filter_impl/access_log_filter_test.go b/filter/accesslog/filter_test.go
similarity index 90%
rename from filter/filter_impl/access_log_filter_test.go
rename to filter/accesslog/filter_test.go
index 3a26fae..45be9de 100644
--- a/filter/filter_impl/access_log_filter_test.go
+++ b/filter/accesslog/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package accesslog
import (
"context"
@@ -34,7 +34,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
-func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
+func TestFilter_Invoke_Not_Config(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(
@@ -48,12 +48,12 @@ func TestAccessLogFilter_Invoke_Not_Config(t *testing.T) {
attach := make(map[string]interface{}, 10)
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
- accessLogFilter := GetAccessLogFilter()
+ accessLogFilter := &Filter{}
result := accessLogFilter.Invoke(context.Background(), invoker, inv)
assert.Nil(t, result.Error())
}
-func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) {
+func TestFilterInvokeDefaultConfig(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
url, _ := common.NewURL(
@@ -69,14 +69,14 @@ func TestAccessLogFilterInvokeDefaultConfig(t *testing.T) {
attach[constant.GROUP_KEY] = "MyGroup"
inv := invocation.NewRPCInvocation("MethodName", []interface{}{"OK", "Hello"}, attach)
- accessLogFilter := GetAccessLogFilter()
+ accessLogFilter := &Filter{}
result := accessLogFilter.Invoke(context.Background(), invoker, inv)
assert.Nil(t, result.Error())
}
-func TestAccessLogFilterOnResponse(t *testing.T) {
+func TestFilterOnResponse(t *testing.T) {
result := &protocol.RPCResult{}
- accessLogFilter := GetAccessLogFilter()
+ accessLogFilter := &Filter{}
response := accessLogFilter.OnResponse(context.TODO(), result, nil, nil)
assert.Equal(t, result, response)
}
diff --git a/filter/filter_impl/active_filter.go b/filter/active/filter.go
similarity index 78%
rename from filter/filter_impl/active_filter.go
rename to filter/active/filter.go
index d27c3a2..723a554 100644
--- a/filter/filter_impl/active_filter.go
+++ b/filter/active/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package active
import (
"context"
@@ -23,6 +23,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/filter"
@@ -31,19 +32,20 @@ import (
)
const (
- active = "active"
dubboInvokeStartTime = "dubboInvokeStartTime"
)
func init() {
- extension.SetFilter(active, GetActiveFilter)
+ extension.SetFilter(constant.ActiveFilterKey, func() filter.Filter {
+ return &Filter{}
+ })
}
-// ActiveFilter tracks the requests status
-type ActiveFilter struct{}
+// Filter tracks the requests status
+type Filter struct{}
// Invoke starts to record the requests status
-func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking active filter. %v,%v", invocation.MethodName(), len(invocation.Arguments()))
invocation.(*invocation2.RPCInvocation).SetAttachments(dubboInvokeStartTime, strconv.FormatInt(protocol.CurrentTimeMillis(), 10))
protocol.BeginCount(invoker.GetURL(), invocation.MethodName())
@@ -51,7 +53,7 @@ func (ef *ActiveFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in
}
// OnResponse update the active count base on the request result.
-func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
startTime, err := strconv.ParseInt(invocation.(*invocation2.RPCInvocation).AttachmentsByKey(dubboInvokeStartTime, "0"), 10, 64)
if err != nil {
result.SetError(err)
@@ -62,8 +64,3 @@ func (ef *ActiveFilter) OnResponse(ctx context.Context, result protocol.Result,
protocol.EndCount(invoker.GetURL(), invocation.MethodName(), elapsed, result.Error() == nil)
return result
}
-
-// GetActiveFilter creates ActiveFilter instance
-func GetActiveFilter() filter.Filter {
- return &ActiveFilter{}
-}
diff --git a/filter/filter_impl/active_filter_test.go b/filter/active/filter_test.go
similarity index 94%
rename from filter/filter_impl/active_filter_test.go
rename to filter/active/filter_test.go
index f597ccc..9a39bb6 100644
--- a/filter/filter_impl/active_filter_test.go
+++ b/filter/active/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package active
import (
"context"
@@ -36,10 +36,10 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/mock"
)
-func TestActiveFilterInvoke(t *testing.T) {
+func TestFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, make(map[string]interface{}))
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
- filter := ActiveFilter{}
+ filter := Filter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
@@ -49,14 +49,14 @@ func TestActiveFilterInvoke(t *testing.T) {
assert.True(t, invoc.AttachmentsByKey(dubboInvokeStartTime, "") != "")
}
-func TestActiveFilterOnResponse(t *testing.T) {
+func TestFilterOnResponse(t *testing.T) {
c := protocol.CurrentTimeMillis()
elapsed := 100
invoc := invocation.NewRPCInvocation("test", []interface{}{"OK"}, map[string]interface{}{
dubboInvokeStartTime: strconv.FormatInt(c-int64(elapsed), 10),
})
url, _ := common.NewURL("dubbo://192.168.10.10:20000/com.ikurento.user.UserProvider")
- filter := ActiveFilter{}
+ filter := Filter{}
ctrl := gomock.NewController(t)
defer ctrl.Finish()
invoker := mock.NewMockInvoker(ctrl)
diff --git a/filter/filter_impl/auth/accesskey_storage.go b/filter/auth/accesskey_storage.go
similarity index 85%
rename from filter/filter_impl/auth/accesskey_storage.go
rename to filter/auth/accesskey_storage.go
index b5d3f17..de6c456 100644
--- a/filter/filter_impl/auth/accesskey_storage.go
+++ b/filter/auth/accesskey_storage.go
@@ -25,6 +25,12 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
+func init() {
+ extension.SetAccessKeyStorages(constant.DEFAULT_ACCESS_KEY_STORAGE, func() filter.AccessKeyStorage {
+ return &DefaultAccesskeyStorage{}
+ })
+}
+
// DefaultAccesskeyStorage is the default implementation of AccesskeyStorage
type DefaultAccesskeyStorage struct{}
@@ -35,12 +41,3 @@ func (storage *DefaultAccesskeyStorage) GetAccessKeyPair(invocation protocol.Inv
SecretKey: url.GetParam(constant.SECRET_ACCESS_KEY_KEY, ""),
}
}
-
-func init() {
- extension.SetAccesskeyStorages(constant.DEFAULT_ACCESS_KEY_STORAGE, GetDefaultAccesskeyStorage)
-}
-
-// GetDefaultAccesskeyStorage initiates an empty DefaultAccesskeyStorage
-func GetDefaultAccesskeyStorage() filter.AccessKeyStorage {
- return &DefaultAccesskeyStorage{}
-}
diff --git a/filter/filter_impl/auth/accesskey_storage_test.go b/filter/auth/accesskey_storage_test.go
similarity index 97%
rename from filter/filter_impl/auth/accesskey_storage_test.go
rename to filter/auth/accesskey_storage_test.go
index b2958ec..a8b1eb6 100644
--- a/filter/filter_impl/auth/accesskey_storage_test.go
+++ b/filter/auth/accesskey_storage_test.go
@@ -38,7 +38,7 @@ func TestDefaultAccesskeyStorage_GetAccesskeyPair(t *testing.T) {
common.WithParamsValue(constant.SECRET_ACCESS_KEY_KEY, "skey"),
common.WithParamsValue(constant.ACCESS_KEY_ID_KEY, "akey"))
invocation := &invocation2.RPCInvocation{}
- storage := GetDefaultAccesskeyStorage()
+ storage := &DefaultAccesskeyStorage{}
accesskeyPair := storage.GetAccessKeyPair(invocation, url)
assert.Equal(t, "skey", accesskeyPair.SecretKey)
assert.Equal(t, "akey", accesskeyPair.AccessKey)
diff --git a/filter/filter_impl/auth/consumer_sign.go b/filter/auth/consumer_sign_filter.go
similarity index 90%
rename from filter/filter_impl/auth/consumer_sign.go
rename to filter/auth/consumer_sign_filter.go
index 207c299..ac2bf96 100644
--- a/filter/filter_impl/auth/consumer_sign.go
+++ b/filter/auth/consumer_sign_filter.go
@@ -30,13 +30,18 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-// ConsumerSignFilter signs the request on consumer side
-type ConsumerSignFilter struct{}
-
func init() {
- extension.SetFilter(constant.CONSUMER_SIGN_FILTER, getConsumerSignFilter)
+ extension.SetFilter(constant.AuthConsumerFilterKey, func() filter.Filter {
+ return &ConsumerSignFilter{}
+ })
+ extension.SetFilter(constant.AuthProviderFilterKey, func() filter.Filter {
+ return &ProviderAuthFilter{}
+ })
}
+// ConsumerSignFilter signs the request on consumer side
+type ConsumerSignFilter struct{}
+
// Invoke retrieves the configured Authenticator to add signature to invocation
func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking ConsumerSign filter.")
@@ -55,7 +60,3 @@ func (csf *ConsumerSignFilter) Invoke(ctx context.Context, invoker protocol.Invo
func (csf *ConsumerSignFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
-
-func getConsumerSignFilter() filter.Filter {
- return &ConsumerSignFilter{}
-}
diff --git a/filter/filter_impl/auth/consumer_sign_test.go b/filter/auth/consumer_sign_filter_test.go
similarity index 100%
rename from filter/filter_impl/auth/consumer_sign_test.go
rename to filter/auth/consumer_sign_filter_test.go
diff --git a/filter/filter_impl/auth/default_authenticator.go b/filter/auth/default_authenticator.go
similarity index 93%
rename from filter/filter_impl/auth/default_authenticator.go
rename to filter/auth/default_authenticator.go
index 9bc843b..a36f1e1 100644
--- a/filter/filter_impl/auth/default_authenticator.go
+++ b/filter/auth/default_authenticator.go
@@ -22,20 +22,21 @@ import (
"fmt"
"strconv"
"time"
-
- "dubbo.apache.org/dubbo-go/v3/filter"
)
import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
invocation_impl "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
func init() {
- extension.SetAuthenticator(constant.DEFAULT_AUTHENTICATOR, GetDefaultAuthenticator)
+ extension.SetAuthenticator(constant.DEFAULT_AUTHENTICATOR, func() filter.Authenticator {
+ return &DefaultAuthenticator{}
+ })
}
// DefaultAuthenticator is the default implementation of Authenticator
@@ -109,7 +110,7 @@ func (authenticator *DefaultAuthenticator) Authenticate(invocation protocol.Invo
}
func getAccessKeyPair(invocation protocol.Invocation, url *common.URL) (*filter.AccessKeyPair, error) {
- accesskeyStorage := extension.GetAccesskeyStorages(url.GetParam(constant.ACCESS_KEY_STORAGE_KEY, constant.DEFAULT_ACCESS_KEY_STORAGE))
+ accesskeyStorage := extension.GetAccessKeyStorages(url.GetParam(constant.ACCESS_KEY_STORAGE_KEY, constant.DEFAULT_ACCESS_KEY_STORAGE))
accessKeyPair := accesskeyStorage.GetAccessKeyPair(invocation, url)
if accessKeyPair == nil || IsEmpty(accessKeyPair.AccessKey, false) || IsEmpty(accessKeyPair.SecretKey, true) {
return nil, errors.New("accessKeyId or secretAccessKey not found")
@@ -118,11 +119,6 @@ func getAccessKeyPair(invocation protocol.Invocation, url *common.URL) (*filter.
}
}
-// GetDefaultAuthenticator creates an empty DefaultAuthenticator instance
-func GetDefaultAuthenticator() filter.Authenticator {
- return &DefaultAuthenticator{}
-}
-
func doAuthWork(url *common.URL, do func(filter.Authenticator) error) error {
shouldAuth := url.GetParamBool(constant.SERVICE_AUTH_KEY, false)
if shouldAuth {
diff --git a/filter/filter_impl/auth/default_authenticator_test.go b/filter/auth/default_authenticator_test.go
similarity index 100%
rename from filter/filter_impl/auth/default_authenticator_test.go
rename to filter/auth/default_authenticator_test.go
diff --git a/filter/filter_impl/auth/provider_auth.go b/filter/auth/provider_auth_filter.go
similarity index 94%
rename from filter/filter_impl/auth/provider_auth.go
rename to filter/auth/provider_auth_filter.go
index efdf7c0..381b025 100644
--- a/filter/filter_impl/auth/provider_auth.go
+++ b/filter/auth/provider_auth_filter.go
@@ -29,13 +29,13 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-// ProviderAuthFilter verifies the correctness of the signature on provider side
-type ProviderAuthFilter struct{}
-
func init() {
- extension.SetFilter(constant.PROVIDER_AUTH_FILTER, getProviderAuthFilter)
+ extension.SetFilter(constant.AuthProviderFilterKey, newProviderAuthFilter)
}
+// ProviderAuthFilter verifies the correctness of the signature on provider side
+type ProviderAuthFilter struct{}
+
// Invoke retrieves the configured Authenticator to verify the signature in an invocation
func (paf *ProviderAuthFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking providerAuth filter.")
@@ -59,6 +59,6 @@ func (paf *ProviderAuthFilter) OnResponse(ctx context.Context, result protocol.R
return result
}
-func getProviderAuthFilter() filter.Filter {
+func newProviderAuthFilter() filter.Filter {
return &ProviderAuthFilter{}
}
diff --git a/filter/filter_impl/auth/provider_auth_test.go b/filter/auth/provider_auth_filter_test.go
similarity index 100%
rename from filter/filter_impl/auth/provider_auth_test.go
rename to filter/auth/provider_auth_filter_test.go
diff --git a/filter/filter_impl/auth/sign_util.go b/filter/auth/sign_util.go
similarity index 100%
rename from filter/filter_impl/auth/sign_util.go
rename to filter/auth/sign_util.go
diff --git a/filter/filter_impl/auth/sign_util_test.go b/filter/auth/sign_util_test.go
similarity index 100%
rename from filter/filter_impl/auth/sign_util_test.go
rename to filter/auth/sign_util_test.go
diff --git a/filter/filter_impl/echo_filter.go b/filter/echo/filter.go
similarity index 78%
rename from filter/filter_impl/echo_filter.go
rename to filter/echo/filter.go
index 14a8991..bbdbf22 100644
--- a/filter/filter_impl/echo_filter.go
+++ b/filter/echo/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package echo
import (
"context"
@@ -29,23 +29,20 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-const (
- // ECHO echo module name
- ECHO = "echo"
-)
-
func init() {
- extension.SetFilter(ECHO, GetFilter)
+ extension.SetFilter(constant.EchoFilterKey, func() filter.Filter {
+ return &Filter{}
+ })
}
-// EchoFilter health check
-// RPCService need a Echo method in consumer, if you want to use EchoFilter
+// Filter health check
+// RPCService needs an Echo method in consumer, if you want to use Filter
// eg:
// Echo func(ctx context.Context, arg interface{}, rsp *Xxx) error
-type EchoFilter struct{}
+type Filter struct{}
// Invoke response to the callers with its first argument.
-func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking echo filter.")
logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments()))
if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 {
@@ -59,13 +56,8 @@ func (ef *EchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invo
}
// OnResponse dummy process, returns the result directly
-func (ef *EchoFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
+func (f *Filter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {
return result
}
-
-// GetFilter gets the Filter
-func GetFilter() filter.Filter {
- return &EchoFilter{}
-}
diff --git a/filter/filter_impl/echo_filter_test.go b/filter/echo/filter_test.go
similarity index 94%
rename from filter/filter_impl/echo_filter_test.go
rename to filter/echo/filter_test.go
index f96145f..bda26ca 100644
--- a/filter/filter_impl/echo_filter_test.go
+++ b/filter/echo/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package echo
import (
"context"
@@ -32,8 +32,8 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
-func TestEchoFilterInvoke(t *testing.T) {
- filter := GetFilter()
+func TestFilterInvoke(t *testing.T) {
+ filter := &Filter{}
result := filter.Invoke(context.Background(), protocol.NewBaseInvoker(&common.URL{}), invocation.NewRPCInvocation("$echo", []interface{}{"OK"}, nil))
assert.Equal(t, "OK", result.Result())
diff --git a/filter/filter_impl/execute_limit_filter.go b/filter/execlmt/filter.go
similarity index 85%
rename from filter/filter_impl/execute_limit_filter.go
rename to filter/execlmt/filter.go
index 5427b31..060e3c4 100644
--- a/filter/filter_impl/execute_limit_filter.go
+++ b/filter/execlmt/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package execlmt
import (
"context"
@@ -37,15 +37,11 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-const (
- name = "execute"
-)
-
func init() {
- extension.SetFilter(name, GetExecuteLimitFilter)
+ extension.SetFilter(constant.ExecuteLimitFilterKey, newFilter)
}
-// ExecuteLimitFilter will limit the number of in-progress request and it's thread-safe.
+// Filter will limit the number of in-progress requests and it's thread-safe.
/**
* example:
* "UserProvider":
@@ -70,7 +66,7 @@ func init() {
* Sometimes we want to do something, like log the request or return default value when the request is over limitation.
* Then you can implement the RejectedExecutionHandler interface and register it by invoking SetRejectedExecutionHandler.
*/
-type ExecuteLimitFilter struct {
+type Filter struct {
executeState *concurrent.Map
}
@@ -80,7 +76,7 @@ type ExecuteState struct {
}
// Invoke judges whether the current processing requests over the threshold
-func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
ivkURL := invoker.GetURL()
limitTarget := ivkURL.ServiceKey()
@@ -105,7 +101,7 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok
return invoker.Invoke(ctx, invocation)
}
- state, _ := ef.executeState.LoadOrStore(limitTarget, &ExecuteState{
+ state, _ := f.executeState.LoadOrStore(limitTarget, &ExecuteState{
concurrentCount: 0,
})
@@ -122,7 +118,7 @@ func (ef *ExecuteLimitFilter) Invoke(ctx context.Context, invoker protocol.Invok
}
// OnResponse dummy process, returns the result directly
-func (ef *ExecuteLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
+func (f *Filter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
return result
}
@@ -136,13 +132,13 @@ func (state *ExecuteState) decrease() {
var (
executeLimitOnce sync.Once
- executeLimitFilter *ExecuteLimitFilter
+ executeLimitFilter *Filter
)
-// GetExecuteLimitFilter returns the singleton ExecuteLimitFilter instance
-func GetExecuteLimitFilter() filter.Filter {
+// newFilter returns the singleton Filter instance
+func newFilter() filter.Filter {
executeLimitOnce.Do(func() {
- executeLimitFilter = &ExecuteLimitFilter{
+ executeLimitFilter = &Filter{
executeState: concurrent.NewMap(),
}
})
diff --git a/filter/filter_impl/execute_limit_filter_test.go b/filter/execlmt/filter_test.go
similarity index 88%
rename from filter/filter_impl/execute_limit_filter_test.go
rename to filter/execlmt/filter_test.go
index 9925756..d5ff237 100644
--- a/filter/filter_impl/execute_limit_filter_test.go
+++ b/filter/execlmt/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package execlmt
import (
"context"
@@ -34,7 +34,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
-func TestExecuteLimitFilterInvokeIgnored(t *testing.T) {
+func TestFilterInvokeIgnored(t *testing.T) {
methodName := "hello"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}))
@@ -42,14 +42,14 @@ func TestExecuteLimitFilterInvokeIgnored(t *testing.T) {
common.WithParams(url.Values{}),
common.WithParamsValue(constant.INTERFACE_KEY, methodName))
- limitFilter := GetExecuteLimitFilter()
+ limitFilter := newFilter()
result := limitFilter.Invoke(context.Background(), protocol.NewBaseInvoker(invokeUrl), invoc)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
}
-func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) {
+func TestFilterInvokeConfigureError(t *testing.T) {
methodName := "hello1"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}))
@@ -59,14 +59,14 @@ func TestExecuteLimitFilterInvokeConfigureError(t *testing.T) {
common.WithParamsValue(constant.EXECUTE_LIMIT_KEY, "13a"),
)
- limitFilter := GetExecuteLimitFilter()
+ limitFilter := newFilter()
result := limitFilter.Invoke(context.Background(), protocol.NewBaseInvoker(invokeUrl), invoc)
assert.NotNil(t, result)
assert.Nil(t, result.Error())
}
-func TestExecuteLimitFilterInvoke(t *testing.T) {
+func TestFilterInvoke(t *testing.T) {
methodName := "hello1"
invoc := invocation.NewRPCInvocation(methodName, []interface{}{"OK"}, make(map[string]interface{}))
@@ -76,7 +76,7 @@ func TestExecuteLimitFilterInvoke(t *testing.T) {
common.WithParamsValue(constant.EXECUTE_LIMIT_KEY, "20"),
)
- limitFilter := GetExecuteLimitFilter()
+ limitFilter := newFilter()
result := limitFilter.Invoke(context.Background(), protocol.NewBaseInvoker(invokeUrl), invoc)
assert.NotNil(t, result)
diff --git a/filter/filter_impl/import.go b/filter/filter_impl/import.go
new file mode 100644
index 0000000..8782cd4
--- /dev/null
+++ b/filter/filter_impl/import.go
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package filter_impl
+
+// This package is for being compatible with older dubbo-go, please refer to https://github.com/dubbogo/imports to see
+// the recommended import ways.
+// This package may be DEPRECATED OR REMOVED in the future.
+
+import (
+ _ "dubbo.apache.org/dubbo-go/v3/filter/accesslog"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/active"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/auth"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/echo"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/execlmt"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/generic"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/gshutdown"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/hystrix"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/metrics"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/seata"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/sentinel"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/token"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/tps"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/tracing"
+)
+
+func init() {}
diff --git a/filter/filter_impl/generic_filter.go b/filter/generic/filter.go
similarity index 89%
rename from filter/filter_impl/generic_filter.go
rename to filter/generic/filter.go
index ee618f9..df435bb 100644
--- a/filter/filter_impl/generic_filter.go
+++ b/filter/generic/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package generic
import (
"context"
@@ -36,23 +36,19 @@ import (
invocation2 "dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
-const (
- // GENERIC
- // generic module name
- GENERIC = "generic"
-)
-
func init() {
- extension.SetFilter(GENERIC, GetGenericFilter)
+ extension.SetFilter(constant.GenericFilterKey, func() filter.Filter {
+ return &Filter{}
+ })
}
// when do a generic invoke, struct need to be map
// nolint
-type GenericFilter struct{}
+type Filter struct{}
// Invoke turns the parameters to map for generic method
-func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 {
oldArguments := invocation.Arguments()
@@ -75,16 +71,11 @@ func (ef *GenericFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i
}
// OnResponse dummy process, returns the result directly
-func (ef *GenericFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
+func (f *Filter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {
return result
}
-// GetGenericFilter returns GenericFilter instance
-func GetGenericFilter() filter.Filter {
- return &GenericFilter{}
-}
-
func struct2MapAll(obj interface{}) interface{} {
if obj == nil {
return obj
diff --git a/filter/filter_impl/generic_filter_test.go b/filter/generic/filter_test.go
similarity index 99%
rename from filter/filter_impl/generic_filter_test.go
rename to filter/generic/filter_test.go
index 4203dd6..815a6ae 100644
--- a/filter/filter_impl/generic_filter_test.go
+++ b/filter/generic/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package generic
import (
"reflect"
diff --git a/filter/filter_impl/generic_service_filter.go b/filter/generic/service_filter.go
similarity index 86%
rename from filter/filter_impl/generic_service_filter.go
rename to filter/generic/service_filter.go
index 39f119f..1bcd982 100644
--- a/filter/filter_impl/generic_service_filter.go
+++ b/filter/generic/service_filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package generic
import (
"context"
@@ -39,21 +39,21 @@ import (
)
const (
- // GENERIC_SERVICE defines the filter name
- GENERIC_SERVICE = "generic_service"
// nolint
GENERIC_SERIALIZATION_DEFAULT = "true"
)
func init() {
- extension.SetFilter(GENERIC_SERVICE, GetGenericServiceFilter)
+ extension.SetFilter(constant.GenericServiceFilterKey, func() filter.Filter {
+ return &ServiceFilter{}
+ })
}
// nolint
-type GenericServiceFilter struct{}
+type ServiceFilter struct{}
// Invoke is used to call service method by invocation
-func (ef *GenericServiceFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *ServiceFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking generic service filter.")
logger.Debugf("generic service filter methodName:%v,args:%v", invocation.MethodName(), len(invocation.Arguments()))
@@ -115,7 +115,7 @@ func (ef *GenericServiceFilter) Invoke(ctx context.Context, invoker protocol.Inv
}
// nolint
-func (ef *GenericServiceFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *ServiceFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
if invocation.MethodName() == constant.GENERIC && len(invocation.Arguments()) == 3 && result.Result() != nil {
v := reflect.ValueOf(result.Result())
if v.Kind() == reflect.Ptr {
@@ -125,8 +125,3 @@ func (ef *GenericServiceFilter) OnResponse(ctx context.Context, result protocol.
}
return result
}
-
-// nolint
-func GetGenericServiceFilter() filter.Filter {
- return &GenericServiceFilter{}
-}
diff --git a/filter/filter_impl/generic_service_filter_test.go b/filter/generic/service_filter_test.go
similarity index 96%
rename from filter/filter_impl/generic_service_filter_test.go
rename to filter/generic/service_filter_test.go
index bc0e851..f049b27 100644
--- a/filter/filter_impl/generic_service_filter_test.go
+++ b/filter/generic/service_filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package generic
import (
"context"
@@ -99,7 +99,7 @@ func TestGenericServiceFilterInvoke(t *testing.T) {
s := &TestService{}
_, _ = common.ServiceMap.Register("com.test.Path", "testprotocol", "", "", s)
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
- filter := GetGenericServiceFilter()
+ filter := &ServiceFilter{}
url, _ := common.NewURL("testprotocol://127.0.0.1:20000/com.test.Path")
result := filter.Invoke(context.Background(), &proxy_factory.ProxyInvoker{BaseInvoker: *protocol.NewBaseInvoker(url)}, rpcInvocation)
assert.NotNil(t, result)
@@ -123,7 +123,7 @@ func TestGenericServiceFilterResponseTestStruct(t *testing.T) {
nil,
[]hessian.Object{nil},
}
- filter := GetGenericServiceFilter()
+ filter := &ServiceFilter{}
methodName := "$invoke"
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
r := filter.OnResponse(context.TODO(), result, nil, rpcInvocation)
@@ -141,7 +141,7 @@ func TestGenericServiceFilterResponseString(t *testing.T) {
nil,
[]hessian.Object{nil},
}
- filter := GetGenericServiceFilter()
+ filter := &ServiceFilter{}
methodName := "$invoke"
rpcInvocation := invocation.NewRPCInvocation(methodName, aurguments, nil)
r := filter.OnResponse(context.TODO(), result, nil, rpcInvocation)
diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/gshutdown/filter.go
similarity index 60%
rename from filter/filter_impl/graceful_shutdown_filter.go
rename to filter/gshutdown/filter.go
index 9da238b..1bbc197 100644
--- a/filter/filter_impl/graceful_shutdown_filter.go
+++ b/filter/gshutdown/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package gshutdown
import (
"context"
@@ -33,45 +33,44 @@ import (
func init() {
// `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded.
- extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
- return &gracefulShutdownFilter{}
+ extension.SetFilter(constant.GracefulShutdownConsumerFilterKey, func() filter.Filter {
+ return &Filter{}
})
-
- extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
- return &gracefulShutdownFilter{}
+ extension.SetFilter(constant.GracefulShutdownProviderFilterKey, func() filter.Filter {
+ return &Filter{}
})
}
-type gracefulShutdownFilter struct {
+type Filter struct {
activeCount int32
shutdownConfig *config.ShutdownConfig
}
// Invoke adds the requests count and block the new requests if application is closing
-func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- if gf.rejectNewRequest() {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+ if f.rejectNewRequest() {
logger.Info("The application is closing, new request will be rejected.")
- return gf.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
+ return f.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
}
- atomic.AddInt32(&gf.activeCount, 1)
+ atomic.AddInt32(&f.activeCount, 1)
return invoker.Invoke(ctx, invocation)
}
// OnResponse reduces the number of active processes then return the process result
-func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- atomic.AddInt32(&gf.activeCount, -1)
- // although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true.
- if gf.shutdownConfig != nil && gf.shutdownConfig.RejectRequest && gf.activeCount <= 0 {
- gf.shutdownConfig.RequestsFinished = true
+func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+ atomic.AddInt32(&f.activeCount, -1)
+ // although this isn't thread safe, it won't be a problem if f.rejectNewRequest() is true.
+ if f.shutdownConfig != nil && f.shutdownConfig.RejectRequest && f.activeCount <= 0 {
+ f.shutdownConfig.RequestsFinished = true
}
return result
}
-func (gf *gracefulShutdownFilter) Set(name string, conf interface{}) {
+func (f *Filter) Set(name string, conf interface{}) {
switch name {
case config.GracefulShutdownFilterShutdownConfig:
if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
- gf.shutdownConfig = shutdownConfig
+ f.shutdownConfig = shutdownConfig
return
}
logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
@@ -80,17 +79,17 @@ func (gf *gracefulShutdownFilter) Set(name string, conf interface{}) {
}
}
-func (gf *gracefulShutdownFilter) rejectNewRequest() bool {
- if gf.shutdownConfig == nil {
+func (f *Filter) rejectNewRequest() bool {
+ if f.shutdownConfig == nil {
return false
}
- return gf.shutdownConfig.RejectRequest
+ return f.shutdownConfig.RejectRequest
}
-func (gf *gracefulShutdownFilter) getRejectHandler() filter.RejectedExecutionHandler {
+func (f *Filter) getRejectHandler() filter.RejectedExecutionHandler {
handler := constant.DEFAULT_KEY
- if gf.shutdownConfig != nil && len(gf.shutdownConfig.RejectRequestHandler) > 0 {
- handler = gf.shutdownConfig.RejectRequestHandler
+ if f.shutdownConfig != nil && len(f.shutdownConfig.RejectRequestHandler) > 0 {
+ handler = f.shutdownConfig.RejectRequestHandler
}
return extension.GetRejectedExecutionHandler(handler)
}
diff --git a/filter/filter_impl/graceful_shutdown_filter_test.go b/filter/gshutdown/filter_test.go
similarity index 95%
rename from filter/filter_impl/graceful_shutdown_filter_test.go
rename to filter/gshutdown/filter_test.go
index 55e28e4..d8b9dc6 100644
--- a/filter/filter_impl/graceful_shutdown_filter_test.go
+++ b/filter/gshutdown/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package gshutdown
import (
"context"
@@ -42,7 +42,7 @@ func TestGenericFilterInvoke(t *testing.T) {
invoc := invocation.NewRPCInvocation("GetUser", []interface{}{"OK"}, make(map[string]interface{}))
invokeUrl := common.NewURLWithOptions(common.WithParams(url.Values{}))
- shutdownFilter := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(*gracefulShutdownFilter)
+ shutdownFilter := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(*Filter)
providerConfig := config.GetProviderConfig()
diff --git a/filter/handler/rejected_execution_handler_only_log.go b/filter/handler/rejected_execution_handler_only_log.go
index c66c827..c370fe6 100644
--- a/filter/handler/rejected_execution_handler_only_log.go
+++ b/filter/handler/rejected_execution_handler_only_log.go
@@ -19,8 +19,6 @@ package handler
import (
"sync"
-
- "dubbo.apache.org/dubbo-go/v3/filter"
)
import (
@@ -28,6 +26,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
+ "dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)
diff --git a/filter/filter_impl/hystrix_filter.go b/filter/hystrix/filter.go
similarity index 82%
rename from filter/filter_impl/hystrix_filter.go
rename to filter/hystrix/filter.go
index e12b85b..8dbdae9 100644
--- a/filter/filter_impl/hystrix_filter.go
+++ b/filter/hystrix/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package hystrix
import (
"context"
@@ -31,6 +31,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/config"
@@ -40,52 +41,48 @@ import (
const (
// nolint
- HYSTRIX_CONSUMER = "hystrix_consumer"
- // nolint
- HYSTRIX_PROVIDER = "hystrix_provider"
- // nolint
HYSTRIX = "hystrix"
)
var (
- confConsumer = &HystrixFilterConfig{}
- confProvider = &HystrixFilterConfig{}
+ confConsumer = &FilterConfig{}
+ confProvider = &FilterConfig{}
configLoadMutex = sync.RWMutex{}
consumerConfigOnce sync.Once
providerConfigOnce sync.Once
)
func init() {
- extension.SetFilter(HYSTRIX_CONSUMER, GetHystrixFilterConsumer)
- extension.SetFilter(HYSTRIX_PROVIDER, GetHystrixFilterProvider)
+ extension.SetFilter(constant.HystrixConsumerFilterKey, newFilterConsumer)
+ extension.SetFilter(constant.HystrixProviderFilterKey, newFilterProvider)
}
-// HystrixFilterError implements error interface
-type HystrixFilterError struct {
+// FilterError implements error interface
+type FilterError struct {
err error
failByHystrix bool
}
-func (hfError *HystrixFilterError) Error() string {
+func (hfError *FilterError) Error() string {
return hfError.err.Error()
}
// FailByHystrix returns whether the fails causing by Hystrix
-func (hfError *HystrixFilterError) FailByHystrix() bool {
+func (hfError *FilterError) FailByHystrix() bool {
return hfError.failByHystrix
}
-// NewHystrixFilterError return a HystrixFilterError instance
+// NewHystrixFilterError returns a FilterError instance
func NewHystrixFilterError(err error, failByHystrix bool) error {
- return &HystrixFilterError{
+ return &FilterError{
err: err,
failByHystrix: failByHystrix,
}
}
+// Filter for Hystrix
/**
- * HystrixFilter
- * You should add hystrix related configuration in provider or consumer config or both, according to which side you are to apply HystrixFilter.
+ * You should add hystrix related configuration in provider or consumer config or both, according to which side you are to apply Filter.
* For example:
* filter_conf:
* hystrix:
@@ -124,29 +121,29 @@ func NewHystrixFilterError(err error, failByHystrix bool) error {
* "GetUser": "userp_m"
* "GetUser1": "userp_m"
*/
-type HystrixFilter struct {
+type Filter struct {
COrP bool // true for consumer
res map[string][]*regexp.Regexp
ifNewMap sync.Map
}
// Invoke is an implementation of filter, provides Hystrix pattern latency and fault tolerance
-func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
cmdName := fmt.Sprintf("%s&method=%s", invoker.GetURL().Key(), invocation.MethodName())
// Do the configuration if the circuit breaker is created for the first time
- if _, load := hf.ifNewMap.LoadOrStore(cmdName, true); !load {
+ if _, load := f.ifNewMap.LoadOrStore(cmdName, true); !load {
configLoadMutex.Lock()
- filterConf := getConfig(invoker.GetURL().Service(), invocation.MethodName(), hf.COrP)
+ filterConf := getConfig(invoker.GetURL().Service(), invocation.MethodName(), f.COrP)
for _, ptn := range filterConf.Error {
reg, err := regexp.Compile(ptn)
if err != nil {
logger.Warnf("[Hystrix Filter]Errors occurred parsing error omit regexp: %s, %v", ptn, err)
} else {
- if hf.res == nil {
- hf.res = make(map[string][]*regexp.Regexp)
+ if f.res == nil {
+ f.res = make(map[string][]*regexp.Regexp)
}
- hf.res[invocation.MethodName()] = append(hf.res[invocation.MethodName()], reg)
+ f.res[invocation.MethodName()] = append(f.res[invocation.MethodName()], reg)
}
}
hystrix.ConfigureCommand(cmdName, hystrix.CommandConfig{
@@ -172,7 +169,7 @@ func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i
err := result.Error()
if err != nil {
result.SetError(NewHystrixFilterError(err, false))
- for _, reg := range hf.res[invocation.MethodName()] {
+ for _, reg := range f.res[invocation.MethodName()] {
if reg.MatchString(err.Error()) {
logger.Debugf("[Hystrix Filter]Error in invocation but omitted in circuit breaker: %v; %s", err, cmdName)
return nil
@@ -193,34 +190,34 @@ func (hf *HystrixFilter) Invoke(ctx context.Context, invoker protocol.Invoker, i
}
// OnResponse dummy process, returns the result directly
-func (hf *HystrixFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
-// GetHystrixFilterConsumer returns HystrixFilter instance for consumer
-func GetHystrixFilterConsumer() filter.Filter {
+// newFilterConsumer returns Filter instance for consumer
+func newFilterConsumer() filter.Filter {
// When first called, load the config in
consumerConfigOnce.Do(func() {
- if err := initHystrixConfigConsumer(); err != nil {
+ if err := initConfigConsumer(); err != nil {
logger.Warnf("[Hystrix Filter]Config load failed for consumer, error is: %v , will use default", err)
}
})
- return &HystrixFilter{COrP: true}
+ return &Filter{COrP: true}
}
-// GetHystrixFilterProvider returns HystrixFilter instance for provider
-func GetHystrixFilterProvider() filter.Filter {
+// newFilterProvider returns Filter instance for provider
+func newFilterProvider() filter.Filter {
providerConfigOnce.Do(func() {
- if err := initHystrixConfigProvider(); err != nil {
+ if err := initConfigProvider(); err != nil {
logger.Warnf("[Hystrix Filter]Config load failed for provider, error is: %v , will use default", err)
}
})
- return &HystrixFilter{COrP: false}
+ return &Filter{COrP: false}
}
func getConfig(service string, method string, cOrP bool) CommandConfigWithError {
// Find method level config
- var conf *HystrixFilterConfig
+ var conf *FilterConfig
if cOrP {
conf = confConsumer
} else {
@@ -248,7 +245,7 @@ func getConfig(service string, method string, cOrP bool) CommandConfigWithError
return *getConf
}
-func initHystrixConfigConsumer() error {
+func initConfigConsumer() error {
if config.GetConsumerConfig().FilterConf == nil {
return perrors.Errorf("no config for hystrix_consumer")
}
@@ -267,7 +264,7 @@ func initHystrixConfigConsumer() error {
return nil
}
-func initHystrixConfigProvider() error {
+func initConfigProvider() error {
if config.GetProviderConfig().FilterConf == nil {
return perrors.Errorf("no config for hystrix_provider")
}
@@ -288,7 +285,7 @@ func initHystrixConfigProvider() error {
//For sake of dynamic config
//func RefreshHystrix() error {
-// conf = &HystrixFilterConfig{}
+// conf = &FilterConfig{}
// hystrix.Flush()
// return initHystrixConfig()
//}
@@ -312,7 +309,7 @@ type CommandConfigWithError struct {
//See hystrix doc
// nolint
-type HystrixFilterConfig struct {
+type FilterConfig struct {
Configs map[string]*CommandConfigWithError
Default string
Services map[string]ServiceHystrixConfig
diff --git a/filter/filter_impl/hystrix_filter_test.go b/filter/hystrix/filter_test.go
similarity index 91%
rename from filter/filter_impl/hystrix_filter_test.go
rename to filter/hystrix/filter_test.go
index 0bf78e7..fdf5e76 100644
--- a/filter/filter_impl/hystrix_filter_test.go
+++ b/filter/hystrix/filter_test.go
@@ -14,7 +14,7 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package filter_impl
+package hystrix
import (
"context"
@@ -42,13 +42,13 @@ func init() {
func TestNewHystrixFilterError(t *testing.T) {
get := NewHystrixFilterError(errors.New("test"), true)
- assert.True(t, get.(*HystrixFilterError).FailByHystrix())
+ assert.True(t, get.(*FilterError).FailByHystrix())
assert.Equal(t, "test", get.Error())
}
func mockInitHystrixConfig() {
// Mock config
- confConsumer = &HystrixFilterConfig{
+ confConsumer = &FilterConfig{
make(map[string]*CommandConfigWithError),
"Default",
make(map[string]ServiceHystrixConfig),
@@ -88,7 +88,7 @@ func mockInitHystrixConfig() {
}
func TestGetHystrixFilter(t *testing.T) {
- filterGot := GetHystrixFilterConsumer()
+ filterGot := newFilterConsumer()
assert.NotNil(t, filterGot)
}
@@ -148,7 +148,7 @@ func (iv *testMockFailInvoker) Invoke(_ context.Context, _ protocol.Invocation)
}
func TestHystrixFilterInvokeSuccess(t *testing.T) {
- hf := &HystrixFilter{}
+ hf := &Filter{}
testUrl, err := common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
assert.NoError(t, err)
@@ -160,7 +160,7 @@ func TestHystrixFilterInvokeSuccess(t *testing.T) {
}
func TestHystrixFilterInvokeFail(t *testing.T) {
- hf := &HystrixFilter{}
+ hf := &Filter{}
testUrl, err := common.NewURL(
fmt.Sprintf("dubbo://%s:%d/com.ikurento.user.UserProvider", constant.LOCAL_HOST_VALUE, constant.DEFAULT_PORT))
assert.NoError(t, err)
@@ -173,7 +173,7 @@ func TestHystrixFilterInvokeFail(t *testing.T) {
func TestHystricFilterInvokeCircuitBreak(t *testing.T) {
mockInitHystrixConfig()
hystrix.Flush()
- hf := &HystrixFilter{COrP: true}
+ hf := &Filter{COrP: true}
resChan := make(chan protocol.Result, 50)
for i := 0; i < 50; i++ {
go func() {
@@ -189,7 +189,7 @@ func TestHystricFilterInvokeCircuitBreak(t *testing.T) {
//var lastRest bool
//for i := 0; i < 50; i++ {
- // lastRest = (<-resChan).Error().(*HystrixFilterError).FailByHystrix()
+ // lastRest = (<-resChan).Error().(*FilterError).FailByHystrix()
//}
//Normally the last result should be true, which means the circuit has been opened
//
@@ -201,7 +201,7 @@ func TestHystricFilterInvokeCircuitBreakOmitException(t *testing.T) {
hystrix.Flush()
reg, _ := regexp.Compile(".*exception.*")
regs := []*regexp.Regexp{reg}
- hf := &HystrixFilter{res: map[string][]*regexp.Regexp{"": regs}, COrP: true}
+ hf := &Filter{res: map[string][]*regexp.Regexp{"": regs}, COrP: true}
resChan := make(chan protocol.Result, 50)
for i := 0; i < 50; i++ {
go func() {
@@ -218,20 +218,20 @@ func TestHystricFilterInvokeCircuitBreakOmitException(t *testing.T) {
//time.Sleep(time.Second * 6)
//var lastRest bool
//for i := 0; i < 50; i++ {
- // lastRest = (<-resChan).Error().(*HystrixFilterError).FailByHystrix()
+ // lastRest = (<-resChan).Error().(*FilterError).FailByHystrix()
//}
//
//assert.False(t, lastRest)
}
func TestGetHystrixFilterConsumer(t *testing.T) {
- get := GetHystrixFilterConsumer()
+ get := newFilterConsumer()
assert.NotNil(t, get)
- assert.True(t, get.(*HystrixFilter).COrP)
+ assert.True(t, get.(*Filter).COrP)
}
func TestGetHystrixFilterProvider(t *testing.T) {
- get := GetHystrixFilterProvider()
+ get := newFilterProvider()
assert.NotNil(t, get)
- assert.False(t, get.(*HystrixFilter).COrP)
+ assert.False(t, get.(*Filter).COrP)
}
diff --git a/filter/filter_impl/metrics_filter.go b/filter/metrics/filter.go
similarity index 77%
rename from filter/filter_impl/metrics_filter.go
rename to filter/metrics/filter.go
index 525e976..9da3a33 100644
--- a/filter/filter_impl/metrics_filter.go
+++ b/filter/metrics/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package metrics
import (
"context"
@@ -23,6 +23,7 @@ import (
)
import (
+ "dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/config"
"dubbo.apache.org/dubbo-go/v3/filter"
@@ -30,18 +31,14 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-const (
- metricFilterName = "metrics"
-)
-
-var metricFilterInstance filter.Filter
-
// must initialized before using the filter and after loading configuration
+var metricFilterInstance *Filter
+
func init() {
- extension.SetFilter(metricFilterName, newMetricsFilter)
+ extension.SetFilter(constant.MetricsFilterKey, newFilter)
}
-// metricFilter will calculate the invocation's duration and the report to the reporters
+// Filter will calculate the invocation's duration and then report it to the reporters
// If you want to use this filter to collect the metrics,
// Adding this into your configuration file, like:
// filter: "metrics"
@@ -49,12 +46,12 @@ func init() {
// reporter:
// - "your reporter" # here you should specify the reporter, for example 'prometheus'
// more info please take a look at dubbo-samples projects
-type metricsFilter struct {
+type Filter struct {
reporters []metrics.Reporter
}
// Invoke collect the duration of invocation and then report the duration by using goroutine
-func (p *metricsFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (p *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
start := time.Now()
res := invoker.Invoke(ctx, invocation)
end := time.Now()
@@ -68,24 +65,23 @@ func (p *metricsFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in
}
// OnResponse do nothing and return the result
-func (p *metricsFilter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (p *Filter) OnResponse(ctx context.Context, res protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return res
}
-// newMetricsFilter the metricsFilter is singleton.
+// newFilter returns the Filter, which is a singleton.
// it's lazy initialization
// make sure that the configuration had been loaded before invoking this method.
-func newMetricsFilter() filter.Filter {
+func newFilter() filter.Filter {
if metricFilterInstance == nil {
reporterNames := config.GetMetricConfig().Reporters
reporters := make([]metrics.Reporter, 0, len(reporterNames))
for _, name := range reporterNames {
reporters = append(reporters, extension.GetMetricReporter(name))
}
- metricFilterInstance = &metricsFilter{
+ metricFilterInstance = &Filter{
reporters: reporters,
}
}
-
return metricFilterInstance
}
diff --git a/filter/filter_impl/metrics_filter_test.go b/filter/metrics/filter_test.go
similarity index 97%
rename from filter/filter_impl/metrics_filter_test.go
rename to filter/metrics/filter_test.go
index 8b1cccb..f4b3440 100644
--- a/filter/filter_impl/metrics_filter_test.go
+++ b/filter/metrics/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package metrics
import (
"context"
@@ -46,7 +46,7 @@ func TestMetricsFilterInvoke(t *testing.T) {
return mk
})
- instance := extension.GetFilter(metricFilterName)
+ instance := newFilter()
url, _ := common.NewURL(
"dubbo://:20000/UserProvider?app.version=0.0.1&application=BDTService&bean.name=UserProvider" +
diff --git a/filter/filter_impl/seata_filter.go b/filter/seata/filter.go
similarity index 73%
rename from filter/filter_impl/seata_filter.go
rename to filter/seata/filter.go
index 15a3fb9..7f44104 100644
--- a/filter/filter_impl/seata_filter.go
+++ b/filter/seata/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package seata
import (
"context"
@@ -31,20 +31,21 @@ import (
)
const (
- SEATA = constant.DubboCtxKey("seata")
SEATA_XID = constant.DubboCtxKey("SEATA_XID")
)
func init() {
- extension.SetFilter(string(SEATA), getSeataFilter)
+ extension.SetFilter(constant.SeataFilterKey, func() filter.Filter {
+ return &Filter{}
+ })
}
-// SeataFilter when use seata-golang, use this filter to transfer xid
-type SeataFilter struct{}
+// Filter is the filter used to transfer the xid when seata-golang is in use
+type Filter struct{}
// When use Seata, transfer xid by attachments
// Invoke Get Xid by attachment key `SEATA_XID`
-func (sf *SeataFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking seata filter.")
xid := invocation.AttachmentsByKey(string(SEATA_XID), "")
if strings.TrimSpace(xid) != "" {
@@ -55,11 +56,6 @@ func (sf *SeataFilter) Invoke(ctx context.Context, invoker protocol.Invoker, inv
}
// OnResponse dummy process, returns the result directly
-func (sf *SeataFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
-
-// getSeataFilter create SeataFilter instance
-func getSeataFilter() filter.Filter {
- return &SeataFilter{}
-}
diff --git a/filter/filter_impl/seata_filter_test.go b/filter/seata/filter_test.go
similarity index 97%
rename from filter/filter_impl/seata_filter_test.go
rename to filter/seata/filter_test.go
index 4094e57..511ba67 100644
--- a/filter/filter_impl/seata_filter_test.go
+++ b/filter/seata/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package seata
import (
"context"
@@ -47,7 +47,7 @@ func (iv *testMockSeataInvoker) Invoke(ctx context.Context, _ protocol.Invocatio
}
func TestSeataFilter_Invoke(t *testing.T) {
- filter := getSeataFilter()
+ filter := &Filter{}
result := filter.Invoke(context.Background(), &testMockSeataInvoker{}, invocation.NewRPCInvocation("$echo",
[]interface{}{"OK"}, map[string]interface{}{
string(SEATA_XID): "10.30.21.227:8091:2000047792",
diff --git a/filter/filter_impl/sentinel_filter.go b/filter/sentinel/filter.go
similarity index 86%
rename from filter/filter_impl/sentinel_filter.go
rename to filter/sentinel/filter.go
index d983170..474ad2e 100644
--- a/filter/filter_impl/sentinel_filter.go
+++ b/filter/sentinel/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package sentinel
import (
"context"
@@ -44,9 +44,12 @@ import (
// 2. Register rules for resources user want to guard
func init() {
- extension.SetFilter(SentinelProviderFilterName, GetSentinelProviderFilter)
- extension.SetFilter(SentinelConsumerFilterName, GetSentinelConsumerFilter)
-
+ extension.SetFilter(constant.SentinelConsumerFilterKey, func() filter.Filter {
+ return &ConsumerFilter{}
+ })
+ extension.SetFilter(constant.SentinelProviderFilterKey, func() filter.Filter {
+ return &ProviderFilter{}
+ })
if err := logging.ResetGlobalLogger(DubboLoggerWrapper{Logger: logger.GetLogger()}); err != nil {
logger.Errorf("[Sentinel Filter] fail to ingest dubbo logger into sentinel")
}
@@ -88,14 +91,6 @@ func (d DubboLoggerWrapper) ErrorEnabled() bool {
return true
}
-func GetSentinelConsumerFilter() filter.Filter {
- return &SentinelConsumerFilter{}
-}
-
-func GetSentinelProviderFilter() filter.Filter {
- return &SentinelProviderFilter{}
-}
-
func sentinelExit(ctx context.Context, result protocol.Result) {
if methodEntry := ctx.Value(MethodEntryKey); methodEntry != nil {
e := methodEntry.(*base.SentinelEntry)
@@ -109,9 +104,9 @@ func sentinelExit(ctx context.Context, result protocol.Result) {
}
}
-type SentinelProviderFilter struct{}
+type ProviderFilter struct{}
-func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (d *ProviderFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getProviderPrefix())
var (
@@ -138,14 +133,14 @@ func (d *SentinelProviderFilter) Invoke(ctx context.Context, invoker protocol.In
return invoker.Invoke(ctx, invocation)
}
-func (d *SentinelProviderFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
+func (d *ProviderFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}
-type SentinelConsumerFilter struct{}
+type ConsumerFilter struct{}
-func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (d *ConsumerFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
interfaceResourceName, methodResourceName := getResourceName(invoker, invocation, getConsumerPrefix())
var (
interfaceEntry *base.SentinelEntry
@@ -171,7 +166,7 @@ func (d *SentinelConsumerFilter) Invoke(ctx context.Context, invoker protocol.In
return invoker.Invoke(ctx, invocation)
}
-func (d *SentinelConsumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
+func (d *ConsumerFilter) OnResponse(ctx context.Context, result protocol.Result, _ protocol.Invoker, _ protocol.Invocation) protocol.Result {
sentinelExit(ctx, result)
return result
}
@@ -201,9 +196,6 @@ func getDefaultDubboFallback() DubboFallback {
}
const (
- SentinelProviderFilterName = "sentinel-provider"
- SentinelConsumerFilterName = "sentinel-consumer"
-
DefaultProviderPrefix = "dubbo:provider:"
DefaultConsumerPrefix = "dubbo:consumer:"
diff --git a/filter/filter_impl/sentinel_filter_test.go b/filter/sentinel/filter_test.go
similarity index 97%
rename from filter/filter_impl/sentinel_filter_test.go
rename to filter/sentinel/filter_test.go
index 9481dd9..20710f1 100644
--- a/filter/filter_impl/sentinel_filter_test.go
+++ b/filter/sentinel/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package sentinel
import (
"context"
@@ -62,7 +62,7 @@ func TestSentinelFilter_QPS(t *testing.T) {
wg := sync.WaitGroup{}
wg.Add(10)
- f := GetSentinelProviderFilter()
+ f := &ProviderFilter{}
pass := int64(0)
block := int64(0)
for i := 0; i < 10; i++ {
@@ -84,7 +84,7 @@ func TestSentinelFilter_QPS(t *testing.T) {
}
func TestConsumerFilter_Invoke(t *testing.T) {
- f := GetSentinelConsumerFilter()
+ f := &ConsumerFilter{}
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
@@ -98,7 +98,7 @@ func TestConsumerFilter_Invoke(t *testing.T) {
}
func TestProviderFilter_Invoke(t *testing.T) {
- f := GetSentinelProviderFilter()
+ f := &ProviderFilter{}
url, err := common.NewURL("dubbo://127.0.0.1:20000/UserProvider?anyhost=true&" +
"application=BDTService&category=providers&default.timeout=10000&dubbo=dubbo-provider-golang-1.0.0&" +
"environment=dev&interface=com.ikurento.user.UserProvider&ip=192.168.56.1&methods=GetUser%2C&" +
diff --git a/filter/filter_impl/token_filter.go b/filter/token/filter.go
similarity index 76%
rename from filter/filter_impl/token_filter.go
rename to filter/token/filter.go
index 52de6c3..7362c2c 100644
--- a/filter/filter_impl/token_filter.go
+++ b/filter/token/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package token
import (
"context"
@@ -33,20 +33,17 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-const (
- // nolint
- TOKEN = "token"
-)
-
func init() {
- extension.SetFilter(TOKEN, GetTokenFilter)
+ extension.SetFilter(constant.TokenFilterKey, func() filter.Filter {
+ return &Filter{}
+ })
}
-// TokenFilter will verify if the token is valid
-type TokenFilter struct{}
+// Filter will verify if the token is valid
+type Filter struct{}
// Invoke verifies the incoming token with the service configured token
-func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
invokerTkn := invoker.GetURL().GetParam(constant.TOKEN_KEY, "")
if len(invokerTkn) > 0 {
attachs := invocation.Attachments()
@@ -62,11 +59,6 @@ func (tf *TokenFilter) Invoke(ctx context.Context, invoker protocol.Invoker, inv
}
// OnResponse dummy process, returns the result directly
-func (tf *TokenFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
-
-// nolint
-func GetTokenFilter() filter.Filter {
- return &TokenFilter{}
-}
diff --git a/filter/filter_impl/token_filter_test.go b/filter/token/filter_test.go
similarity index 95%
rename from filter/filter_impl/token_filter_test.go
rename to filter/token/filter_test.go
index efbe443..ddc44c2 100644
--- a/filter/filter_impl/token_filter_test.go
+++ b/filter/token/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package token
import (
"context"
@@ -35,7 +35,7 @@ import (
)
func TestTokenFilterInvoke(t *testing.T) {
- filter := GetTokenFilter()
+ filter := &Filter{}
url := common.NewURLWithOptions(
common.WithParams(url.Values{}),
@@ -51,7 +51,7 @@ func TestTokenFilterInvoke(t *testing.T) {
}
func TestTokenFilterInvokeEmptyToken(t *testing.T) {
- filter := GetTokenFilter()
+ filter := &Filter{}
testUrl := common.URL{}
attch := make(map[string]interface{})
@@ -62,7 +62,7 @@ func TestTokenFilterInvokeEmptyToken(t *testing.T) {
}
func TestTokenFilterInvokeEmptyAttach(t *testing.T) {
- filter := GetTokenFilter()
+ filter := &Filter{}
testUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
@@ -73,7 +73,7 @@ func TestTokenFilterInvokeEmptyAttach(t *testing.T) {
}
func TestTokenFilterInvokeNotEqual(t *testing.T) {
- filter := GetTokenFilter()
+ filter := &Filter{}
testUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
diff --git a/filter/filter_impl/tps_limit_filter.go b/filter/tps/filter.go
similarity index 76%
rename from filter/filter_impl/tps_limit_filter.go
rename to filter/tps/filter.go
index a2c63d1..32a31ae 100644
--- a/filter/filter_impl/tps_limit_filter.go
+++ b/filter/tps/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package tps
import (
"context"
@@ -26,21 +26,18 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/extension"
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/filter"
- _ "dubbo.apache.org/dubbo-go/v3/filter/filter_impl/tps"
_ "dubbo.apache.org/dubbo-go/v3/filter/handler"
+ _ "dubbo.apache.org/dubbo-go/v3/filter/tps/limiter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-const (
- // TpsLimitFilterKey key
- TpsLimitFilterKey = "tps"
-)
-
func init() {
- extension.SetFilter(TpsLimitFilterKey, GetTpsLimitFilter)
+ extension.SetFilter(constant.TpsLimitFilterKey, func() filter.Filter {
+ return &Filter{}
+ })
}
-// TpsLimitFilter filters the requests by TPS
+// Filter filters the requests by TPS
/**
* if you wish to use the TpsLimiter, please add the configuration into your service provider configuration:
* for example:
@@ -50,14 +47,14 @@ func init() {
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
* tps.limiter: "method-service", # it should be the name of limiter. if the value is 'default',
- * # the MethodServiceTpsLimiterImpl will be used.
+ * # the MethodServiceTpsLimiter will be used.
* tps.limit.rejected.handler: "default", # optional, or the name of the implementation
* if the value of 'tps.limiter' is nil or empty string, the tps filter will do nothing
*/
-type TpsLimitFilter struct{}
+type Filter struct{}
// Invoke gets the configured limiter to impose TPS limiting
-func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (t *Filter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
url := invoker.GetURL()
tpsLimiter := url.GetParam(constant.TPS_LIMITER_KEY, "")
rejectedExeHandler := url.GetParam(constant.TPS_REJECTED_EXECUTION_HANDLER_KEY, constant.DEFAULT_KEY)
@@ -66,19 +63,14 @@ func (t TpsLimitFilter) Invoke(ctx context.Context, invoker protocol.Invoker, in
if allow {
return invoker.Invoke(ctx, invocation)
}
- logger.Errorf("The invocation was rejected due to over the tps limitation, url: %s ", url.String())
+ logger.Errorf("The invocation was rejected due to over the limiter limitation, url: %s ", url.String())
return extension.GetRejectedExecutionHandler(rejectedExeHandler).RejectedExecution(url, invocation)
}
return invoker.Invoke(ctx, invocation)
}
// OnResponse dummy process, returns the result directly
-func (t TpsLimitFilter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
+func (t *Filter) OnResponse(_ context.Context, result protocol.Result, _ protocol.Invoker,
_ protocol.Invocation) protocol.Result {
return result
}
-
-// GetTpsLimitFilter returns an TpsLimitFilter instance.
-func GetTpsLimitFilter() filter.Filter {
- return &TpsLimitFilter{}
-}
diff --git a/filter/filter_impl/tps_limit_filter_test.go b/filter/tps/filter_test.go
similarity index 89%
rename from filter/filter_impl/tps_limit_filter_test.go
rename to filter/tps/filter_test.go
index e9f8660..3c8cce5 100644
--- a/filter/filter_impl/tps_limit_filter_test.go
+++ b/filter/tps/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package tps
import (
"context"
@@ -24,9 +24,6 @@ import (
)
import (
- "dubbo.apache.org/dubbo-go/v3/filter"
- "dubbo.apache.org/dubbo-go/v3/filter/filter_impl/tps"
- common2 "dubbo.apache.org/dubbo-go/v3/filter/handler"
"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"
)
@@ -35,12 +32,15 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/filter"
+ "dubbo.apache.org/dubbo-go/v3/filter/handler"
+ "dubbo.apache.org/dubbo-go/v3/filter/tps/limiter"
"dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
func TestTpsLimitFilterInvokeWithNoTpsLimiter(t *testing.T) {
- tpsFilter := GetTpsLimitFilter()
+ tpsFilter := &Filter{}
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TPS_LIMITER_KEY, ""))
@@ -57,13 +57,13 @@ func TestTpsLimitFilterInvokeWithNoTpsLimiter(t *testing.T) {
func TestGenericFilterInvokeWithDefaultTpsLimiter(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
- mockLimiter := tps.NewMockTpsLimiter(ctrl)
+ mockLimiter := limiter.NewMockTpsLimiter(ctrl)
mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(true).Times(1)
extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter {
return mockLimiter
})
- tpsFilter := GetTpsLimitFilter()
+ tpsFilter := &Filter{}
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY))
@@ -80,21 +80,21 @@ func TestGenericFilterInvokeWithDefaultTpsLimiter(t *testing.T) {
func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
- mockLimiter := tps.NewMockTpsLimiter(ctrl)
+ mockLimiter := limiter.NewMockTpsLimiter(ctrl)
mockLimiter.EXPECT().IsAllowable(gomock.Any(), gomock.Any()).Return(false).Times(1)
extension.SetTpsLimiter(constant.DEFAULT_KEY, func() filter.TpsLimiter {
return mockLimiter
})
mockResult := &protocol.RPCResult{}
- mockRejectedHandler := common2.NewMockRejectedExecutionHandler(ctrl)
+ mockRejectedHandler := handler.NewMockRejectedExecutionHandler(ctrl)
mockRejectedHandler.EXPECT().RejectedExecution(gomock.Any(), gomock.Any()).Return(mockResult).Times(1)
extension.SetRejectedExecutionHandler(constant.DEFAULT_KEY, func() filter.RejectedExecutionHandler {
return mockRejectedHandler
})
- tpsFilter := GetTpsLimitFilter()
+ tpsFilter := &Filter{}
invokeUrl := common.NewURLWithOptions(
common.WithParams(url.Values{}),
common.WithParamsValue(constant.TPS_LIMITER_KEY, constant.DEFAULT_KEY))
diff --git a/filter/filter_impl/tps/tps_limiter_method_service.go b/filter/tps/limiter/method_service.go
similarity index 91%
rename from filter/filter_impl/tps/tps_limiter_method_service.go
rename to filter/tps/limiter/method_service.go
index e1ad563..ff3817c 100644
--- a/filter/filter_impl/tps/tps_limiter_method_service.go
+++ b/filter/tps/limiter/method_service.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package limiter
import (
"fmt"
@@ -44,7 +44,7 @@ func init() {
extension.SetTpsLimiter(name, GetMethodServiceTpsLimiter)
}
-// MethodServiceTpsLimiterImpl allows developer to config both method-level and service-level tps limiter.
+// MethodServiceTpsLimiter allows developer to config both method-level and service-level tps limiter.
/**
* for example:
* "UserProvider":
@@ -52,7 +52,7 @@ func init() {
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
- * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
+ * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiter. It's the default limiter too.
* tps.limit.interval: 5000 # interval, the time unit is ms
* tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited.
* methods:
@@ -70,7 +70,7 @@ func init() {
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
- * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
+ * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiter. It's the default limiter too.
* tps.limit.interval: 5000 # interval, the time unit is ms
* tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited.
* methods:
@@ -86,7 +86,7 @@ func init() {
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
- * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
+ * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiter. It's the default limiter too.
* tps.limit.interval: 5000 # interval, the time unit is ms
* tps.limit.rate: 300 # the max value in the interval. <0 means that the service will not be limited.
* methods:
@@ -102,7 +102,7 @@ func init() {
* protocol : "dubbo"
* interface : "com.ikurento.user.UserProvider"
* ... # other configuration
- * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiterImpl. It's the default limiter too.
+ * tps.limiter: "method-service" or "default" # the name of MethodServiceTpsLimiter. It's the default limiter too.
* methods:
* - name: "GetUser"
* - name: "UpdateUser"
@@ -110,7 +110,7 @@ func init() {
* tps.limit.interval: 40000
* In this case, only UpdateUser will be limited by its configuration (70 times in 40000ms)
*/
-type MethodServiceTpsLimiterImpl struct {
+type MethodServiceTpsLimiter struct {
tpsState *concurrent.Map
}
@@ -120,7 +120,7 @@ type MethodServiceTpsLimiterImpl struct {
// The key point is how to keep thread-safe
// This implementation use concurrent map + loadOrStore to make implementation thread-safe
// You can image that even multiple threads create limiter, but only one could store the limiter into tpsState
-func (limiter MethodServiceTpsLimiterImpl) IsAllowable(url *common.URL, invocation protocol.Invocation) bool {
+func (limiter MethodServiceTpsLimiter) IsAllowable(url *common.URL, invocation protocol.Invocation) bool {
methodConfigPrefix := "methods." + invocation.MethodName() + "."
methodLimitRateConfig := url.GetParam(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "")
@@ -199,14 +199,14 @@ func getLimitConfig(methodLevelConfig string,
}
var (
- methodServiceTpsLimiterInstance *MethodServiceTpsLimiterImpl
+ methodServiceTpsLimiterInstance *MethodServiceTpsLimiter
methodServiceTpsLimiterOnce sync.Once
)
-// GetMethodServiceTpsLimiter will return an MethodServiceTpsLimiterImpl instance.
+// GetMethodServiceTpsLimiter will return a MethodServiceTpsLimiter instance.
func GetMethodServiceTpsLimiter() filter.TpsLimiter {
methodServiceTpsLimiterOnce.Do(func() {
- methodServiceTpsLimiterInstance = &MethodServiceTpsLimiterImpl{
+ methodServiceTpsLimiterInstance = &MethodServiceTpsLimiter{
tpsState: concurrent.NewMap(),
}
})
diff --git a/filter/filter_impl/tps/tps_limiter_method_service_test.go b/filter/tps/limiter/method_service_test.go
similarity index 95%
rename from filter/filter_impl/tps/tps_limiter_method_service_test.go
rename to filter/tps/limiter/method_service_test.go
index 9f4d2b4..f392494 100644
--- a/filter/filter_impl/tps/tps_limiter_method_service_test.go
+++ b/filter/tps/limiter/method_service_test.go
@@ -15,9 +15,10 @@
* limitations under the License.
*/
-package tps
+package limiter
import (
+ "dubbo.apache.org/dubbo-go/v3/filter"
"net/url"
"testing"
)
@@ -31,7 +32,7 @@ import (
"dubbo.apache.org/dubbo-go/v3/common"
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
- "dubbo.apache.org/dubbo-go/v3/filter"
+ "dubbo.apache.org/dubbo-go/v3/filter/tps/strategy"
"dubbo.apache.org/dubbo-go/v3/protocol/invocation"
)
@@ -47,7 +48,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableOnlyServiceLevel(t *testing.T) {
common.WithParamsValue(constant.INTERFACE_KEY, methodName),
common.WithParamsValue(constant.TPS_LIMIT_RATE_KEY, "20"))
- mockStrategyImpl := NewMockTpsLimitStrategy(ctrl)
+ mockStrategyImpl := strategy.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, &mockStrategyCreator{
@@ -96,7 +97,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableMethodLevelOverride(t *testing.T)
common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_STRATEGY_KEY, "default"),
)
- mockStrategyImpl := NewMockTpsLimitStrategy(ctrl)
+ mockStrategyImpl := strategy.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, &mockStrategyCreator{
@@ -126,7 +127,7 @@ func TestMethodServiceTpsLimiterImplIsAllowableBothMethodAndService(t *testing.T
common.WithParamsValue(methodConfigPrefix+constant.TPS_LIMIT_RATE_KEY, "40"),
)
- mockStrategyImpl := NewMockTpsLimitStrategy(ctrl)
+ mockStrategyImpl := strategy.NewMockTpsLimitStrategy(ctrl)
mockStrategyImpl.EXPECT().IsAllowable().Return(true).Times(1)
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, &mockStrategyCreator{
diff --git a/filter/filter_impl/tps/tps_limiter_mock.go b/filter/tps/limiter/mock.go
similarity index 99%
rename from filter/filter_impl/tps/tps_limiter_mock.go
rename to filter/tps/limiter/mock.go
index 96e530b..72d21d3 100644
--- a/filter/filter_impl/tps/tps_limiter_mock.go
+++ b/filter/tps/limiter/mock.go
@@ -19,7 +19,7 @@
// Source: tps_limiter.go
// Package filter is a generated GoMock package.
-package tps
+package limiter
import (
reflect "reflect"
diff --git a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go b/filter/tps/strategy/fix_window.go
similarity index 85%
rename from filter/filter_impl/tps/tps_limit_fix_window_strategy.go
rename to filter/tps/strategy/fix_window.go
index edb3747..043e5bf 100644
--- a/filter/filter_impl/tps/tps_limit_fix_window_strategy.go
+++ b/filter/tps/strategy/fix_window.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package strategy
import (
"sync/atomic"
@@ -29,7 +29,7 @@ import (
)
const (
- // FixedWindowKey defines tps limit algorithm
+ // FixedWindowKey defines the TPS limit algorithm
FixedWindowKey = "fixedWindow"
)
@@ -39,11 +39,11 @@ func init() {
extension.SetTpsLimitStrategy(constant.DEFAULT_KEY, creator)
}
-// FixedWindowTpsLimitStrategyImpl implements the TPS limit strategy base on requests count during the interval
+// FixedWindowTpsLimitStrategy implements the TPS limit strategy based on the request count during the interval
/**
* It's the same as default implementation in Java
* It's not a thread-safe implementation.
- * It you want to use the thread-safe implementation, please use ThreadSafeFixedWindowTpsLimitStrategyImpl
+ * If you want to use the thread-safe implementation, please use ThreadSafeFixedWindowTpsLimitStrategy
* This is the default implementation.
*
* "UserProvider":
@@ -58,7 +58,7 @@ func init() {
* tps.interval: 3000
* tps.limit.strategy: "default" or "fixedWindow" # method-level
*/
-type FixedWindowTpsLimitStrategyImpl struct {
+type FixedWindowTpsLimitStrategy struct {
rate int32
interval int64
count int32
@@ -67,7 +67,7 @@ type FixedWindowTpsLimitStrategyImpl struct {
// IsAllowable determines if the requests over the TPS limit within the interval.
// It is not thread-safe.
-func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool {
+func (impl *FixedWindowTpsLimitStrategy) IsAllowable() bool {
current := time.Now().UnixNano()
if impl.timestamp+impl.interval < current {
// it's a new window
@@ -82,9 +82,9 @@ func (impl *FixedWindowTpsLimitStrategyImpl) IsAllowable() bool {
type fixedWindowStrategyCreator struct{}
-// Create returns a FixedWindowTpsLimitStrategyImpl instance with pre-configured limit rate and interval
+// Create returns a FixedWindowTpsLimitStrategy instance with pre-configured limit rate and interval
func (creator *fixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy {
- return &FixedWindowTpsLimitStrategyImpl{
+ return &FixedWindowTpsLimitStrategy{
rate: int32(rate),
interval: int64(interval) * int64(time.Millisecond), // convert to ns
count: 0,
diff --git a/filter/filter_impl/tps/tps_limit_fix_window_strategy_test.go b/filter/tps/strategy/fix_window_test.go
similarity index 98%
rename from filter/filter_impl/tps/tps_limit_fix_window_strategy_test.go
rename to filter/tps/strategy/fix_window_test.go
index 83a2f9b..2b3461b 100644
--- a/filter/filter_impl/tps/tps_limit_fix_window_strategy_test.go
+++ b/filter/tps/strategy/fix_window_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package strategy
import (
"testing"
diff --git a/filter/filter_impl/tps/tps_limit_strategy_mock.go b/filter/tps/strategy/mock.go
similarity index 98%
rename from filter/filter_impl/tps/tps_limit_strategy_mock.go
rename to filter/tps/strategy/mock.go
index be76466..d9b45d4 100644
--- a/filter/filter_impl/tps/tps_limit_strategy_mock.go
+++ b/filter/tps/strategy/mock.go
@@ -16,10 +16,10 @@
*/
// Code generated by MockGen. DO NOT EDIT.
-// Source: tps_limit_strategy.go
+// Source: tps_strategy.go
// Package filter is a generated GoMock package.
-package tps
+package strategy
import (
gomock "github.com/golang/mock/gomock"
diff --git a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go b/filter/tps/strategy/sliding_window.go
similarity index 86%
rename from filter/filter_impl/tps/tps_limit_sliding_window_strategy.go
rename to filter/tps/strategy/sliding_window.go
index 885122c..d0b229f 100644
--- a/filter/filter_impl/tps/tps_limit_sliding_window_strategy.go
+++ b/filter/tps/strategy/sliding_window.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package strategy
import (
"container/list"
@@ -32,7 +32,7 @@ func init() {
extension.SetTpsLimitStrategy("slidingWindow", &slidingWindowStrategyCreator{})
}
-// SlidingWindowTpsLimitStrategyImpl implements a thread-safe TPS limit strategy base on requests count.
+// SlidingWindowTpsLimitStrategy implements a thread-safe TPS limit strategy based on the request count.
/**
* it's thread-safe.
* "UserProvider":
@@ -47,7 +47,7 @@ func init() {
* tps.interval: 3000
* tps.limit.strategy: "slidingWindow" # method-level
*/
-type SlidingWindowTpsLimitStrategyImpl struct {
+type SlidingWindowTpsLimitStrategy struct {
rate int
interval int64
mutex *sync.Mutex
@@ -56,7 +56,7 @@ type SlidingWindowTpsLimitStrategyImpl struct {
// IsAllowable determines whether the number of requests within the time window overs the threshold
// It is thread-safe.
-func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool {
+func (impl *SlidingWindowTpsLimitStrategy) IsAllowable() bool {
impl.mutex.Lock()
defer impl.mutex.Unlock()
// quick path
@@ -85,9 +85,9 @@ func (impl *SlidingWindowTpsLimitStrategyImpl) IsAllowable() bool {
type slidingWindowStrategyCreator struct{}
-// Create returns SlidingWindowTpsLimitStrategyImpl instance with configured limit rate and interval
+// Create returns SlidingWindowTpsLimitStrategy instance with configured limit rate and interval
func (creator *slidingWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy {
- return &SlidingWindowTpsLimitStrategyImpl{
+ return &SlidingWindowTpsLimitStrategy{
rate: rate,
interval: int64(interval) * int64(time.Millisecond),
mutex: &sync.Mutex{},
diff --git a/filter/filter_impl/tps/tps_limit_sliding_window_strategy_test.go b/filter/tps/strategy/sliding_window_test.go
similarity index 98%
rename from filter/filter_impl/tps/tps_limit_sliding_window_strategy_test.go
rename to filter/tps/strategy/sliding_window_test.go
index ca5d70a..c39ff2f 100644
--- a/filter/filter_impl/tps/tps_limit_sliding_window_strategy_test.go
+++ b/filter/tps/strategy/sliding_window_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package strategy
import (
"testing"
diff --git a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go b/filter/tps/strategy/thread_safe_fix_window.go
similarity index 82%
rename from filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go
rename to filter/tps/strategy/thread_safe_fix_window.go
index ba6f50d..75a0aee 100644
--- a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy.go
+++ b/filter/tps/strategy/thread_safe_fix_window.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package strategy
import (
"sync"
@@ -32,8 +32,8 @@ func init() {
})
}
-// ThreadSafeFixedWindowTpsLimitStrategyImpl is the thread-safe implementation.
-// It's also a thread-safe decorator of FixedWindowTpsLimitStrategyImpl
+// ThreadSafeFixedWindowTpsLimitStrategy is the thread-safe implementation.
+// It's also a thread-safe decorator of FixedWindowTpsLimitStrategy
/**
* "UserProvider":
* registry: "hangzhouzk"
@@ -47,13 +47,13 @@ func init() {
* tps.interval: 3000
* tps.limit.strategy: "threadSafeFixedWindow" # method-level
*/
-type ThreadSafeFixedWindowTpsLimitStrategyImpl struct {
+type ThreadSafeFixedWindowTpsLimitStrategy struct {
mutex *sync.Mutex
- fixedWindow *FixedWindowTpsLimitStrategyImpl
+ fixedWindow *FixedWindowTpsLimitStrategy
}
// IsAllowable implements thread-safe then run the FixedWindowTpsLimitStrategy
-func (impl *ThreadSafeFixedWindowTpsLimitStrategyImpl) IsAllowable() bool {
+func (impl *ThreadSafeFixedWindowTpsLimitStrategy) IsAllowable() bool {
impl.mutex.Lock()
defer impl.mutex.Unlock()
return impl.fixedWindow.IsAllowable()
@@ -63,10 +63,10 @@ type threadSafeFixedWindowStrategyCreator struct {
fixedWindowStrategyCreator *fixedWindowStrategyCreator
}
-// Create returns ThreadSafeFixedWindowTpsLimitStrategyImpl instance
+// Create returns ThreadSafeFixedWindowTpsLimitStrategy instance
func (creator *threadSafeFixedWindowStrategyCreator) Create(rate int, interval int) filter.TpsLimitStrategy {
- fixedWindowStrategy := creator.fixedWindowStrategyCreator.Create(rate, interval).(*FixedWindowTpsLimitStrategyImpl)
- return &ThreadSafeFixedWindowTpsLimitStrategyImpl{
+ fixedWindowStrategy := creator.fixedWindowStrategyCreator.Create(rate, interval).(*FixedWindowTpsLimitStrategy)
+ return &ThreadSafeFixedWindowTpsLimitStrategy{
fixedWindow: fixedWindowStrategy,
mutex: &sync.Mutex{},
}
diff --git a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy_test.go b/filter/tps/strategy/thread_safe_fix_window_test.go
similarity index 98%
rename from filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy_test.go
rename to filter/tps/strategy/thread_safe_fix_window_test.go
index 80d31f8..ac6cad2 100644
--- a/filter/filter_impl/tps/tps_limit_thread_safe_fix_window_strategy_test.go
+++ b/filter/tps/strategy/thread_safe_fix_window_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package tps
+package strategy
import (
"testing"
diff --git a/filter/tps_limit_strategy.go b/filter/tps_strategy.go
similarity index 95%
rename from filter/tps_limit_strategy.go
rename to filter/tps_strategy.go
index 2ee876a..b2ff78c 100644
--- a/filter/tps_limit_strategy.go
+++ b/filter/tps_strategy.go
@@ -43,6 +43,6 @@ type TpsLimitStrategyCreator interface {
// It will be a little hard to understand this method.
// The unit of interval is ms
// for example, if the limit = 100, interval = 1000
- // which means that the tps limitation is 100 times per 1000ms (100/1000ms)
+ // which means that the TPS limitation is 100 times per 1000ms (100/1000ms)
Create(limit int, interval int) TpsLimitStrategy
}
diff --git a/filter/filter_impl/tracing_filter.go b/filter/tracing/filter.go
similarity index 95%
rename from filter/filter_impl/tracing_filter.go
rename to filter/tracing/filter.go
index b1e10c5..0966966 100644
--- a/filter/filter_impl/tracing_filter.go
+++ b/filter/tracing/filter.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package tracing
import (
"context"
@@ -33,13 +33,9 @@ import (
"dubbo.apache.org/dubbo-go/v3/protocol"
)
-const (
- tracingFilterName = "tracing"
-)
-
// this should be executed before users set their own Tracer
func init() {
- extension.SetFilter(tracingFilterName, newTracingFilter)
+ extension.SetFilter(constant.TracingFilterKey, newTracingFilter)
opentracing.SetGlobalTracer(opentracing.NoopTracer{})
}
@@ -94,7 +90,7 @@ func (tf *tracingFilter) OnResponse(ctx context.Context, result protocol.Result,
return result
}
-var tracingFilterInstance *tracingFilter
+var tracingFilterInstance filter.Filter
func newTracingFilter() filter.Filter {
if tracingFilterInstance == nil {
diff --git a/filter/filter_impl/tracing_filter_test.go b/filter/tracing/filter_test.go
similarity index 99%
rename from filter/filter_impl/tracing_filter_test.go
rename to filter/tracing/filter_test.go
index cbd2637..9d28393 100644
--- a/filter/filter_impl/tracing_filter_test.go
+++ b/filter/tracing/filter_test.go
@@ -15,7 +15,7 @@
* limitations under the License.
*/
-package filter_impl
+package tracing
import (
"context"
diff --git a/protocol/protocolwrapper/protocol_filter_wrapper_test.go b/protocol/protocolwrapper/protocol_filter_wrapper_test.go
index c7ef62d..842531f 100644
--- a/protocol/protocolwrapper/protocol_filter_wrapper_test.go
+++ b/protocol/protocolwrapper/protocol_filter_wrapper_test.go
@@ -62,12 +62,12 @@ func TestProtocolFilterWrapperRefer(t *testing.T) {
// the same as echo filter, for test
func init() {
- extension.SetFilter("echo", GetFilter)
+ extension.SetFilter("echo", newFilter)
}
-type EchoFilterForTest struct{}
+type mockEchoFilter struct{}
-func (ef *EchoFilterForTest) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (ef *mockEchoFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
logger.Infof("invoking echo filter.")
logger.Debugf("%v,%v", invocation.MethodName(), len(invocation.Arguments()))
if invocation.MethodName() == constant.ECHO && len(invocation.Arguments()) == 1 {
@@ -79,10 +79,10 @@ func (ef *EchoFilterForTest) Invoke(ctx context.Context, invoker protocol.Invoke
return invoker.Invoke(ctx, invocation)
}
-func (ef *EchoFilterForTest) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+func (ef *mockEchoFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
return result
}
-func GetFilter() filter.Filter {
- return &EchoFilterForTest{}
+func newFilter() filter.Filter {
+ return &mockEchoFilter{}
}