You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/26 05:07:40 UTC

[dubbo-go] branch 1.5 updated: Ftr: Modify event dispatch interface and optimit dispatch functions (#1453)

This is an automated email from the ASF dual-hosted git repository.

alexstocks pushed a commit to branch 1.5
in repository https://gitbox.apache.org/repos/asf/dubbo-go.git


The following commit(s) were added to refs/heads/1.5 by this push:
     new 16e3744  Ftr: Modify event dispatch interface and optimit dispatch functions (#1453)
16e3744 is described below

commit 16e3744bcd36b1291291f8e97ae8c7bd0ec7d9fb
Author: ChangedenChan <ch...@gmail.com>
AuthorDate: Sun Sep 26 13:07:35 2021 +0800

    Ftr: Modify event dispatch interface and optimit dispatch functions (#1453)
    
    * Modify event dispatch interface and optimit dispatch functions
    
    * 增加注释
    
    * 优化代码
    
    * 优化导包
    
    * 优化代码逻辑
    
    * 优化代码逻辑
    
    * 优化代码逻辑
    
    * 变更文件名
    
    * 完善单元测试
    
    * 优化函数名
    
    * Add read/write lock
    
    * 调整读写锁逻辑
    
    * 增加线程安全
    
    * 优化变量初始化方式
    
    Co-authored-by: Changeden <ch...@unizone.tech>
---
 common/constant/key.go                     |  12 +-
 common/extension/config_load_processor.go  | 183 +++++++++++++++++++++++++++++
 config/config_loader.go                    |  14 ++-
 config/config_loader_test.go               |  48 +++++---
 config/graceful_shutdown.go                |  10 +-
 config/interfaces/config_load_processor.go |  56 +++++++++
 config/interfaces/config_post_processor.go |  15 +--
 config/reference_config.go                 |  18 +--
 config/service_config.go                   |  18 +--
 9 files changed, 310 insertions(+), 64 deletions(-)

diff --git a/common/constant/key.go b/common/constant/key.go
index b062b07..40499ab 100644
--- a/common/constant/key.go
+++ b/common/constant/key.go
@@ -357,7 +357,13 @@ const (
 	HookEventReferenceConnectSuccess = "reference-connect-success"
 	HookEventReferenceConnectFail    = "reference-connect-fail"
 
-	HookEventBeforeProviderConnect  = "before-service-listen"
-	HookEventProviderConnectSuccess = "service-listen-success"
-	HookEventProviderConnectFail    = "service-listen-fail"
+	HookEventBeforeServiceListen  = "before-service-listen"
+	HookEventServiceListenSuccess = "service-listen-success"
+	HookEventServiceListenFail    = "service-listen-fail"
+
+	LoadProcessReferenceConfigFunctionName        = "LoadProcessReferenceConfig"
+	LoadProcessServiceConfigFunctionName          = "LoadProcessServiceConfig"
+	AfterAllReferencesConnectCompleteFunctionName = "AfterAllReferencesConnectComplete"
+	AfterAllServicesListenCompleteFunctionName    = "AfterAllServicesListenComplete"
+	BeforeShutdownFunctionName                    = "BeforeShutdown"
 )
diff --git a/common/extension/config_load_processor.go b/common/extension/config_load_processor.go
new file mode 100644
index 0000000..fc7bf4b
--- /dev/null
+++ b/common/extension/config_load_processor.go
@@ -0,0 +1,183 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package extension
+
+import (
+	"reflect"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/config/interfaces"
+)
+
+var (
+	// configLoadProcessorHolder is the mutex guarding all of the maps declared below.
+	configLoadProcessorHolder = &interfaces.ConfigLoadProcessorHolder{}
+
+	// loadProcessors maps a registration name to its processor; loadProcessorValues
+	// holds reflect.ValueOf of the same processor under the same name, which emit
+	// uses to look methods up by name.
+	loadProcessors      = make(map[string]interfaces.ConfigLoadProcessor)
+	loadProcessorValues = make(map[string]reflect.Value)
+
+	// referenceURL and serviceURL record, per event name, the URLs dispatched through
+	// LoadProcessReferenceConfig and LoadProcessServiceConfig respectively.
+	referenceURL = make(map[string][]*common.URL)
+	serviceURL   = make(map[string][]*common.URL)
+)
+
+// SetConfigLoadProcessor stores processor under name, replacing any processor
+// previously registered with that name.
+func SetConfigLoadProcessor(name string, processor interfaces.ConfigLoadProcessor) {
+	// The reflect.Value is computed up front so the critical section only writes the maps.
+	value := reflect.ValueOf(processor)
+
+	configLoadProcessorHolder.Lock()
+	loadProcessors[name], loadProcessorValues[name] = processor, value
+	configLoadProcessorHolder.Unlock()
+}
+
+// GetConfigLoadProcessor looks up the ConfigLoadProcessor registered under name.
+// It returns nil when no processor has been registered with that name.
+func GetConfigLoadProcessor(name string) interfaces.ConfigLoadProcessor {
+	configLoadProcessorHolder.Lock()
+	processor := loadProcessors[name]
+	configLoadProcessorHolder.Unlock()
+	return processor
+}
+
+// RemoveConfigLoadProcessor unregisters the processor stored under name.
+// Removing a name that was never registered is a no-op.
+func RemoveConfigLoadProcessor(name string) {
+	configLoadProcessorHolder.Lock()
+	// Both maps are keyed by the same name and must be kept in step.
+	delete(loadProcessorValues, name)
+	delete(loadProcessors, name)
+	configLoadProcessorHolder.Unlock()
+}
+
+// GetConfigLoadProcessors returns a snapshot of all registered ConfigLoadProcessor instances.
+// The order of the returned slice is unspecified because it follows map iteration order.
+func GetConfigLoadProcessors() []interfaces.ConfigLoadProcessor {
+	configLoadProcessorHolder.Lock()
+	defer configLoadProcessorHolder.Unlock()
+	// Size the slice only after taking the lock: calling len() on a map that
+	// another goroutine may be writing to is a data race.
+	ret := make([]interfaces.ConfigLoadProcessor, 0, len(loadProcessors))
+	for _, v := range loadProcessors {
+		ret = append(ret, v)
+	}
+	return ret
+}
+
+// GetReferenceURL returns a copy of the recorded reference URLs, keyed by event name.
+// The returned map and each of its slices are fresh copies, so callers may modify
+// them freely. The *common.URL elements are the same pointers held in the internal
+// record; they are not cloned here.
+func GetReferenceURL() map[string][]*common.URL {
+	configLoadProcessorHolder.Lock()
+	defer configLoadProcessorHolder.Unlock()
+	urlMap := make(map[string][]*common.URL, len(referenceURL))
+	for event, urls := range referenceURL {
+		// Appending to a nil slice copies the elements into a new backing array.
+		urlMap[event] = append([]*common.URL(nil), urls...)
+	}
+	return urlMap
+}
+
+// GetServiceURL returns a copy of the recorded service URLs, keyed by event name.
+// The returned map and each of its slices are fresh copies, so callers may modify
+// them freely. The *common.URL elements are the same pointers held in the internal
+// record; they are not cloned here.
+func GetServiceURL() map[string][]*common.URL {
+	configLoadProcessorHolder.Lock()
+	defer configLoadProcessorHolder.Unlock()
+	urlMap := make(map[string][]*common.URL, len(serviceURL))
+	for event, urls := range serviceURL {
+		// Appending to a nil slice copies the elements into a new backing array.
+		urlMap[event] = append([]*common.URL(nil), urls...)
+	}
+	return urlMap
+}
+
+// ResetURL discards every recorded reference and service URL.
+// The maps themselves are kept and emptied in place.
+func ResetURL() {
+	configLoadProcessorHolder.Lock()
+	defer configLoadProcessorHolder.Unlock()
+	// Deleting the current key while ranging over a map is safe in Go.
+	// delete alone drops the slice; no nil store is needed beforehand.
+	for k := range referenceURL {
+		delete(referenceURL, k)
+	}
+	for k := range serviceURL {
+		delete(serviceURL, k)
+	}
+}
+
+// emit invokes the method named funcName on every registered processor,
+// passing val as the call arguments.
+//
+// The processors are snapshotted under the lock and invoked after it has been
+// released. sync.Mutex is not reentrant, so running user code while holding it
+// would deadlock as soon as a processor called back into this package
+// (for example GetReferenceURL or RemoveConfigLoadProcessor).
+// As a consequence, a processor removed while a dispatch is in flight may still
+// receive that event, and concurrent emit calls may invoke processors concurrently.
+// Dispatch order is unspecified because it follows map iteration order.
+func emit(funcName string, val ...interface{}) {
+	values := make([]reflect.Value, 0, len(val))
+	for _, arg := range val {
+		values = append(values, reflect.ValueOf(arg))
+	}
+
+	configLoadProcessorHolder.Lock()
+	processors := make([]reflect.Value, 0, len(loadProcessorValues))
+	for _, p := range loadProcessorValues {
+		processors = append(processors, p)
+	}
+	configLoadProcessorHolder.Unlock()
+
+	for _, p := range processors {
+		p.MethodByName(funcName).Call(values)
+	}
+}
+
+// LoadProcessReferenceConfig dispatches a reference config load event to the
+// registered processors and records the URL under the given event.
+func LoadProcessReferenceConfig(url *common.URL, event string, errMsg *string) {
+	// On the before-connect event the URL is handed over as-is instead of being cloned.
+	const rawURLEvent = constant.HookEventBeforeReferenceConnect
+	emitReferenceOrService(&referenceURL, rawURLEvent,
+		constant.LoadProcessReferenceConfigFunctionName, url, event, errMsg)
+}
+
+// LoadProcessServiceConfig dispatches a service config load event to the
+// registered processors and records the URL under the given event.
+func LoadProcessServiceConfig(url *common.URL, event string, errMsg *string) {
+	// On the before-listen event the URL is handed over as-is instead of being cloned.
+	const rawURLEvent = constant.HookEventBeforeServiceListen
+	emitReferenceOrService(&serviceURL, rawURLEvent,
+		constant.LoadProcessServiceConfigFunctionName, url, event, errMsg)
+}
+
+// emitReferenceOrService dispatches a reference/service load event and records its URL.
+//
+// urlMap is the per-event record to append to and funcName is the processor
+// method to invoke. Unless event equals ignoreCloneEvent, a non-nil url is
+// cloned first; that clone is both what the processors receive and what is
+// recorded. For ignoreCloneEvent the caller's own URL is passed through, and a
+// nil url is dispatched and recorded as-is.
+func emitReferenceOrService(urlMap *map[string][]*common.URL, ignoreCloneEvent string,
+	funcName string, url *common.URL, event string, errMsg *string) {
+	// No lock is needed for cloning: it involves only the URL, and package
+	// common cannot reach the state guarded by configLoadProcessorHolder.
+	if url != nil && event != ignoreCloneEvent {
+		url = url.Clone()
+	}
+	// emit acquires the lock itself, so it must not be held across this call.
+	emit(funcName, url, event, errMsg)
+
+	configLoadProcessorHolder.Lock()
+	defer configLoadProcessorHolder.Unlock()
+	(*urlMap)[event] = append((*urlMap)[event], url)
+}
+
+// AllReferencesConnectComplete notifies every registered processor that all references
+// have been handled, passing the URLs recorded for the reference-connect-success and
+// reference-connect-fail events.
+func AllReferencesConnectComplete() {
+	// GetReferenceURL returns a private copy, so it can be read without the lock.
+	// The local is named urls to avoid shadowing the package-level referenceURL.
+	urls := GetReferenceURL()
+	emit(constant.AfterAllReferencesConnectCompleteFunctionName, interfaces.ConfigLoadProcessorURLBinder{
+		Success: urls[constant.HookEventReferenceConnectSuccess],
+		Fail:    urls[constant.HookEventReferenceConnectFail],
+	})
+}
+
+// AllServicesListenComplete notifies every registered processor that all services
+// have been handled, passing the URLs recorded for the service-listen-success and
+// service-listen-fail events.
+func AllServicesListenComplete() {
+	// GetServiceURL returns a private copy, so it can be read without the lock.
+	// The local is named urls to avoid shadowing the package-level serviceURL.
+	urls := GetServiceURL()
+	emit(constant.AfterAllServicesListenCompleteFunctionName, interfaces.ConfigLoadProcessorURLBinder{
+		Success: urls[constant.HookEventServiceListenSuccess],
+		Fail:    urls[constant.HookEventServiceListenFail],
+	})
+}
+
+// BeforeShutdown invokes the BeforeShutdown method of every registered processor.
+// It takes no arguments and returns nothing.
+func BeforeShutdown() {
+	emit(constant.BeforeShutdownFunctionName)
+}
diff --git a/config/config_loader.go b/config/config_loader.go
index 7ec64aa..d9288f9 100644
--- a/config/config_loader.go
+++ b/config/config_loader.go
@@ -180,7 +180,7 @@ func loadConsumerConfig() {
 					if count > maxWait {
 						errMsg := fmt.Sprintf("Failed to check the status of the service %v. No provider available for the service to the consumer use dubbo version %v", refconfig.InterfaceName, constant.Version)
 						logger.Error(errMsg)
-						refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectFail, &errMsg)
+						refconfig.loadProcessConfig(referenceURL, constant.HookEventReferenceConnectFail, &errMsg)
 						panic(errMsg)
 					}
 					time.Sleep(time.Second * 1)
@@ -191,13 +191,13 @@ func loadConsumerConfig() {
 					continue
 				}
 			}
-			refconfig.postProcessConfig(referenceURL, constant.HookEventReferenceConnectSuccess, nil)
+			refconfig.loadProcessConfig(referenceURL, constant.HookEventReferenceConnectSuccess, nil)
 		}
 		if checkok {
 			break
 		}
 	}
-	postAllConsumersConnectComplete()
+	extension.AllReferencesConnectComplete()
 }
 
 func loadProviderConfig() {
@@ -256,13 +256,15 @@ func loadProviderConfig() {
 		serviceURL := svs.getValidURL()
 		if err != nil {
 			errMsg := fmt.Sprintf("service %s export failed! err: %#v", key, err)
-			svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectFail, &errMsg)
+			svs.loadProcessConfig(serviceURL, constant.HookEventServiceListenFail, &errMsg)
 			panic(errMsg)
 		}
-		svs.postProcessConfig(serviceURL, constant.HookEventProviderConnectSuccess, nil)
+		if serviceURL != nil {
+			svs.loadProcessConfig(serviceURL, constant.HookEventServiceListenSuccess, nil)
+		}
 	}
 	registerServiceInstance()
-	postAllProvidersConnectComplete()
+	extension.AllServicesListenComplete()
 }
 
 // registerServiceInstance register service instance
diff --git a/config/config_loader_test.go b/config/config_loader_test.go
index be51665..05e9499 100644
--- a/config/config_loader_test.go
+++ b/config/config_loader_test.go
@@ -40,6 +40,7 @@ import (
 	"github.com/apache/dubbo-go/common/extension"
 	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/common/proxy/proxy_factory"
+	"github.com/apache/dubbo-go/config/interfaces"
 	"github.com/apache/dubbo-go/config_center"
 	"github.com/apache/dubbo-go/metadata/service"
 	"github.com/apache/dubbo-go/registry"
@@ -115,32 +116,42 @@ type CustomEvent struct {
 	t *testing.T
 }
 
-// implements interfaces.ConfigPostProcessor's functions
-func (c CustomEvent) PostProcessReferenceConfig(u *common.URL, event string, errMsg *string) {
-	logger.Debug("PostProcessReferenceConfig Start")
+// implements interfaces.ConfigLoadProcessor's functions
+func (c CustomEvent) LoadProcessReferenceConfig(u *common.URL, event string, errMsg *string) {
+	logger.Debug("LoadProcessReferenceConfig Start")
 	logger.Debug("Event: ", event)
 	logger.Debug("Url: ", u)
 	if errMsg != nil {
 		logger.Debug("Error Message: ", *errMsg)
 	}
 	logger.Debug("PostProcessReferenceConfig End")
-	assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "consumer")
+	assert.Equal(c.t, "consumer", u.GetParam(constant.SIDE_KEY, ""))
 }
-func (c CustomEvent) PostProcessServiceConfig(u *common.URL, event string, errMsg *string) {
-	logger.Debug("PostProcessServiceConfig Start")
+func (c CustomEvent) LoadProcessServiceConfig(u *common.URL, event string, errMsg *string) {
+	logger.Debug("LoadProcessServiceConfig Start")
 	logger.Debug("Event: ", event)
 	logger.Debug("Url: ", u)
 	if errMsg != nil {
 		logger.Debug("Error Message: ", *errMsg)
 	}
 	logger.Debug("PostProcessServiceConfig End")
-	assert.Equal(c.t, u.GetParam(constant.SIDE_KEY, ""), "provider")
-}
-func (c CustomEvent) AllReferencesConnectComplete() {
-	logger.Debug("AllConsumersConnectComplete")
-}
-func (c CustomEvent) AllServicesListenComplete() {
-	logger.Debug("AllServicesListenComplete")
+	assert.Equal(c.t, "provider", u.GetParam(constant.SIDE_KEY, ""))
+}
+// AfterAllReferencesConnectComplete logs the collected reference URLs and asserts
+// that the Success slice is non-nil while the Fail slice is nil.
+func (c CustomEvent) AfterAllReferencesConnectComplete(urls interfaces.ConfigLoadProcessorURLBinder) {
+	logger.Debug("AfterAllReferencesConnectComplete")
+	logger.Debug("Success Url: ", urls.Success)
+	logger.Debug("Fail Url: ", urls.Fail)
+	// NOTE(review): urls is a struct value, not a pointer, so this check can never fail.
+	assert.NotNil(c.t, urls)
+	assert.NotNil(c.t, urls.Success)
+	assert.Nil(c.t, urls.Fail)
+}
+// AfterAllServicesListenComplete logs the collected service URLs and asserts
+// that the Success slice is non-nil while the Fail slice is nil.
+func (c CustomEvent) AfterAllServicesListenComplete(urls interfaces.ConfigLoadProcessorURLBinder) {
+	logger.Debug("AfterAllServicesListenComplete")
+	logger.Debug("Success Url: ", urls.Success)
+	logger.Debug("Fail Url: ", urls.Fail)
+	// NOTE(review): urls is a struct value, not a pointer, so this check can never fail.
+	assert.NotNil(c.t, urls)
+	assert.NotNil(c.t, urls.Success)
+	assert.Nil(c.t, urls.Fail)
+}
 func (c CustomEvent) BeforeShutdown() {
 	logger.Debug("BeforeShutdown")
@@ -172,8 +183,8 @@ func TestLoadWithEventDispatch(t *testing.T) {
 		return mm, nil
 	})
 
-	configPostProcessorName := "TestLoadWithEventDispatch"
-	extension.SetConfigPostProcessor(configPostProcessorName, CustomEvent{t})
+	configLoadProcessorName := "TestLoadWithEventDispatch"
+	extension.SetConfigLoadProcessor(configLoadProcessorName, CustomEvent{t})
 
 	Load()
 
@@ -189,7 +200,12 @@ func TestLoadWithEventDispatch(t *testing.T) {
 	err := common.ServiceMap.UnRegister("com.MockService", "mock",
 		common.ServiceKey("com.MockService", "huadong_idc", "1.0.0"))
 	assert.Nil(t, err)
-	extension.RemoveConfigPostProcessor(configPostProcessorName)
+	extension.RemoveConfigLoadProcessor(configLoadProcessorName)
+	assert.Less(t, 0, len(extension.GetReferenceURL()))
+	assert.Less(t, 0, len(extension.GetServiceURL()))
+	extension.ResetURL()
+	assert.Equal(t, 0, len(extension.GetReferenceURL()))
+	assert.Equal(t, 0, len(extension.GetServiceURL()))
 	consumerConfig = nil
 	providerConfig = nil
 }
diff --git a/config/graceful_shutdown.go b/config/graceful_shutdown.go
index f8f29c2..c2c166d 100644
--- a/config/graceful_shutdown.go
+++ b/config/graceful_shutdown.go
@@ -67,7 +67,7 @@ func GracefulShutdownInit() {
 			// gracefulShutdownOnce.Do(func() {
 			time.AfterFunc(totalTimeout(), func() {
 				logger.Warn("Shutdown gracefully timeout, application will shutdown immediately. ")
-				postBeforeShutdown()
+				extension.BeforeShutdown()
 				os.Exit(0)
 			})
 			BeforeShutdown()
@@ -77,7 +77,7 @@ func GracefulShutdownInit() {
 					debug.WriteHeapDump(os.Stdout.Fd())
 				}
 			}
-			postBeforeShutdown()
+			extension.BeforeShutdown()
 			os.Exit(0)
 		}
 	}()
@@ -225,9 +225,3 @@ func getConsumerProtocols() *gxset.HashSet {
 	}
 	return result
 }
-
-func postBeforeShutdown() {
-	for _, p := range extension.GetConfigPostProcessors() {
-		p.BeforeShutdown()
-	}
-}
diff --git a/config/interfaces/config_load_processor.go b/config/interfaces/config_load_processor.go
new file mode 100644
index 0000000..19c54b0
--- /dev/null
+++ b/config/interfaces/config_load_processor.go
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package interfaces
+
+import (
+	"sync"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+)
+
+// ConfigLoadProcessor is an extension to give users a chance to customize configs against ReferenceConfig and
+// ServiceConfig during deployment time, and to observe the outcome of loading them.
+type ConfigLoadProcessor interface {
+	// LoadProcessReferenceConfig customizes ReferenceConfig's params.
+	// It is emitted while referring a reference; event is one of
+	// before-reference-connect, reference-connect-success or reference-connect-fail.
+	// errMsg may be nil.
+	LoadProcessReferenceConfig(url *common.URL, event string, errMsg *string)
+
+	// LoadProcessServiceConfig customizes ServiceConfig's params.
+	// It is emitted while exporting a service; event is one of
+	// before-service-listen, service-listen-success or service-listen-fail.
+	// errMsg may be nil.
+	LoadProcessServiceConfig(url *common.URL, event string, errMsg *string)
+
+	// AfterAllReferencesConnectComplete is emitted once all references have been processed.
+	// urls carries the reference URLs grouped into Success and Fail.
+	AfterAllReferencesConnectComplete(urls ConfigLoadProcessorURLBinder)
+
+	// AfterAllServicesListenComplete is emitted once all services have been processed.
+	// urls carries the service URLs grouped into Success and Fail.
+	AfterAllServicesListenComplete(urls ConfigLoadProcessorURLBinder)
+
+	// BeforeShutdown is emitted before the application shuts down.
+	BeforeShutdown()
+}
+
+// ConfigLoadProcessorHolder is a lock holder: it embeds sync.Mutex and thereby exposes
+// Lock and Unlock. Because it contains a mutex it must not be copied; share it by pointer.
+type ConfigLoadProcessorHolder struct {
+	sync.Mutex
+}
+
+// ConfigLoadProcessorURLBinder groups the URLs passed to
+// ConfigLoadProcessor.AfterAllReferencesConnectComplete and
+// ConfigLoadProcessor.AfterAllServicesListenComplete.
+type ConfigLoadProcessorURLBinder struct {
+	// Success holds the URLs that were loaded successfully.
+	Success []*common.URL
+	// Fail holds the URLs that failed to load.
+	Fail []*common.URL
+}
diff --git a/config/interfaces/config_post_processor.go b/config/interfaces/config_post_processor.go
index 4578c59..53dd717 100644
--- a/config/interfaces/config_post_processor.go
+++ b/config/interfaces/config_post_processor.go
@@ -25,19 +25,8 @@ import (
 // ServiceConfig during deployment time.
 type ConfigPostProcessor interface {
 	// PostProcessReferenceConfig customizes ReferenceConfig's params.
-	// PostProcessReferenceConfig emit on refer reference (event: before-reference-connect, reference-connect-success, reference-connect-fail)
-	PostProcessReferenceConfig(url *common.URL, event string, errMsg *string)
+	PostProcessReferenceConfig(*common.URL)
 
 	// PostProcessServiceConfig customizes ServiceConfig's params.
-	// PostProcessServiceConfig emit on export service (event: before-service-listen, service-listen-success, service-listen-fail)
-	PostProcessServiceConfig(url *common.URL, event string, errMsg *string)
-
-	// AllReferencesConnectComplete emit on all references export complete
-	AllReferencesConnectComplete()
-
-	// AllServicesListenComplete emit on all services export complete
-	AllServicesListenComplete()
-
-	// BeforeShutdown emit on before shutdown
-	BeforeShutdown()
+	PostProcessServiceConfig(*common.URL)
 }
diff --git a/config/reference_config.go b/config/reference_config.go
index eb35ee3..97ac688 100644
--- a/config/reference_config.go
+++ b/config/reference_config.go
@@ -101,7 +101,8 @@ func (c *ReferenceConfig) Refer(_ interface{}) {
 	if c.ForceTag {
 		cfgURL.AddParam(constant.ForceUseTag, "true")
 	}
-	c.postProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
+	c.loadProcessConfig(cfgURL, constant.HookEventBeforeReferenceConnect, nil)
+	c.postProcessConfig(cfgURL)
 	if c.URL != "" {
 		// 1. user specified URL, could be peer-to-peer address, or register center's address.
 		urlStrings := gxstrings.RegSplit(c.URL, "\\s*[;]+\\s*")
@@ -302,12 +303,17 @@ func publishConsumerDefinition(url *common.URL) {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the current ReferenceConfig.
-func (c *ReferenceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
+func (c *ReferenceConfig) postProcessConfig(url *common.URL) {
 	for _, p := range extension.GetConfigPostProcessors() {
-		p.PostProcessReferenceConfig(url, event, errMsg)
+		p.PostProcessReferenceConfig(url)
 	}
 }
 
+// loadProcessConfig forwards a load event for the current ReferenceConfig to
+// extension.LoadProcessReferenceConfig, passing url, event and errMsg through unchanged.
+func (c *ReferenceConfig) loadProcessConfig(url *common.URL, event string, errMsg *string) {
+	extension.LoadProcessReferenceConfig(url, event, errMsg)
+}
+
 func (c *ReferenceConfig) getValidURL() *common.URL {
 	urls := c.urls
 	var u *common.URL
@@ -319,9 +325,3 @@ func (c *ReferenceConfig) getValidURL() *common.URL {
 	}
 	return u
 }
-
-func postAllConsumersConnectComplete() {
-	for _, p := range extension.GetConfigPostProcessors() {
-		p.AllReferencesConnectComplete()
-	}
-}
diff --git a/config/service_config.go b/config/service_config.go
index dc4902e..4452fef 100644
--- a/config/service_config.go
+++ b/config/service_config.go
@@ -203,7 +203,8 @@ func (c *ServiceConfig) Export() error {
 		}
 
 		// post process the URL to be exported
-		c.postProcessConfig(ivkURL, constant.HookEventBeforeProviderConnect, nil)
+		c.loadProcessConfig(ivkURL, constant.HookEventBeforeServiceListen, nil)
+		c.postProcessConfig(ivkURL)
 		// config post processor may set "export" to false
 		if !ivkURL.GetParamBool(constant.EXPORT_KEY, true) {
 			return nil
@@ -355,12 +356,17 @@ func publishServiceDefinition(url *common.URL) {
 }
 
 // postProcessConfig asks registered ConfigPostProcessor to post-process the current ServiceConfig.
-func (c *ServiceConfig) postProcessConfig(url *common.URL, event string, errMsg *string) {
+func (c *ServiceConfig) postProcessConfig(url *common.URL) {
 	for _, p := range extension.GetConfigPostProcessors() {
-		p.PostProcessServiceConfig(url, event, errMsg)
+		p.PostProcessServiceConfig(url)
 	}
 }
 
+// loadProcessConfig forwards a load event for the current ServiceConfig to
+// extension.LoadProcessServiceConfig, passing url, event and errMsg through unchanged.
+func (c *ServiceConfig) loadProcessConfig(url *common.URL, event string, errMsg *string) {
+	extension.LoadProcessServiceConfig(url, event, errMsg)
+}
+
 func (c *ServiceConfig) getValidURL() *common.URL {
 	urls := c.GetExportedUrls()
 	var u *common.URL
@@ -372,9 +378,3 @@ func (c *ServiceConfig) getValidURL() *common.URL {
 	}
 	return u
 }
-
-func postAllProvidersConnectComplete() {
-	for _, p := range extension.GetConfigPostProcessors() {
-		p.AllServicesListenComplete()
-	}
-}