You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/27 14:03:58 UTC
[dubbo-go] branch 1.5 updated: Ftr: Added more event distribution
types and improved event distribution mechanism for 1.5 (#1405)
This is an automated email from the ASF dual-hosted git repository.
alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git
The following commit(s) were added to refs/heads/1.5 by this push:
new 13e7a6d Ftr: Added more event distribution types and improved event distribution mechanism for 1.5 (#1405)
13e7a6d is described below
commit 13e7a6dcce74e0874951a726d00490136e0fea5c
Author: ChangedenChan <ch...@gmail.com>
AuthorDate: Fri Aug 27 22:03:52 2021 +0800
Ftr: Added more event distribution types and improved event distribution mechanism for 1.5 (#1405)
* 改进旧版的事件分发机制,引入更多事件
* 新增事件删除功能
* 优化命名方式
* 优化代码逻辑;合并ConfigLoaderHook相关Func至ConfigPostProcessor中;
Co-authored-by: Changeden <ch...@unizone.tech>
---
common/constant/key.go | 11 ++++
common/extension/config_post_processor.go | 5 ++
config/config_loader.go | 15 +++++-
config/config_loader_test.go | 83 ++++++++++++++++++++++++++++++
config/graceful_shutdown.go | 8 +++
config/interfaces/config_post_processor.go | 15 +++++-
config/reference_config.go | 24 +++++++--
config/service_config.go | 24 +++++++--
8 files changed, 175 insertions(+), 10 deletions(-)
diff --git a/common/constant/key.go b/common/constant/key.go
index 7240d10..b062b07 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -350,3 +350,14 @@ const (
// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
SERVICE_DISCOVERY_KEY = "service_discovery"
)
+
+// Loader hook event names handed to ConfigPostProcessor callbacks.
+const (
+ HookEventBeforeReferenceConnect = "before-reference-connect" // passed by ReferenceConfig.Refer before the reference is connected
+ HookEventReferenceConnectSuccess = "reference-connect-success" // passed by loadConsumerConfig when a reference passes its check
+ HookEventReferenceConnectFail = "reference-connect-fail" // passed by loadConsumerConfig right before it panics on a check timeout
+
+ HookEventBeforeProviderConnect = "before-service-listen" // passed by ServiceConfig.Export before the service is exported
+ HookEventProviderConnectSuccess = "service-listen-success" // passed by loadProviderConfig when Export returns no error
+ HookEventProviderConnectFail = "service-listen-fail" // passed by loadProviderConfig right before it panics on an Export error
+)
diff --git a/common/extension/config_post_processor.go b/common/extension/config_post_processor.go
index db126b7..e65bb06 100644
--- a/common/extension/config_post_processor.go
+++ b/common/extension/config_post_processor.go
@@ -35,6 +35,11 @@ func GetConfigPostProcessor(name string) interfaces.ConfigPostProcessor {
return processors[name]
}
+// RemoveConfigPostProcessor removes the ConfigPostProcessor registered under name from processors.
+func RemoveConfigPostProcessor(name string) {
+ delete(processors, name) // delete is a no-op when name is not a key of the map
+}
+
// GetConfigPostProcessors returns all registered instances of ConfigPostProcessor.
func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
ret := make([]interfaces.ConfigPostProcessor, 0, len(processors))
diff --git a/config/config_loader.go b/config/config_loader.go
index 14caf2c..7ec64aa 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -169,6 +169,7 @@ func loadConsumerConfig() {
for {
checkok := true
for _, refconfig := range consumerConfig.References {
+ referenceURL := refconfig.getValidURL()
if (refconfig.Check != nil && *refconfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true
@@ -179,6 +180,7 @@ func loadConsumerConfig() {
if count > maxWait {
errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
logger.Error(errMsg)
+ refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectFail, &errMsg)
panic(errMsg)
}
time.Sleep(time.Second * 1)
@@ -186,13 +188,16 @@ func loadConsumerConfig() {
}
if refconfig.invoker == nil {
logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
+ continue
}
}
+ refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectSuccess, nil)
}
if checkok {
break
}
}
+ postAllConsumersConnectComplete()
}
func loadProviderConfig() {
@@ -247,11 +252,17 @@ func loadProviderConfig() {
svs.id = key
svs.Implement(rpcService)
svs.Protocols = providerConfig.Protocols
- if err := svs.Export(); err != nil {
- panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
+ err := svs.Export()
+ serviceURL := svs.getValidURL()
+ if err != nil {
+ errMsg := fmt.Sprintf("service %s export failed! err: %#v", key, err)
+ svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectFail, &errMsg)
+ panic(errMsg)
}
+ svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectSuccess, nil)
}
registerServiceInstance()
+ postAllProvidersConnectComplete()
}
// registerServiceInstance register service instance
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index 9c99efa..be51665 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -111,6 +111,89 @@ func TestLoad(t *testing.T) {
providerConfig = nil
}
+type CustomEvent struct { // CustomEvent is the ConfigPostProcessor registered by TestLoadWithEventDispatch
+ t *testing.T // test handle used by the assertions inside the callbacks
+}
+
+// PostProcessReferenceConfig implements interfaces.ConfigPostProcessor.
+// It logs the event name, the URL and, when errMsg is non-nil, the error
+// message, then asserts that the URL's side parameter is "consumer".
+func (c CustomEvent) PostProcessReferenceConfig(u *common.URL, event string, errMsg *string) {
+ logger.Debug("PostProcessReferenceConfig Start")
+ logger.Debug("Event: ", event)
+ logger.Debug("Url: ", u)
+ if errMsg != nil {
+ logger.Debug("Error Message: ", *errMsg)
+ }
+ logger.Debug("PostProcessReferenceConfig End")
+ // assert.Equal takes (t, expected, actual); the literal goes first so a
+ // failure report labels the two values correctly.
+ assert.Equal(c.t, "consumer", u.GetParam(constant.SIDE_KEY, ""))
+}
+// PostProcessServiceConfig implements interfaces.ConfigPostProcessor.
+// It logs the event name, the URL and, when errMsg is non-nil, the error
+// message, then asserts that the URL's side parameter is "provider".
+func (c CustomEvent) PostProcessServiceConfig(u *common.URL, event string, errMsg *string) {
+ logger.Debug("PostProcessServiceConfig Start")
+ logger.Debug("Event: ", event)
+ logger.Debug("Url: ", u)
+ if errMsg != nil {
+ logger.Debug("Error Message: ", *errMsg)
+ }
+ logger.Debug("PostProcessServiceConfig End")
+ // assert.Equal takes (t, expected, actual); the literal goes first so a
+ // failure report labels the two values correctly.
+ assert.Equal(c.t, "provider", u.GetParam(constant.SIDE_KEY, ""))
+}
+func (c CustomEvent) AllReferencesConnectComplete() { // invoked through postAllConsumersConnectComplete; only logs
+ logger.Debug("AllConsumersConnectComplete")
+}
+func (c CustomEvent) AllServicesListenComplete() { // invoked through postAllProvidersConnectComplete; only logs
+ logger.Debug("AllServicesListenComplete")
+}
+func (c CustomEvent) BeforeShutdown() { // invoked through postBeforeShutdown; only logs
+ logger.Debug("BeforeShutdown")
+}
+
+func TestLoadWithEventDispatch(t *testing.T) { // runs Load with a CustomEvent post processor registered
+ doInitConsumer()
+ doInitProvider()
+ for _, v := range providerConfig.Services {
+ v.export = true
+ }
+
+ ms := &MockService{}
+ SetConsumerService(ms)
+ SetProviderService(ms)
+
+ extension.SetProtocol("registry", GetProtocol)
+ extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
+ extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
+ GetApplicationConfig().MetadataType = "mock"
+ var mm *mockMetadataService
+ extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) { // factory hands out one lazily created mock
+ if mm == nil {
+ mm = &mockMetadataService{
+ exportedServiceURLs: new(sync.Map),
+ lock: new(sync.RWMutex),
+ }
+ }
+ return mm, nil
+ })
+
+ configPostProcessorName := "TestLoadWithEventDispatch"
+ extension.SetConfigPostProcessor(configPostProcessorName, CustomEvent{t}) // register the processor whose callbacks carry the assertions
+
+ Load() // NOTE(review): presumably reaches loadConsumerConfig/loadProviderConfig, which emit the events - Load's body is not visible here
+
+ assert.Equal(t, ms, GetRPCService(ms.Reference()))
+ ms2 := &struct {
+ MockService
+ }{}
+ RPCService(ms2)
+ assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))
+
+ conServices = map[string]common.RPCService{} // reset package-level state touched by this test
+ proServices = map[string]common.RPCService{}
+ err := common.ServiceMap.UnRegister("com.MockService", "mock",
+ common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
+ assert.Nil(t, err)
+ extension.RemoveConfigPostProcessor(configPostProcessorName) // unregister the processor added above
+ consumerConfig = nil
+ providerConfig = nil
+}
+
func TestLoadWithSingleReg(t *testing.T) {
doInitConsumerWithSingleRegistry()
mockInitProviderWithSingleRegistry()
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 89ac2e3..f8f29c2 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -67,6 +67,7 @@ func GracefulShutdownInit() {
// gracefulShutdownOnce.Do(func() {
time.AfterFunc(totalTimeout(), func() {
logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
+ postBeforeShutdown()
os.Exit(0)
})
BeforeShutdown()
@@ -76,6 +77,7 @@ func GracefulShutdownInit() {
debug.WriteHeapDump(os.Stdout.Fd())
}
}
+ postBeforeShutdown()
os.Exit(0)
}
}()
@@ -223,3 +225,9 @@ func getConsumerProtocols() *gxset.HashSet {
}
return result
}
+
+// postBeforeShutdown calls BeforeShutdown on every registered ConfigPostProcessor.
+func postBeforeShutdown() {
+ registered := extension.GetConfigPostProcessors()
+ for i := range registered {
+ registered[i].BeforeShutdown()
+ }
+}
diff --git a/config/interfaces/config_post_processor.go b/config/interfaces/config_post_processor.go
index 53dd717..4578c59 100644
--- a/config/interfaces/config_post_processor.go
+++ b/config/interfaces/config_post_processor.go
@@ -25,8 +25,19 @@ import (
// ServiceConfig during deployment time.
type ConfigPostProcessor interface {
// PostProcessReferenceConfig customizes ReferenceConfig's params.
- PostProcessReferenceConfig(*common.URL)
+ // PostProcessReferenceConfig is emitted while a reference is referred (event: before-reference-connect, reference-connect-success, reference-connect-fail); errMsg is nil unless the event reports a failure.
+ PostProcessReferenceConfig(url *common.URL, event string, errMsg *string)
// PostProcessServiceConfig customizes ServiceConfig's params.
- PostProcessServiceConfig(*common.URL)
+ // PostProcessServiceConfig is emitted while a service is exported (event: before-service-listen, service-listen-success, service-listen-fail); errMsg is nil unless the event reports a failure.
+ PostProcessServiceConfig(url *common.URL, event string, errMsg *string)
+
+ // AllReferencesConnectComplete is emitted once loading of all references has completed.
+ AllReferencesConnectComplete()
+
+ // AllServicesListenComplete is emitted once all services have been exported.
+ AllServicesListenComplete()
+
+ // BeforeShutdown is emitted during graceful shutdown, right before the process exits.
+ BeforeShutdown()
}
diff --git a/config/reference_config.go b/config/reference_config.go
index d3730f0..eb35ee3 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -101,7 +101,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
if c.ForceTag {
cfgURL.AddParam(constant.ForceUseTag, "true")
}
- c.postProcessConfig(cfgURL)
+ c.postProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
if c.URL != "" {
// 1. user specified URL, could be peer-to-peer address, or register center's address.
urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
@@ -302,8 +302,26 @@ func publishConsumerDefinition(url *common.URL) {
}
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
-func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
+func (c *ReferenceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) { // event is a HookEvent* name; callers pass a nil errMsg except for the fail event
for _, p := range extension.GetConfigPostProcessors() {
- p.PostProcessReferenceConfig(url)
+ p.PostProcessReferenceConfig(url, event, errMsg)
+ }
+}
+
+// getValidURL returns the URL that describes this reference: the SubURL of
+// the first entry in c.urls when that entry has one, otherwise the first
+// entry itself. It returns nil when c.urls is empty.
+func (c *ReferenceConfig) getValidURL() *common.URL {
+ // len of a nil slice is 0, so no separate nil check is needed.
+ if len(c.urls) == 0 {
+ return nil
+ }
+ u := c.urls[0]
+ if u != nil && u.SubURL != nil {
+ return u.SubURL
+ }
+ return u
+}
+
+func postAllConsumersConnectComplete() { // calls AllReferencesConnectComplete on every registered ConfigPostProcessor
+ for _, p := range extension.GetConfigPostProcessors() {
+ p.AllReferencesConnectComplete()
}
}
diff --git a/config/service_config.go b/config/service_config.go
index d73190c..dc4902e 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -203,7 +203,7 @@ func (c *ServiceConfig) Export() error {
}
// post process the URL to be exported
- c.postProcessConfig(ivkURL)
+ c.postProcessConfig(ivkURL, constant.HookEventBeforeProviderConnect, nil)
// config post processor may set "export" to false
if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
return nil
@@ -355,8 +355,26 @@ func publishServiceDefinition(url *common.URL) {
}
// postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
-func (c *ServiceConfig) postProcessConfig(url *common.URL) {
+func (c *ServiceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) { // event is a HookEvent* name; callers pass a nil errMsg except for the fail event
for _, p := range extension.GetConfigPostProcessors() {
- p.PostProcessServiceConfig(url)
+ p.PostProcessServiceConfig(url, event, errMsg)
+ }
+}
+
+// getValidURL returns the URL that describes this service: the SubURL of the
+// first exported URL when that URL has one, otherwise the first exported URL
+// itself. It returns nil when GetExportedUrls yields no URLs.
+func (c *ServiceConfig) getValidURL() *common.URL {
+ exported := c.GetExportedUrls()
+ // len of a nil slice is 0, so no separate nil check is needed.
+ if len(exported) == 0 {
+ return nil
+ }
+ u := exported[0]
+ if u != nil && u.SubURL != nil {
+ return u.SubURL
+ }
+ return u
+}
+
+func postAllProvidersConnectComplete() { // calls AllServicesListenComplete on every registered ConfigPostProcessor
+ for _, p := range extension.GetConfigPostProcessors() {
+ p.AllServicesListenComplete()
}
}