You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/06/13 01:40:25 UTC

[dubbo-go] branch 3.0 updated: Fix: Graceful shutdown bugs (#1254)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 7fedc22  Fix: Graceful shutdown bugs (#1254)
7fedc22 is described below

commit 7fedc22e18a62d0ec9f01b5a2dba00c045d02bdf
Author: XavierNiu <a...@nxw.name>
AuthorDate: Sun Jun 13 09:40:16 2021 +0800

    Fix: Graceful shutdown bugs (#1254)
    
    * fix graceful shutdown bugs
    
    * simplify log
    
    * fix graceful shutdown bug for invokers
    
    * fix timeout bug
    
    * fix bug where RequestsFinished was not working
    
    * fix bugs related to ShutdownConfig
---
 config/graceful_shutdown.go                               | 6 ++++--
 config/graceful_shutdown_config.go                        | 1 -
 filter/filter_impl/graceful_shutdown_filter.go            | 3 +++
 registry/protocol/protocol.go                             | 8 +++-----
 registry/zookeeper/registry.go                            | 1 +
 registry/zookeeper/service_discovery.go                   | 2 +-
 remoting/zookeeper/curator_discovery/service_discovery.go | 9 +++++++++
 7 files changed, 21 insertions(+), 9 deletions(-)

diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index d06c1dc..fd34e20 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -163,6 +163,7 @@ func waitForReceivingRequests() {
 		// ignore this step
 		return
 	}
+	providerConfig.ShutdownConfig.RejectRequest = true
 	waitingProcessedTimeout(providerConfig.ShutdownConfig)
 }
 
@@ -173,6 +174,7 @@ func waitForSendingRequests() {
 		// ignore this step
 		return
 	}
+	consumerConfig.ShutdownConfig.RejectRequest = true
 	waitingProcessedTimeout(consumerConfig.ShutdownConfig)
 }
 
@@ -181,9 +183,9 @@ func waitingProcessedTimeout(shutdownConfig *ShutdownConfig) {
 	if timeout <= 0 {
 		return
 	}
-	start := time.Now()
+	deadline := time.Now().Add(timeout)
 
-	for time.Now().After(start.Add(timeout)) && !shutdownConfig.RequestsFinished {
+	for time.Now().Before(deadline) && !shutdownConfig.RequestsFinished {
 		// sleep 10 ms and then we check it again
 		time.Sleep(10 * time.Millisecond)
 	}
diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go
index f4c8201..fd0bfb4 100644
--- a/config/graceful_shutdown_config.go
+++ b/config/graceful_shutdown_config.go
@@ -52,7 +52,6 @@ type ShutdownConfig struct {
 	RejectRequestHandler string `yaml:"reject_handler" json:"reject_handler,omitempty" property:"reject_handler"`
 	// true -> new request will be rejected.
 	RejectRequest bool
-
 	// true -> all requests had been processed. In provider side it means that all requests are returned response to clients
 	// In consumer side, it means that all requests getting response from servers
 	RequestsFinished bool
diff --git a/filter/filter_impl/graceful_shutdown_filter.go b/filter/filter_impl/graceful_shutdown_filter.go
index 8428bb9..028ffd2 100644
--- a/filter/filter_impl/graceful_shutdown_filter.go
+++ b/filter/filter_impl/graceful_shutdown_filter.go
@@ -60,6 +60,9 @@ func (gf *gracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.I
 		return gf.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
 	}
 	atomic.AddInt32(&gf.activeCount, 1)
+	if gf.shutdownConfig != nil && gf.activeCount > 0 {
+		gf.shutdownConfig.RequestsFinished = false
+	}
 	return invoker.Invoke(ctx, invocation)
 }
 
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index 891cd1e..25204e6 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -384,13 +384,11 @@ func getSubscribedOverrideUrl(providerUrl *common.URL) *common.URL {
 
 // Destroy registry protocol
 func (proto *registryProtocol) Destroy() {
-	for _, ivk := range proto.invokers {
-		ivk.Destroy()
-	}
+	// invoker.Destroy() should be performed in config.destroyConsumerProtocols().
 	proto.invokers = []protocol.Invoker{}
 	proto.bounds.Range(func(key, value interface{}) bool {
-		exporter := value.(protocol.Exporter)
-		exporter.Unexport()
+		// The protocol actually holds the exporters; the registry holds them only to avoid exporting repeatedly,
+		// so the unexport work should be done in protocol.Unexport(). See also config.destroyProviderProtocols().
 		proto.bounds.Delete(key)
 		return true
 	})
diff --git a/registry/zookeeper/registry.go b/registry/zookeeper/registry.go
index f8f661d..4079d56 100644
--- a/registry/zookeeper/registry.go
+++ b/registry/zookeeper/registry.go
@@ -180,6 +180,7 @@ func (r *zkRegistry) DoUnsubscribe(conf *common.URL) (registry.Listener, error)
 
 // CloseAndNilClient closes listeners and clear client
 func (r *zkRegistry) CloseAndNilClient() {
+	r.listener.Close()
 	r.client.Close()
 	r.client = nil
 }
diff --git a/registry/zookeeper/service_discovery.go b/registry/zookeeper/service_discovery.go
index a23b6c4..d9e979c 100644
--- a/registry/zookeeper/service_discovery.go
+++ b/registry/zookeeper/service_discovery.go
@@ -167,7 +167,7 @@ func (zksd *zookeeperServiceDiscovery) String() string {
 
 // Close client be closed
 func (zksd *zookeeperServiceDiscovery) Destroy() error {
-	zksd.client.Close()
+	zksd.csd.Close()
 	return nil
 }
 
diff --git a/remoting/zookeeper/curator_discovery/service_discovery.go b/remoting/zookeeper/curator_discovery/service_discovery.go
index beab89c..27cd9c7 100644
--- a/remoting/zookeeper/curator_discovery/service_discovery.go
+++ b/remoting/zookeeper/curator_discovery/service_discovery.go
@@ -276,3 +276,12 @@ func (sd *ServiceDiscovery) pathForInstance(name, id string) string {
 func (sd *ServiceDiscovery) pathForName(name string) string {
 	return path.Join(sd.basePath, name)
 }
+
+func (sd *ServiceDiscovery) Close() {
+	if sd.listener != nil {
+		sd.listener.Close()
+	}
+	if sd.client != nil {
+		sd.client.Close()
+	}
+}