You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2022/02/01 03:30:17 UTC
[dubbo-go] branch 3.0 updated: graceful pr refine (#1731)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/3.0 by this push:
new 3d09243 graceful pr refine (#1731)
3d09243 is described below
commit 3d092438dbbb3dd0db5e6c8c72b10d2b4778fe94
Author: binbin.zhang <bb...@163.com>
AuthorDate: Tue Feb 1 11:30:09 2022 +0800
graceful pr refine (#1731)
---
config/graceful_shutdown.go | 10 +++++-----
config/graceful_shutdown_config.go | 7 ++++---
config/root_config.go | 15 +++------------
filter/graceful_shutdown/consumer_filter.go | 5 ++---
filter/graceful_shutdown/provider_filter.go | 5 ++---
registry/protocol/protocol.go | 2 +-
6 files changed, 17 insertions(+), 27 deletions(-)
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 0e27cee..01ebbbc 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -181,10 +181,10 @@ func waitingProviderProcessedTimeout(shutdownConfig *ShutdownConfig) {
}
deadline := time.Now().Add(timeout)
- for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount > 0 {
+ for time.Now().Before(deadline) && shutdownConfig.ProviderActiveCount.Load() > 0 {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
- logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount)
+ logger.Infof("waiting for provider active invocation count = %d", shutdownConfig.ProviderActiveCount.Load())
}
}
@@ -195,7 +195,7 @@ func waitForSendingAndReceivingRequests() {
// ignore this step
return
}
- rootConfig.Shutdown.RejectRequest = true
+ rootConfig.Shutdown.RejectRequest.Store(true)
waitingConsumerProcessedTimeout(rootConfig.Shutdown)
}
@@ -206,10 +206,10 @@ func waitingConsumerProcessedTimeout(shutdownConfig *ShutdownConfig) {
}
deadline := time.Now().Add(timeout)
- for time.Now().Before(deadline) && shutdownConfig.ConsumerActiveCount > 0 {
+ for time.Now().Before(deadline) && shutdownConfig.ConsumerActiveCount.Load() > 0 {
// sleep 10 ms and then we check it again
time.Sleep(10 * time.Millisecond)
- logger.Infof("waiting for consumer active invocation count = %d", shutdownConfig.ConsumerActiveCount)
+ logger.Infof("waiting for consumer active invocation count = %d", shutdownConfig.ConsumerActiveCount.Load())
}
}
diff --git a/config/graceful_shutdown_config.go b/config/graceful_shutdown_config.go
index cab02ce..269dffb 100644
--- a/config/graceful_shutdown_config.go
+++ b/config/graceful_shutdown_config.go
@@ -23,6 +23,7 @@ import (
import (
"github.com/creasty/defaults"
+ "go.uber.org/atomic"
)
import (
@@ -67,8 +68,8 @@ type ShutdownConfig struct {
// true -> new request will be rejected.
RejectRequest bool
// active invocation
- ConsumerActiveCount int32
- ProviderActiveCount int32
+ ConsumerActiveCount atomic.Int32
+ ProviderActiveCount atomic.Int32
}
// Prefix dubbo.shutdown
@@ -102,7 +103,7 @@ func (config *ShutdownConfig) GetConsumerUpdateWaitTime() time.Duration {
result, err := time.ParseDuration(config.ConsumerUpdateWaitTime)
if err != nil {
logger.Errorf("The ConsumerUpdateTimeout configuration is invalid: %s, and we will use the default value: %s, err: %v",
- config.ConsumerActiveCount, defaultConsumerUpdateWaitTime.String(), err)
+ config.ConsumerActiveCount.Load(), defaultConsumerUpdateWaitTime.String(), err)
return defaultConsumerUpdateWaitTime
}
return result
diff --git a/config/root_config.go b/config/root_config.go
index e143b11..8f54467 100644
--- a/config/root_config.go
+++ b/config/root_config.go
@@ -101,20 +101,14 @@ func GetRootConfig() *RootConfig {
}
func GetProviderConfig() *ProviderConfig {
- if err := check(); err != nil {
- return NewProviderConfigBuilder().Build()
- }
- if rootConfig.Provider != nil {
+ if err := check(); err == nil && rootConfig.Provider != nil {
return rootConfig.Provider
}
return NewProviderConfigBuilder().Build()
}
func GetConsumerConfig() *ConsumerConfig {
- if err := check(); err != nil {
- return NewConsumerConfigBuilder().Build()
- }
- if rootConfig.Consumer != nil {
+ if err := check(); err == nil && rootConfig.Consumer != nil {
return rootConfig.Consumer
}
return NewConsumerConfigBuilder().Build()
@@ -125,10 +119,7 @@ func GetApplicationConfig() *ApplicationConfig {
}
func GetShutDown() *ShutdownConfig {
- if err := check(); err != nil {
- return NewShutDownConfigBuilder().Build()
- }
- if rootConfig.Shutdown != nil {
+ if err := check(); err == nil && rootConfig.Shutdown != nil {
return rootConfig.Shutdown
}
return NewShutDownConfigBuilder().Build()
diff --git a/filter/graceful_shutdown/consumer_filter.go b/filter/graceful_shutdown/consumer_filter.go
index 65ceda3..f49c661 100644
--- a/filter/graceful_shutdown/consumer_filter.go
+++ b/filter/graceful_shutdown/consumer_filter.go
@@ -20,7 +20,6 @@ package graceful_shutdown
import (
"context"
"sync"
- "sync/atomic"
)
import (
@@ -60,13 +59,13 @@ func newConsumerGracefulShutdownFilter() filter.Filter {
// Invoke adds the requests count and block the new requests if application is closing
func (f *consumerGracefulShutdownFilter) Invoke(ctx context.Context, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- atomic.AddInt32(&f.shutdownConfig.ConsumerActiveCount, 1)
+ f.shutdownConfig.ConsumerActiveCount.Inc()
return invoker.Invoke(ctx, invocation)
}
// OnResponse reduces the number of active processes then return the process result
func (f *consumerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- atomic.AddInt32(&f.shutdownConfig.ConsumerActiveCount, -1)
+ f.shutdownConfig.ConsumerActiveCount.Dec()
return result
}
diff --git a/filter/graceful_shutdown/provider_filter.go b/filter/graceful_shutdown/provider_filter.go
index 85f0c48..d428f1f 100644
--- a/filter/graceful_shutdown/provider_filter.go
+++ b/filter/graceful_shutdown/provider_filter.go
@@ -20,7 +20,6 @@ package graceful_shutdown
import (
"context"
"sync"
- "sync/atomic"
)
import (
@@ -63,13 +62,13 @@ func (f *providerGracefulShutdownFilter) Invoke(ctx context.Context, invoker pro
logger.Info("The application is closing, new request will be rejected.")
return f.getRejectHandler().RejectedExecution(invoker.GetURL(), invocation)
}
- atomic.AddInt32(&f.shutdownConfig.ProviderActiveCount, 1)
+ f.shutdownConfig.ProviderActiveCount.Inc()
return invoker.Invoke(ctx, invocation)
}
// OnResponse reduces the number of active processes then return the process result
func (f *providerGracefulShutdownFilter) OnResponse(ctx context.Context, result protocol.Result, invoker protocol.Invoker, invocation protocol.Invocation) protocol.Result {
- atomic.AddInt32(&f.shutdownConfig.ProviderActiveCount, -1)
+ f.shutdownConfig.ProviderActiveCount.Dec()
return result
}
diff --git a/registry/protocol/protocol.go b/registry/protocol/protocol.go
index c1f22ee..b19d679 100644
--- a/registry/protocol/protocol.go
+++ b/registry/protocol/protocol.go
@@ -86,7 +86,7 @@ func (proto *registryProtocol) getRegistry(registryUrl *common.URL) registry.Reg
reg, err = extension.GetRegistry(registryUrl.Protocol, registryUrl)
if err != nil {
logger.Errorf("Registry can not connect success, program is going to panic.Error message is %s", err.Error())
- panic(err.Error())
+ panic(err)
}
proto.registries.Store(registryUrl.Location, reg)
}