You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by la...@apache.org on 2021/09/24 05:50:51 UTC

[dubbo-go] branch config-enhance updated: feat(*): add graceful shutdown (#1470)

This is an automated email from the ASF dual-hosted git repository.

laurence pushed a commit to branch config-enhance
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/config-enhance by this push:
     new 0697950  feat(*): add graceful shutdown (#1470)
0697950 is described below

commit 0697950e18505e3989d738fb2750d3dc67e60587
Author: Mulavar <97...@qq.com>
AuthorDate: Fri Sep 24 13:50:46 2021 +0800

    feat(*): add graceful shutdown (#1470)
    
    * feat(*): add graceful shutdown
    
    * feat(*): merge destroyProviderProtocols and destroyConsumerProtocols
    
    Co-authored-by: dongjianhui03 <do...@meituan.com>
---
 common/constant/key.go      |  39 ++++----
 config/config_setter.go     |   4 -
 config/graceful_shutdown.go | 234 +++++++++++++++++++-------------------------
 filter/gshutdown/filter.go  |   4 +-
 4 files changed, 124 insertions(+), 157 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index 80d2fe7..bf8b3e1 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -65,25 +65,26 @@ const (
 
 // Filter Keys
 const (
-	AccessLogFilterKey                = "accesslog"
-	ActiveFilterKey                   = "active"
-	AuthConsumerFilterKey             = "sign"
-	AuthProviderFilterKey             = "auth"
-	EchoFilterKey                     = "echo"
-	ExecuteLimitFilterKey             = "execute"
-	GenericFilterKey                  = "generic"
-	GenericServiceFilterKey           = "generic_service"
-	GracefulShutdownProviderFilterKey = "pshutdown"
-	GracefulShutdownConsumerFilterKey = "cshutdown"
-	HystrixConsumerFilterKey          = "hystrix_consumer"
-	HystrixProviderFilterKey          = "hystrix_provider"
-	MetricsFilterKey                  = "metrics"
-	SeataFilterKey                    = "seata"
-	SentinelProviderFilterKey         = "sentinel-provider"
-	SentinelConsumerFilterKey         = "sentinel-consumer"
-	TokenFilterKey                    = "token"
-	TpsLimitFilterKey                 = "tps"
-	TracingFilterKey                  = "tracing"
+	AccessLogFilterKey                   = "accesslog"
+	ActiveFilterKey                      = "active"
+	AuthConsumerFilterKey                = "sign"
+	AuthProviderFilterKey                = "auth"
+	EchoFilterKey                        = "echo"
+	ExecuteLimitFilterKey                = "execute"
+	GenericFilterKey                     = "generic"
+	GenericServiceFilterKey              = "generic_service"
+	GracefulShutdownProviderFilterKey    = "pshutdown"
+	GracefulShutdownConsumerFilterKey    = "cshutdown"
+	GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
+	HystrixConsumerFilterKey             = "hystrix_consumer"
+	HystrixProviderFilterKey             = "hystrix_provider"
+	MetricsFilterKey                     = "metrics"
+	SeataFilterKey                       = "seata"
+	SentinelProviderFilterKey            = "sentinel-provider"
+	SentinelConsumerFilterKey            = "sentinel-consumer"
+	TokenFilterKey                       = "token"
+	TpsLimitFilterKey                    = "tps"
+	TracingFilterKey                     = "tracing"
 )
 
 const (
diff --git a/config/config_setter.go b/config/config_setter.go
index 9f114f2..7f409e3 100644
--- a/config/config_setter.go
+++ b/config/config_setter.go
@@ -17,10 +17,6 @@
 
 package config
 
-const (
-	GracefulShutdownFilterShutdownConfig = "GracefulShutdownFilterShutdownConfig"
-)
-
 type Setter interface {
 	Set(name string, config interface{})
 }
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index d37ad82..6c0b755 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -18,6 +18,9 @@
 package config
 
 import (
+	"os"
+	"os/signal"
+	"runtime/debug"
 	"time"
 )
 
@@ -51,61 +54,49 @@ import (
 const defaultShutDownTime = time.Second * 60
 
 // nolint
-//func GracefulShutdownInit() {
-//	signals := make(chan os.Signal, 1)
-//
-//	signal.Notify(signals, ShutdownSignals...)
-//
-//	// retrieve ShutdownConfig for gracefulShutdownFilter
-//	if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(config.Setter); ok && config.GetConsumerConfig().ShutdownConfig != nil {
-//		filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetConsumerConfig().ShutdownConfig)
-//	}
-//	if filter, ok := extension.GetFilter(constant.GracefulShutdownProviderFilterKey).(config.Setter); ok && config.GetProviderConfig().ShutdownConfig != nil {
-//		filter.Set(config.GracefulShutdownFilterShutdownConfig, config.GetProviderConfig().ShutdownConfig)
-//	}
-//
-//	go func() {
-//		select {
-//		case sig := <-signals:
-//			logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
-//			// gracefulShutdownOnce.Do(func() {
-//			time.AfterFunc(totalTimeout(), func() {
-//				logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
-//				os.Exit(0)
-//			})
-//			BeforeShutdown()
-//			// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
-//			for _, dumpSignal := range DumpHeapShutdownSignals {
-//				if sig == dumpSignal {
-//					debug.WriteHeapDump(os.Stdout.Fd())
-//				}
-//			}
-//			os.Exit(0)
-//		}
-//	}()
-//}
+func GracefulShutdownInit() {
+	signals := make(chan os.Signal, 1)
+
+	signal.Notify(signals, ShutdownSignals...)
+
+	// retrieve ShutdownConfig for gracefulShutdownFilter
+	if filter, ok := extension.GetFilter(constant.GracefulShutdownConsumerFilterKey).(Setter); ok && rootConfig.Shutdown != nil {
+		filter.Set(constant.GracefulShutdownFilterShutdownConfig, rootConfig.Shutdown)
+	}
+
+	go func() {
+		select {
+		case sig := <-signals:
+			logger.Infof("get signal %s, applicationConfig will shutdown.", sig)
+			// gracefulShutdownOnce.Do(func() {
+			time.AfterFunc(totalTimeout(), func() {
+				logger.Warn("Shutdown gracefully timeout, applicationConfig will shutdown immediately. ")
+				os.Exit(0)
+			})
+			BeforeShutdown()
+			// those signals' original behavior is exit with dump ths stack, so we try to keep the behavior
+			for _, dumpSignal := range DumpHeapShutdownSignals {
+				if sig == dumpSignal {
+					debug.WriteHeapDump(os.Stdout.Fd())
+				}
+			}
+			os.Exit(0)
+		}
+	}()
+}
 
 // BeforeShutdown provides processing flow before shutdown
 func BeforeShutdown() {
 	destroyAllRegistries()
 	// waiting for a short time so that the clients have enough time to get the notification that server shutdowns
 	// The value of configuration depends on how long the clients will get notification.
-	//waitAndAcceptNewRequests()
-
-	// reject the new request, but keeping waiting for accepting requests
-	//waitForReceivingRequests()
-
-	// we fetch the protocols from Consumer.References. Consumer.ProtocolConfig doesn't contains all protocol, like jsonrpc
-	//consumerProtocols := getConsumerProtocols()
-
-	// If this applicationConfig is not the provider, it will do nothing
-	//destroyProviderProtocols(consumerProtocols)
+	waitAndAcceptNewRequests()
 
-	// reject sending the new request, and waiting for response of sending requests
-	//waitForSendingRequests()
+	// reject sending/receiving the new request, but keeping waiting for accepting requests
+	waitForSendingAndReceivingRequests()
 
-	// If this applicationConfig is not the consumer, it will do nothing
-	//destroyConsumerProtocols(consumerProtocols)
+	// destroy all protocols
+	destroyProtocols()
 
 	logger.Info("Graceful shutdown --- Execute the custom callbacks.")
 	customCallbacks := extension.GetAllCustomShutdownCallbacks()
@@ -120,68 +111,56 @@ func destroyAllRegistries() {
 	registryProtocol.Destroy()
 }
 
-func destroyConsumerProtocols(consumerProtocols *gxset.HashSet) {
-	logger.Info("Graceful shutdown --- Destroy consumer's protocols. ")
+// destroyProtocols destroys protocols.
+// First we destroy provider's protocols, and then we destroy the consumer protocols.
+func destroyProtocols() {
+	logger.Info("Graceful shutdown --- Destroy protocols. ")
+	logger.Info("Graceful shutdown --- First destroy provider's protocols. ")
+
+	consumerProtocols := getConsumerProtocols()
+	if rootConfig.Protocols == nil {
+		return
+	}
+
+	for _, protocol := range rootConfig.Protocols {
+		// the protocol is the consumer's protocol too, we can not destroy it.
+		if consumerProtocols.Contains(protocol.Name) {
+			continue
+		}
+		extension.GetProtocol(protocol.Name).Destroy()
+	}
+
+	logger.Info("Graceful shutdown --- Second destroy consumer's protocols. ")
 	for name := range consumerProtocols.Items {
 		extension.GetProtocol(name.(string)).Destroy()
 	}
 }
 
-// destroyProviderProtocols destroys the provider's protocol.
-// if the protocol is consumer's protocol too, we will keep it
-//func destroyProviderProtocols(consumerProtocols *gxset.HashSet) {
-//	logger.Info("Graceful shutdown --- Destroy provider's protocols. ")
-//
-//	if config.providerConfig == nil || config.providerConfig.Protocols == nil {
-//		return
-//	}
-//
-//	for _, protocol := range config.providerConfig.Protocols {
-//
-//		// the protocol is the consumer's protocol too, we can not destroy it.
-//		if consumerProtocols.Contains(protocol.Name) {
-//			continue
-//		}
-//		extension.GetProtocol(protocol.Name).Destroy()
-//	}
-//}
-
-//func waitAndAcceptNewRequests() {
-//	logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
-//	if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
-//		return
-//	}
-//
-//	timeout := config.providerConfig.ShutdownConfig.GetStepTimeout()
-//
-//	// ignore this step
-//	if timeout < 0 {
-//		return
-//	}
-//	time.Sleep(timeout)
-//}
-
-// for provider. It will wait for processing receiving requests
-//func waitForReceivingRequests() {
-//	logger.Info("Graceful shutdown --- Keep waiting until accepting requests finish or timeout. ")
-//	if config.providerConfig == nil || config.providerConfig.ShutdownConfig == nil {
-//		// ignore this step
-//		return
-//	}
-//	config.providerConfig.ShutdownConfig.RejectRequest = true
-//	waitingProcessedTimeout(config.providerConfig.ShutdownConfig)
-//}
-
-// for consumer. It will wait for the response of sending requests
-//func waitForSendingRequests() {
-//	logger.Info("Graceful shutdown --- Keep waiting until sending requests getting response or timeout ")
-//	if config.consumerConfig == nil || config.consumerConfig.ShutdownConfig == nil {
-//		// ignore this step
-//		return
-//	}
-//	config.consumerConfig.ShutdownConfig.RejectRequest = true
-//	waitingProcessedTimeout(config.consumerConfig.ShutdownConfig)
-//}
+func waitAndAcceptNewRequests() {
+	logger.Info("Graceful shutdown --- Keep waiting and accept new requests for a short time. ")
+	if rootConfig.Shutdown == nil {
+		return
+	}
+
+	timeout := rootConfig.Shutdown.GetStepTimeout()
+
+	// ignore this step
+	if timeout < 0 {
+		return
+	}
+	time.Sleep(timeout)
+}
+
+//for provider. It will wait for processing receiving requests
+func waitForSendingAndReceivingRequests() {
+	logger.Info("Graceful shutdown --- Keep waiting until sending/accepting requests finish or timeout. ")
+	if rootConfig == nil || rootConfig.Shutdown == nil {
+		// ignore this step
+		return
+	}
+	rootConfig.Shutdown.RejectRequest = true
+	waitingProcessedTimeout(rootConfig.Shutdown)
+}
 
 func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
 	timeout := shutdownConfig.GetStepTimeout()
@@ -196,33 +175,24 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
 	}
 }
 
-//func totalTimeout() time.Duration {
-//	providerShutdown := defaultShutDownTime
-//	if config.providerConfig != nil && config.providerConfig.ShutdownConfig != nil {
-//		providerShutdown = config.providerConfig.ShutdownConfig.GetTimeout()
-//	}
-//
-//	var consumerShutdown time.Duration
-//	if config.consumerConfig != nil && config.consumerConfig.ShutdownConfig != nil {
-//		consumerShutdown = config.consumerConfig.ShutdownConfig.GetTimeout()
-//	}
-//
-//	timeout := providerShutdown
-//	if consumerShutdown > providerShutdown {
-//		timeout = consumerShutdown
-//	}
-//	return timeout
-//}
+func totalTimeout() time.Duration {
+	timeout := defaultShutDownTime
+	if rootConfig.Shutdown != nil && rootConfig.Shutdown.GetTimeout() > timeout {
+		timeout = rootConfig.Shutdown.GetTimeout()
+	}
+
+	return timeout
+}
 
 // we can not get the protocols from consumerConfig because some protocol don't have configuration, like jsonrpc.
-//func getConsumerProtocols() *gxset.HashSet {
-//	result := gxset.NewSet()
-//	if config.consumerConfig == nil || config.consumerConfig.References == nil {
-//		return result
-//	}
-//
-//	for _, reference := range config.consumerConfig.References {
-//		result.Add(reference.Protocol)
-//	}
-//	return result
-//}
+func getConsumerProtocols() *gxset.HashSet {
+	result := gxset.NewSet()
+	if rootConfig.Consumer == nil || rootConfig.Consumer.References == nil {
+		return result
+	}
+
+	for _, reference := range rootConfig.Consumer.References {
+		result.Add(reference.Protocol)
+	}
+	return result
+}
diff --git a/filter/gshutdown/filter.go b/filter/gshutdown/filter.go
index 1bbc197..7b4a905 100644
--- a/filter/gshutdown/filter.go
+++ b/filter/gshutdown/filter.go
@@ -68,12 +68,12 @@ func (f *Filter) OnResponse(ctx context.Context, result protocol.Result, invoker
 
 func (f *Filter) Set(name string, conf interface{}) {
 	switch name {
-	case config.GracefulShutdownFilterShutdownConfig:
+	case constant.GracefulShutdownFilterShutdownConfig:
 		if shutdownConfig, ok := conf.(*config.ShutdownConfig); !ok {
 			f.shutdownConfig = shutdownConfig
 			return
 		}
-		logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", config.GracefulShutdownFilterShutdownConfig)
+		logger.Warnf("the type of config for {%s} should be *config.ShutdownConfig", constant.GracefulShutdownFilterShutdownConfig)
 	default:
 		// do nothing
 	}