You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/24 03:19:04 UTC
[dubbo-go] branch 3.0 updated: Fix: Graceful shutdown
bugs(supplement #1254) (#1257)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 0f0e1f6 Fix: Graceful shutdown bugs(supplement #1254) (#1257)
0f0e1f6 is described below
commit 0f0e1f66b67ac91fba963db5d0bddfd1f82f1188
Author: XavierNiu <a...@nxw.name>
AuthorDate: Thu Jun 24 11:18:54 2021 +0800
Fix: Graceful shutdown bugs(supplement #1254) (#1257)
* supplementary fix #1254
remove unused comments
fix import cycle
append apache license header
fix gracefulShutdownFilter unittest bug
go fmt
fix gracefulShutdownConfig unittest bug
fix gracefulShutdownConfig unittest bug
go fmt
* improve formatting based on code style
* go fmt
* set RequestsFinished explicitly
* use mutex to protect variables of ShutdownConfig
* ftr: add config (#1258)
* recover gracefulShutdownFilter logic
* remove unused mutex
Co-authored-by: Laurence <45...@users.noreply.github.com>
---
config/config_loader_test.go | 24 +++++++++++++++++++++
config/config_setter.go | 26 ++++++++++++++++++++++
config/graceful_shutdown.go | 8 +++++++
config/graceful_shutdown_test.go | 9 ++++++++
filter/filter_impl/graceful_shutdown_filter.go | 30 +++++++++++++++-----------
filter/filter_impl/tps_limit_filter_test.go | 5 ++---
go.sum | 1 -
7 files changed, 86 insertions(+), 17 deletions(-)
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index b0935a2..59593ef 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -18,6 +18,7 @@
package config
import (
+ "context"
"path/filepath"
"sort"
"sync"
@@ -41,7 +42,9 @@ import (
"dubbo.apache.org/dubbo-go/v3/common/logger"
"dubbo.apache.org/dubbo-go/v3/common/proxy/proxy_factory"
"dubbo.apache.org/dubbo-go/v3/config_center"
+ "dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/metadata/service"
+ "dubbo.apache.org/dubbo-go/v3/protocol"
"dubbo.apache.org/dubbo-go/v3/registry"
)
@@ -74,6 +77,13 @@ func TestConfigLoader(t *testing.T) {
}
func TestLoad(t *testing.T) {
+ extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
+ return &mockGracefulShutdownFilter{}
+ })
+ extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
+ return &mockGracefulShutdownFilter{}
+ })
+
doInitConsumer()
doInitProvider()
@@ -596,3 +606,17 @@ func ConvertURLArrToIntfArr(urls []*common.URL) []interface{} {
}
return res
}
+
+type mockGracefulShutdownFilter struct{}
+
+func (f *mockGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+ panic("implement me")
+}
+
+func (f *mockGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
+ panic("implement me")
+}
+
+func (f *mockGracefulShutdownFilter) Set(name string, config interface{}) {
+ return
+}
diff --git a/config/config_setter.go b/config/config_setter.go
new file mode 100644
index 0000000..9f114f2
--- /dev/null
+++ b/config/config_setter.go
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package config
+
+const (
+ GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
+)
+
+type Setter interface {
+ Set(name string, config interface{})
+}
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index fd34e20..51443c9 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -59,6 +59,14 @@ func GracefulShutdownInit() {
signal.Notify(signals, ShutdownSignals...)
+ // retrieve ShutdownConfig for gracefulShutdownFilter
+ if filter, ok := extension.GetFilter(constant.CONSUMER_SHUTDOWN_FILTER).(Setter); ok && GetConsumerConfig().ShutdownConfig != nil {
+ filter.Set(GracefulShutdownFilterShutdownConfig, GetConsumerConfig().ShutdownConfig)
+ }
+ if filter, ok := extension.GetFilter(constant.PROVIDER_SHUTDOWN_FILTER).(Setter); ok && GetProviderConfig().ShutdownConfig != nil {
+ filter.Set(GracefulShutdownFilterShutdownConfig, GetProviderConfig().ShutdownConfig)
+ }
+
go func() {
select {
case sig := <-signals:
diff --git a/config/graceful_shutdown_test.go b/config/graceful_shutdown_test.go
index 870302d..920a268 100644
--- a/config/graceful_shutdown_test.go
+++ b/config/graceful_shutdown_test.go
@@ -24,10 +24,17 @@ import (
import (
"dubbo.apache.org/dubbo-go/v3/common/constant"
"dubbo.apache.org/dubbo-go/v3/common/extension"
+ "dubbo.apache.org/dubbo-go/v3/filter"
"dubbo.apache.org/dubbo-go/v3/protocol"
)
func TestGracefulShutdownInit(t *testing.T) {
+ extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
+ return &mockGracefulShutdownFilter{}
+ })
+ extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
+ return &mockGracefulShutdownFilter{}
+ })
GracefulShutdownInit()
}
@@ -49,6 +56,8 @@ func TestBeforeShutdown(t *testing.T) {
}
// without configuration
+ consumerConfig = nil
+ providerConfig = nil
BeforeShutdown()
consumerConfig = &ConsumerConfig{
diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/filter_impl/graceful_shutdown_filter.go
index 028ffd2..9da238b 100644
--- a/filter/filter_impl/graceful_shutdown_filter.go
+++ b/filter/filter_impl/graceful_shutdown_filter.go
@@ -32,19 +32,13 @@ import (
)
func init() {
- consumerFiler := &gracefulShutdownFilter{
- shutdownConfig: config.GetConsumerConfig().ShutdownConfig,
- }
- providerFilter := &gracefulShutdownFilter{
- shutdownConfig: config.GetProviderConfig().ShutdownConfig,
- }
-
+ // `init()` is performed before config.Load(), so shutdownConfig will be retrieved after config was loaded.
extension.SetFilter(constant.CONSUMER_SHUTDOWN_FILTER, func() filter.Filter {
- return consumerFiler
+ return &gracefulShutdownFilter{}
})
extension.SetFilter(constant.PROVIDER_SHUTDOWN_FILTER, func() filter.Filter {
- return providerFilter
+ return &gracefulShutdownFilter{}
})
}
@@ -60,9 +54,6 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I
return gf.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
}
atomic.AddInt32(&gf.activeCount, 1)
- if gf.shutdownConfig != nil && gf.activeCount > 0 {
- gf.shutdownConfig.RequestsFinished = false
- }
return invoker.Invoke(ctx, invocation)
}
@@ -70,12 +61,25 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I
func (gf *gracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
atomic.AddInt32(&gf.activeCount, -1)
// although this isn't thread safe, it won't be a problem if the gf.rejectNewRequest() is true.
- if gf.shutdownConfig != nil && gf.activeCount <= 0 {
+ if gf.shutdownConfig != nil && gf.shutdownConfig.RejectRequest && gf.activeCount <= 0 {
gf.shutdownConfig.RequestsFinished = true
}
return result
}
+func (gf *gracefulShutdownFilter) Set(name string, conf interface{}) {
+ switch name {
+ case config.GracefulShutdownFilterShutdownConfig:
+ if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
+ gf.shutdownConfig = shutdownConfig
+ return
+ }
+ logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
+ default:
+ // do nothing
+ }
+}
+
func (gf *gracefulShutdownFilter) rejectNewRequest() bool {
if gf.shutdownConfig == nil {
return false
diff --git a/filter/filter_impl/tps_limit_filter_test.go b/filter/filter_impl/tps_limit_filter_test.go
index b70611d..e9f8660 100644
--- a/filter/filter_impl/tps_limit_filter_test.go
+++ b/filter/filter_impl/tps_limit_filter_test.go
@@ -101,9 +101,8 @@ func TestGenericFilterInvokeWithDefaultTpsLimiterNotAllow(t *testing.T) {
attch := make(map[string]interface{})
result := tpsFilter.Invoke(context.Background(),
- protocol.NewBaseInvoker(
-
- invokeUrl), invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
+ protocol.NewBaseInvoker(invokeUrl),
+ invocation.NewRPCInvocation("MethodName", []interface{}{"OK"}, attch))
assert.Nil(t, result.Error())
assert.Nil(t, result.Result())
}
diff --git a/go.sum b/go.sum
index 3a278e0..3b01184 100644
--- a/go.sum
+++ b/go.sum
@@ -161,7 +161,6 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/go-systemd/v22 v22.1.0 h1:kq/SbG2BCKLkDKkjQf5OWwKWUKj1lgs3lFI4PxnR5lg=
github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk=
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
-github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwcJI5acqYI6dE=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=