You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/27 14:03:58 UTC

[dubbo-go] branch 1.5 updated: Ftr: Added more event distribution types and improved event distribution mechanism for 1.5 (#1405)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 13e7a6d  Ftr: Added more event distribution types and improved event distribution mechanism for 1.5 (#1405)
13e7a6d is described below

commit 13e7a6dcce74e0874951a726d00490136e0fea5c
Author: ChangedenChan <ch...@gmail.com>
AuthorDate: Fri Aug 27 22:03:52 2021 +0800

    Ftr: Added more event distribution types and improved event distribution mechanism for 1.5 (#1405)
    
    * 改进旧版的事件分发机制,引入更多事件
    
    * 新增事件删除功能
    
    * 优化命名方式
    
    * 优化代码逻辑;合并ConfigLoaderHook相关Func至ConfigPostProcessor中;
    
    Co-authored-by: Changeden <ch...@unizone.tech>
---
 common/constant/key.go                     | 11 ++++
 common/extension/config_post_processor.go  |  5 ++
 config/config_loader.go                    | 15 +++++-
 config/config_loader_test.go               | 83 ++++++++++++++++++++++++++++++
 config/graceful_shutdown.go                |  8 +++
 config/interfaces/config_post_processor.go | 15 +++++-
 config/reference_config.go                 | 24 +++++++--
 config/service_config.go                   | 24 +++++++--
 8 files changed, 175 insertions(+), 10 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index 7240d10..b062b07 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -350,3 +350,14 @@ const (
 	// SERVICE_DISCOVERY_KEY indicate which service discovery instance will be used
 	SERVICE_DISCOVERY_KEY = "service_discovery"
 )
+
+// Loader hook event names passed to ConfigPostProcessor callbacks.
+const (
+	HookEventBeforeReferenceConnect  = "before-reference-connect"
+	HookEventReferenceConnectSuccess = "reference-connect-success"
+	HookEventReferenceConnectFail    = "reference-connect-fail"
+
+	HookEventBeforeProviderConnect  = "before-service-listen"
+	HookEventProviderConnectSuccess = "service-listen-success"
+	HookEventProviderConnectFail    = "service-listen-fail"
+)
diff --git a/common/extension/config_post_processor.go b/common/extension/config_post_processor.go
index db126b7..e65bb06 100644
--- a/common/extension/config_post_processor.go
+++ b/common/extension/config_post_processor.go
@@ -35,6 +35,11 @@ func GetConfigPostProcessor(name string) interfaces.ConfigPostProcessor {
 	return processors[name]
 }
 
+// RemoveConfigPostProcessor removes the processor registered under the given name.
+func RemoveConfigPostProcessor(name string) {
+	delete(processors, name)
+}
+
 // GetConfigPostProcessors returns all registered instances of ConfigPostProcessor.
 func GetConfigPostProcessors() []interfaces.ConfigPostProcessor {
 	ret := make([]interfaces.ConfigPostProcessor, 0, len(processors))
diff --git a/config/config_loader.go b/config/config_loader.go
index 14caf2c..7ec64aa 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -169,6 +169,7 @@ func loadConsumerConfig() {
 	for {
 		checkok := true
 		for _, refconfig := range consumerConfig.References {
+			referenceURL := refconfig.getValidURL()
 			if (refconfig.Check != nil && *refconfig.Check) ||
 				(refconfig.Check == nil && consumerConfig.Check != nil && *consumerConfig.Check) ||
 				(refconfig.Check == nil && consumerConfig.Check == nil) { // default to true
@@ -179,6 +180,7 @@ func loadConsumerConfig() {
 					if count > maxWait {
 						errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
 						logger.Error(errMsg)
+						refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectFail, &errMsg)
 						panic(errMsg)
 					}
 					time.Sleep(time.Second * 1)
@@ -186,13 +188,16 @@ func loadConsumerConfig() {
 				}
 				if refconfig.invoker == nil {
 					logger.Warnf("The interface %s invoker not exist, may you should check your interface config.", refconfig.InterfaceName)
+					continue
 				}
 			}
+			refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectSuccess, nil)
 		}
 		if checkok {
 			break
 		}
 	}
+	postAllConsumersConnectComplete()
 }
 
 func loadProviderConfig() {
@@ -247,11 +252,17 @@ func loadProviderConfig() {
 		svs.id = key
 		svs.Implement(rpcService)
 		svs.Protocols = providerConfig.Protocols
-		if err := svs.Export(); err != nil {
-			panic(fmt.Sprintf("service %s export failed! err: %#v", key, err))
+		err := svs.Export()
+		serviceURL := svs.getValidURL()
+		if err != nil {
+			errMsg := fmt.Sprintf("service %s export failed! err: %#v", key, err)
+			svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectFail, &errMsg)
+			panic(errMsg)
 		}
+		svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectSuccess, nil)
 	}
 	registerServiceInstance()
+	postAllProvidersConnectComplete()
 }
 
 // registerServiceInstance register service instance
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index 9c99efa..be51665 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -111,6 +111,89 @@ func TestLoad(t *testing.T) {
 	providerConfig = nil
 }
 
+type CustomEvent struct {
+	t *testing.T
+}
+
+// implements interfaces.ConfigPostProcessor's functions
+func (c CustomEvent) PostProcessReferenceConfig(u *common.URL, event string, errMsg *string) {
+	logger.Debug("PostProcessReferenceConfig Start")
+	logger.Debug("Event: ", event)
+	logger.Debug("Url: ", u)
+	if errMsg != nil {
+		logger.Debug("Error Message: ", *errMsg)
+	}
+	logger.Debug("PostProcessReferenceConfig End")
+	assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "consumer")
+}
+func (c CustomEvent) PostProcessServiceConfig(u *common.URL, event string, errMsg *string) {
+	logger.Debug("PostProcessServiceConfig Start")
+	logger.Debug("Event: ", event)
+	logger.Debug("Url: ", u)
+	if errMsg != nil {
+		logger.Debug("Error Message: ", *errMsg)
+	}
+	logger.Debug("PostProcessServiceConfig End")
+	assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "provider")
+}
+func (c CustomEvent) AllReferencesConnectComplete() {
+	logger.Debug("AllConsumersConnectComplete")
+}
+func (c CustomEvent) AllServicesListenComplete() {
+	logger.Debug("AllServicesListenComplete")
+}
+func (c CustomEvent) BeforeShutdown() {
+	logger.Debug("BeforeShutdown")
+}
+
+func TestLoadWithEventDispatch(t *testing.T) {
+	doInitConsumer()
+	doInitProvider()
+	for _, v := range providerConfig.Services {
+		v.export = true
+	}
+
+	ms := &MockService{}
+	SetConsumerService(ms)
+	SetProviderService(ms)
+
+	extension.SetProtocol("registry", GetProtocol)
+	extension.SetCluster(constant.ZONEAWARE_CLUSTER_NAME, cluster_impl.NewZoneAwareCluster)
+	extension.SetProxyFactory("default", proxy_factory.NewDefaultProxyFactory)
+	GetApplicationConfig().MetadataType = "mock"
+	var mm *mockMetadataService
+	extension.SetMetadataService("mock", func() (metadataService service.MetadataService, err error) {
+		if mm == nil {
+			mm = &mockMetadataService{
+				exportedServiceURLs: new(sync.Map),
+				lock:                new(sync.RWMutex),
+			}
+		}
+		return mm, nil
+	})
+
+	configPostProcessorName := "TestLoadWithEventDispatch"
+	extension.SetConfigPostProcessor(configPostProcessorName, CustomEvent{t})
+
+	Load()
+
+	assert.Equal(t, ms, GetRPCService(ms.Reference()))
+	ms2 := &struct {
+		MockService
+	}{}
+	RPCService(ms2)
+	assert.NotEqual(t, ms2, GetRPCService(ms2.Reference()))
+
+	conServices = map[string]common.RPCService{}
+	proServices = map[string]common.RPCService{}
+	err := common.ServiceMap.UnRegister("com.MockService", "mock",
+		common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
+	assert.Nil(t, err)
+	extension.RemoveConfigPostProcessor(configPostProcessorName)
+	consumerConfig = nil
+	providerConfig = nil
+}
+
 func TestLoadWithSingleReg(t *testing.T) {
 	doInitConsumerWithSingleRegistry()
 	mockInitProviderWithSingleRegistry()
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index 89ac2e3..f8f29c2 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -67,6 +67,7 @@ func GracefulShutdownInit() {
 			// gracefulShutdownOnce.Do(func() {
 			time.AfterFunc(totalTimeout(), func() {
 				logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
+				postBeforeShutdown()
 				os.Exit(0)
 			})
 			BeforeShutdown()
@@ -76,6 +77,7 @@ func GracefulShutdownInit() {
 					debug.WriteHeapDump(os.Stdout.Fd())
 				}
 			}
+			postBeforeShutdown()
 			os.Exit(0)
 		}
 	}()
@@ -223,3 +225,9 @@ func getConsumerProtocols() *gxset.HashSet {
 	}
 	return result
 }
+
+func postBeforeShutdown() {
+	for _, p := range extension.GetConfigPostProcessors() {
+		p.BeforeShutdown()
+	}
+}
diff --git a/config/interfaces/config_post_processor.go b/config/interfaces/config_post_processor.go
index 53dd717..4578c59 100644
--- a/config/interfaces/config_post_processor.go
+++ b/config/interfaces/config_post_processor.go
@@ -25,8 +25,19 @@ import (
 // ServiceConfig during deployment time.
 type ConfigPostProcessor interface {
 	// PostProcessReferenceConfig customizes ReferenceConfig's params.
-	PostProcessReferenceConfig(*common.URL)
+	// It is called while a reference is being referred (events: before-reference-connect, reference-connect-success, reference-connect-fail).
+	PostProcessReferenceConfig(url *common.URL, event string, errMsg *string)
 
 	// PostProcessServiceConfig customizes ServiceConfig's params.
-	PostProcessServiceConfig(*common.URL)
+	// It is called while a service is being exported (events: before-service-listen, service-listen-success, service-listen-fail).
+	PostProcessServiceConfig(url *common.URL, event string, errMsg *string)
+
+	// AllReferencesConnectComplete is called once all references have finished connecting.
+	AllReferencesConnectComplete()
+
+	// AllServicesListenComplete is called once all services have been exported.
+	AllServicesListenComplete()
+
+	// BeforeShutdown is called just before the process exits during shutdown.
+	BeforeShutdown()
 }
diff --git a/config/reference_config.go b/config/reference_config.go
index d3730f0..eb35ee3 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -101,7 +101,7 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
 	if c.ForceTag {
 		cfgURL.AddParam(constant.ForceUseTag, "true")
 	}
-	c.postProcessConfig(cfgURL)
+	c.postProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
 	if c.URL != "" {
 		// 1. user specified URL, could be peer-to-peer address, or register center's address.
 		urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
@@ -302,8 +302,26 @@ func publishConsumerDefinition(url *common.URL) {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
-func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
+func (c *ReferenceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
 	for _, p := range extension.GetConfigPostProcessors() {
-		p.PostProcessReferenceConfig(url)
+		p.PostProcessReferenceConfig(url, event, errMsg)
+	}
+}
+
+func (c *ReferenceConfig) getValidURL() *common.URL {
+	urls := c.urls
+	var u *common.URL
+	if urls != nil && len(urls) > 0 {
+		u = urls[0]
+	}
+	if u != nil && u.SubURL != nil {
+		return u.SubURL
+	}
+	return u
+}
+
+func postAllConsumersConnectComplete() {
+	for _, p := range extension.GetConfigPostProcessors() {
+		p.AllReferencesConnectComplete()
 	}
 }
diff --git a/config/service_config.go b/config/service_config.go
index d73190c..dc4902e 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -203,7 +203,7 @@ func (c *ServiceConfig) Export() error {
 		}
 
 		// post process the URL to be exported
-		c.postProcessConfig(ivkURL)
+		c.postProcessConfig(ivkURL, constant.HookEventBeforeProviderConnect, nil)
 		// config post processor may set "export" to false
 		if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
 			return nil
@@ -355,8 +355,26 @@ func publishServiceDefinition(url *common.URL) {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
-func (c *ServiceConfig) postProcessConfig(url *common.URL) {
+func (c *ServiceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
 	for _, p := range extension.GetConfigPostProcessors() {
-		p.PostProcessServiceConfig(url)
+		p.PostProcessServiceConfig(url, event, errMsg)
+	}
+}
+
+func (c *ServiceConfig) getValidURL() *common.URL {
+	urls := c.GetExportedUrls()
+	var u *common.URL
+	if urls != nil && len(urls) > 0 {
+		u = urls[0]
+	}
+	if u != nil && u.SubURL != nil {
+		return u.SubURL
+	}
+	return u
+}
+
+func postAllProvidersConnectComplete() {
+	for _, p := range extension.GetConfigPostProcessors() {
+		p.AllServicesListenComplete()
 	}
 }