You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/02/01 03:30:17 UTC

[dubbo-go] branch 3.0 updated: graceful pr refine (#1731)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 3d09243  graceful pr refine (#1731)
3d09243 is described below

commit 3d092438dbbb3dd0db5e6c8c72b10d2b4778fe94
Author: binbin.zhang <bb...@163.com>
AuthorDate: Tue Feb 1 11:30:09 2022 +0800

    graceful pr refine (#1731)
---
 config/graceful_shutdown.go                 | 10 +++++-----
 config/graceful_shutdown_config.go          |  7 ++++---
 config/root_config.go                       | 15 +++------------
 filter/graceful_shutdown/consumer_filter.go |  5 ++---
 filter/graceful_shutdown/provider_filter.go |  5 ++---
 registry/protocol/protocol.go               |  2 +-
 6 files changed, 17 insertions(+), 27 deletions(-)

diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 0e27cee..01ebbbc 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -181,10 +181,10 @@ func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) {
 	}
 	deadline := time.Now().Add(timeout)
 
-	for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount > 0 {
+	for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount.Load() > 0 {
 		// sleep 10 ms and then we check it again
 		time.Sleep(10 * time.Millisecond)
-		logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount)
+		logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount.Load())
 	}
 }
 
@@ -195,7 +195,7 @@ func waitForSendingAndReceivingRequests() {
 		// ignore this step
 		return
 	}
-	rootConfig.Shutdown.RejectRequest = true
+	rootConfig.Shutdown.RejectRequest.Store(true)
 	waitingConsumerProcessedTimeout(rootConfig.Shutdown)
 }
 
@@ -206,10 +206,10 @@ func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) {
 	}
 	deadline := time.Now().Add(timeout)
 
-	for time.Now().Before(deadline) && shutdownConfig.ConsumerActiveCount > 0 {
+	for time.Now().Before(deadline) && shutdownConfig.ConsumerActiveCount.Load() > 0 {
 		// sleep 10 ms and then we check it again
 		time.Sleep(10 * time.Millisecond)
-		logger.Infof("waiting for consumer active invocation count = %d", shutdownConfig.ConsumerActiveCount)
+		logger.Infof("waiting for consumer active invocation count = %d", shutdownConfig.ConsumerActiveCount.Load())
 	}
 }
 
diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go
index cab02ce..269dffb 100644
--- a/config/graceful_shutdown_config.go
+++ b/config/graceful_shutdown_config.go
@@ -23,6 +23,7 @@ import (
 
 import (
 	"github.com/creasty/defaults"
+	"go.uber.org/atomic"
 )
 
 import (
@@ -67,8 +68,8 @@ type ShutdownConfig struct {
 	// true -> new request will be rejected.
 	RejectRequest bool
 	// active invocation
-	ConsumerActiveCount int32
-	ProviderActiveCount int32
+	ConsumerActiveCount atomic.Int32
+	ProviderActiveCount atomic.Int32
 }
 
 // Prefix dubbo.shutdown
@@ -102,7 +103,7 @@ func (config *ShutdownConfig) GetConsumerUpdateWaitTime() time.Duration {
 	result, err := time.ParseDuration(config.ConsumerUpdateWaitTime)
 	if err != nil {
 		logger.Errorf("The ConsumerUpdateTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
-			config.ConsumerActiveCount, defaultConsumerUpdateWaitTime.String(), err)
+			config.ConsumerActiveCount.Load(), defaultConsumerUpdateWaitTime.String(), err)
 		return defaultConsumerUpdateWaitTime
 	}
 	return result
diff --git a/config/root_config.go b/config/root_config.go
index e143b11..8f54467 100644
--- a/config/root_config.go
+++ b/config/root_config.go
@@ -101,20 +101,14 @@ func GetRootConfig() *RootConfig {
 }
 
 func GetProviderConfig() *ProviderConfig {
-	if err := check(); err != nil {
-		return NewProviderConfigBuilder().Build()
-	}
-	if rootConfig.Provider != nil {
+	if err := check(); err == nil && rootConfig.Provider != nil {
 		return rootConfig.Provider
 	}
 	return NewProviderConfigBuilder().Build()
 }
 
 func GetConsumerConfig() *ConsumerConfig {
-	if err := check(); err != nil {
-		return NewConsumerConfigBuilder().Build()
-	}
-	if rootConfig.Consumer != nil {
+	if err := check(); err == nil && rootConfig.Consumer != nil {
 		return rootConfig.Consumer
 	}
 	return NewConsumerConfigBuilder().Build()
@@ -125,10 +119,7 @@ func GetApplicationConfig() *ApplicationConfig {
 }
 
 func GetShutDown() *ShutdownConfig {
-	if err := check(); err != nil {
-		return NewShutDownConfigBuilder().Build()
-	}
-	if rootConfig.Shutdown != nil {
+	if err := check(); err == nil && rootConfig.Shutdown != nil {
 		return rootConfig.Shutdown
 	}
 	return NewShutDownConfigBuilder().Build()
diff --git a/filter/graceful_shutdown/consumer_filter.go b/filter/graceful_shutdown/consumer_filter.go
index 65ceda3..f49c661 100644
--- a/filter/graceful_shutdown/consumer_filter.go
+++ b/filter/graceful_shutdown/consumer_filter.go
@@ -20,7 +20,6 @@ package graceful_shutdown
 import (
 	"context"
 	"sync"
-	"sync/atomic"
 )
 
 import (
@@ -60,13 +59,13 @@ func newConsumerGracefulShutdownFilter() filter.Filter {
 
 // Invoke adds the requests count and block the new requests if application is closing
 func (f *consumerGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
-	atomic.AddInt32(&f.shutdownConfig.ConsumerActiveCount, 1)
+	f.shutdownConfig.ConsumerActiveCount.Inc()
 	return invoker.Invoke(ctx, invocation)
 }
 
 // OnResponse reduces the number of active processes then return the process result
 func (f *consumerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
-	atomic.AddInt32(&f.shutdownConfig.ConsumerActiveCount, -1)
+	f.shutdownConfig.ConsumerActiveCount.Dec()
 	return result
 }
 
diff --git a/filter/graceful_shutdown/provider_filter.go b/filter/graceful_shutdown/provider_filter.go
index 85f0c48..d428f1f 100644
--- a/filter/graceful_shutdown/provider_filter.go
+++ b/filter/graceful_shutdown/provider_filter.go
@@ -20,7 +20,6 @@ package graceful_shutdown
 import (
 	"context"
 	"sync"
-	"sync/atomic"
 )
 
 import (
@@ -63,13 +62,13 @@ func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker pro
 		logger.Info("The application is closing, new request will be rejected.")
 		return f.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
 	}
-	atomic.AddInt32(&f.shutdownConfig.ProviderActiveCount, 1)
+	f.shutdownConfig.ProviderActiveCount.Inc()
 	return invoker.Invoke(ctx, invocation)
 }
 
 // OnResponse reduces the number of active processes then return the process result
 func (f *providerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
-	atomic.AddInt32(&f.shutdownConfig.ProviderActiveCount, -1)
+	f.shutdownConfig.ProviderActiveCount.Dec()
 	return result
 }
 
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index c1f22ee..b19d679 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -86,7 +86,7 @@ func (proto *registryProtocol) getRegistry(registryUrl *common.URL) registry.Reg
 		reg, err = extension.GetRegistry(registryUrl.Protocol, registryUrl)
 		if err != nil {
 			logger.Errorf("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
-			panic(err.Error())
+			panic(err)
 		}
 		proto.registries.Store(registryUrl.Location, reg)
 	}