You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/09/24 05:50:51 UTC
[dubbo-go] branch config-enhance updated: feat(*): add graceful shutdown (#1470)
This is an automated email from the ASF dual-hosted git repository.
laurence pushed a commit to branch config-enhance
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/config-enhance by this push:
new 0697950 feat(*): add graceful shutdown (#1470)
0697950 is described below
commit 0697950e18505e3989d738fb2750d3dc67e60587
Author: Mulavar <97...@qq.com>
AuthorDate: Fri Sep 24 13:50:46 2021 +0800
feat(*): add graceful shutdown (#1470)
* feat(*): add graceful shutdown
* feat(*): merge destroyProviderProtocols and destroyConsumerProtocols
Co-authored-by: dongjianhui03 <do...@meituan.com>
---
common/constant/key.go | 39 ++++----
config/config_setter.go | 4 -
config/graceful_shutdown.go | 234 +++++++++++++++++++-------------------------
filter/gshutdown/filter.go | 4 +-
4 files changed, 124 insertions(+), 157 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index 80d2fe7..bf8b3e1 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -65,25 +65,26 @@ const (
// Filter Keys
const (
- AccessLogFilterKey = "accesslog"
- ActiveFilterKey = "active"
- AuthConsumerFilterKey = "sign"
- AuthProviderFilterKey = "auth"
- EchoFilterKey = "echo"
- ExecuteLimitFilterKey = "execute"
- GenericFilterKey = "generic"
- GenericServiceFilterKey = "generic_service"
- GracefulShutdownProviderFilterKey = "pshutdown"
- GracefulShutdownConsumerFilterKey = "cshutdown"
- HystrixConsumerFilterKey = "hystrix_consumer"
- HystrixProviderFilterKey = "hystrix_provider"
- MetricsFilterKey = "metrics"
- SeataFilterKey = "seata"
- SentinelProviderFilterKey = "sentinel-provider"
- SentinelConsumerFilterKey = "sentinel-consumer"
- TokenFilterKey = "token"
- TpsLimitFilterKey = "tps"
- TracingFilterKey = "tracing"
+ AccessLogFilterKey = "accesslog"
+ ActiveFilterKey = "active"
+ AuthConsumerFilterKey = "sign"
+ AuthProviderFilterKey = "auth"
+ EchoFilterKey = "echo"
+ ExecuteLimitFilterKey = "execute"
+ GenericFilterKey = "generic"
+ GenericServiceFilterKey = "generic_service"
+ GracefulShutdownProviderFilterKey = "pshutdown"
+ GracefulShutdownConsumerFilterKey = "cshutdown"
+ GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
+ HystrixConsumerFilterKey = "hystrix_consumer"
+ HystrixProviderFilterKey = "hystrix_provider"
+ MetricsFilterKey = "metrics"
+ SeataFilterKey = "seata"
+ SentinelProviderFilterKey = "sentinel-provider"
+ SentinelConsumerFilterKey = "sentinel-consumer"
+ TokenFilterKey = "token"
+ TpsLimitFilterKey = "tps"
+ TracingFilterKey = "tracing"
)
const (
diff --git a/config/config_setter.go b/config/config_setter.go
index 9f114f2..7f409e3 100644
--- a/config/config_setter.go
+++ b/config/config_setter.go
@@ -17,10 +17,6 @@
package config
-const (
- GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
-)
-
type Setter interface {
Set(name string, config interface{})
}
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index d37ad82..6c0b755 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -18,6 +18,9 @@
package config
import (
+ "os"
+ "os/signal"
+ "runtime/debug"
"time"
)
@@ -51,61 +54,49 @@ import (
const defaultShutDownTime = time.Second * 60
// nolint
-//func GracefulShutdownInit() {
-// signals := make(chan os.Signal, 1)
-//
-// signal.Notify(signals, ShutdownSignals...)
-//
-// // retrieve ShutdownConfig for gracefulShutdownFilter
-// if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(config.Setter); ok && config.GetConsumerConfig().ShutdownConfig != nil {
-// filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetConsumerConfig().ShutdownConfig)
-// }
-// if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(config.Setter); ok && config.GetProviderConfig().ShutdownConfig != nil {
-// filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetProviderConfig().ShutdownConfig)
-// }
-//
-// go func() {
-// select {
-// case sig := <-signals:
-// logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
-// // gracefulShutdownOnce.Do(func() {
-// time.AfterFunc(totalTimeout(), func() {
-// logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
-// os.Exit(0)
-// })
-// BeforeShutdown()
-// // those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
-// for _, dumpSignal := range DumpHeapShutdownSignals {
-// if sig == dumpSignal {
-// debug.WriteHeapDump(os.Stdout.Fd())
-// }
-// }
-// os.Exit(0)
-// }
-// }()
-//}
+func GracefulShutdownInit() {
+ signals := make(chan os.Signal, 1)
+
+ signal.Notify(signals, ShutdownSignals...)
+
+ // retrieve ShutdownConfig for gracefulShutdownFilter
+ if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil {
+ filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown)
+ }
+
+ go func() {
+ select {
+ case sig := <-signals:
+ logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
+ // gracefulShutdownOnce.Do(func() {
+ time.AfterFunc(totalTimeout(), func() {
+ logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
+ os.Exit(0)
+ })
+ BeforeShutdown()
+ // those signals' original behavior is to exit and dump the stack, so we try to keep that behavior
+ for _, dumpSignal := range DumpHeapShutdownSignals {
+ if sig == dumpSignal {
+ debug.WriteHeapDump(os.Stdout.Fd())
+ }
+ }
+ os.Exit(0)
+ }
+ }()
+}
// BeforeShutdown provides processing flow before shutdown
func BeforeShutdown() {
destroyAllRegistries()
// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
// The value of configuration depends on how long the clients will get notification.
- //waitAndAcceptNewRequests()
-
- // reject the new request, but keeping waiting for accepting requests
- //waitForReceivingRequests()
-
- // we fetch the protocols from Consumer.References. Consumer.ProtocolConfig doesn't contains all protocol, like jsonrpc
- //consumerProtocols := getConsumerProtocols()
-
- // If this applicationConfig is not the provider, it will do nothing
- //destroyProviderProtocols(consumerProtocols)
+ waitAndAcceptNewRequests()
- // reject sending the new request, and waiting for response of sending requests
- //waitForSendingRequests()
+ // reject sending/receiving the new request, but keeping waiting for accepting requests
+ waitForSendingAndReceivingRequests()
- // If this applicationConfig is not the consumer, it will do nothing
- //destroyConsumerProtocols(consumerProtocols)
+ // destroy all protocols
+ destroyProtocols()
logger.Info("Graceful shutdown --- Execute the custom callbacks.")
customCallbacks := extension.GetAllCustomShutdownCallbacks()
@@ -120,68 +111,56 @@ func destroyAllRegistries() {
registryProtocol.Destroy()
}
-func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {
- logger.Info("Graceful shutdown --- Destroy consumer's protocols. ")
+// destroyProtocols destroys protocols.
+// First we destroy provider's protocols, and then we destroy the consumer protocols.
+func destroyProtocols() {
+ logger.Info("Graceful shutdown --- Destroy protocols. ")
+ logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
+
+ consumerProtocols := getConsumerProtocols()
+ if rootConfig.Protocols == nil {
+ return
+ }
+
+ for _, protocol := range rootConfig.Protocols {
+ // the protocol is the consumer's protocol too, we can not destroy it.
+ if consumerProtocols.Contains(protocol.Name) {
+ continue
+ }
+ extension.GetProtocol(protocol.Name).Destroy()
+ }
+
+ logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ")
for name := range consumerProtocols.Items {
extension.GetProtocol(name.(string)).Destroy()
}
}
-// destroyProviderProtocols destroys the provider's protocol.
-// if the protocol is consumer's protocol too, we will keep it
-//func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
-// logger.Info("Graceful shutdown --- Destroy provider's protocols. ")
-//
-// if config.providerConfig == nil || config.providerConfig.Protocols == nil {
-// return
-// }
-//
-// for _, protocol := range config.providerConfig.Protocols {
-//
-// // the protocol is the consumer's protocol too, we can not destroy it.
-// if consumerProtocols.Contains(protocol.Name) {
-// continue
-// }
-// extension.GetProtocol(protocol.Name).Destroy()
-// }
-//}
-
-//func waitAndAcceptNewRequests() {
-// logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
-// if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
-// return
-// }
-//
-// timeout := config.providerConfig.ShutdownConfig.GetStepTimeout()
-//
-// // ignore this step
-// if timeout < 0 {
-// return
-// }
-// time.Sleep(timeout)
-//}
-
-// for provider. It will wait for processing receiving requests
-//func waitForReceivingRequests() {
-// logger.Info("Graceful shutdown --- Keep waiting until accepting requests finish or timeout. ")
-// if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
-// // ignore this step
-// return
-// }
-// config.providerConfig.ShutdownConfig.RejectRequest = true
-// waitingProcessedTimeout(config.providerConfig.ShutdownConfig)
-//}
-
-// for consumer. It will wait for the response of sending requests
-//func waitForSendingRequests() {
-// logger.Info("Graceful shutdown --- Keep waiting until sending requests getting response or timeout ")
-// if config.consumerConfig == nil || config.consumerConfig.ShutdownConfig == nil {
-// // ignore this step
-// return
-// }
-// config.consumerConfig.ShutdownConfig.RejectRequest = true
-// waitingProcessedTimeout(config.consumerConfig.ShutdownConfig)
-//}
+func waitAndAcceptNewRequests() {
+ logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
+ if rootConfig.Shutdown == nil {
+ return
+ }
+
+ timeout := rootConfig.Shutdown.GetStepTimeout()
+
+ // ignore this step
+ if timeout < 0 {
+ return
+ }
+ time.Sleep(timeout)
+}
+
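+// waitForSendingAndReceivingRequests sets RejectRequest on the shutdown config and then waits, via waitingProcessedTimeout, until sending/accepting requests finish or time out.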
+func waitForSendingAndReceivingRequests() {
+ logger.Info("Graceful shutdown --- Keep waiting until sending/accepting requests finish or timeout. ")
+ if rootConfig == nil || rootConfig.Shutdown == nil {
+ // ignore this step
+ return
+ }
+ rootConfig.Shutdown.RejectRequest = true
+ waitingProcessedTimeout(rootConfig.Shutdown)
+}
func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
timeout := shutdownConfig.GetStepTimeout()
@@ -196,33 +175,24 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
}
}
-//func totalTimeout() time.Duration {
-// providerShutdown := defaultShutDownTime
-// if config.providerConfig != nil && config.providerConfig.ShutdownConfig != nil {
-// providerShutdown = config.providerConfig.ShutdownConfig.GetTimeout()
-// }
-//
-// var consumerShutdown time.Duration
-// if config.consumerConfig != nil && config.consumerConfig.ShutdownConfig != nil {
-// consumerShutdown = config.consumerConfig.ShutdownConfig.GetTimeout()
-// }
-//
-// timeout := providerShutdown
-// if consumerShutdown > providerShutdown {
-// timeout = consumerShutdown
-// }
-// return timeout
-//}
+func totalTimeout() time.Duration {
+ timeout := defaultShutDownTime
+ if rootConfig.Shutdown != nil && rootConfig.Shutdown.GetTimeout() > timeout {
+ timeout = rootConfig.Shutdown.GetTimeout()
+ }
+
+ return timeout
+}
// we can not get the protocols from consumerConfig because some protocol don't have configuration, like jsonrpc.
-//func getConsumerProtocols() *gxset.HashSet {
-// result := gxset.NewSet()
-// if config.consumerConfig == nil || config.consumerConfig.References == nil {
-// return result
-// }
-//
-// for _, reference := range config.consumerConfig.References {
-// result.Add(reference.Protocol)
-// }
-// return result
-//}
+func getConsumerProtocols() *gxset.HashSet {
+ result := gxset.NewSet()
+ if rootConfig.Consumer == nil || rootConfig.Consumer.References == nil {
+ return result
+ }
+
+ for _, reference := range rootConfig.Consumer.References {
+ result.Add(reference.Protocol)
+ }
+ return result
+}
diff --git a/filter/gshutdown/filter.go b/filter/gshutdown/filter.go
index 1bbc197..7b4a905 100644
--- a/filter/gshutdown/filter.go
+++ b/filter/gshutdown/filter.go
@@ -68,12 +68,12 @@ func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker
func (f *Filter) Set(name string, conf interface{}) {
switch name {
- case config.GracefulShutdownFilterShutdownConfig:
+ case constant.GracefulShutdownFilterShutdownConfig:
if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
f.shutdownConfig = shutdownConfig
return
}
- logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
+ logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
default:
// do nothing
}