You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@dubbo.apache.org by GitBox <gi...@apache.org> on 2020/07/11 16:14:55 UTC

[GitHub] [dubbo-go] zouyx commented on a change in pull request #604: Ftr: Application-level Registry Model

zouyx commented on a change in pull request #604:
URL: https://github.com/apache/dubbo-go/pull/604#discussion_r453206146



##########
File path: common/observer/listenable.go
##########
@@ -0,0 +1,142 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package observer
+
+import (
+	"reflect"
+	"sort"
+	"sync"
+)
+
+// Listenable could add and remove the event listener
+// NOTE(review): the per-method notes below are inferred from the method names only;
+// confirm them against the implementations.
+type Listenable interface {
+	// AddEventListener takes a single listener.
+	AddEventListener(listener EventListener)
+	// AddEventListeners takes a slice of listeners.
+	AddEventListeners(listenersSlice []EventListener)
+	// RemoveEventListener takes the single listener to remove.
+	RemoveEventListener(listener EventListener)
+	// RemoveEventListeners takes the slice of listeners to remove.
+	RemoveEventListeners(listenersSlice []EventListener)
+	// GetAllEventListeners returns a slice of EventListener.
+	GetAllEventListeners() []EventListener
+	// RemoveAllEventListeners takes no arguments and returns nothing.
+	RemoveAllEventListeners()
+}
+
+// BaseListener base listenable
+type BaseListener struct {
+	Listenable
+	ListenersCache map[reflect.Type][]EventListener
+	Mutex          sync.RWMutex

Review comment:
       Why is this lock public?

##########
File path: metadata/mapping/dynamic/service_name_mapping.go
##########
@@ -28,18 +29,26 @@ import (
 )
 
 import (
+	commonCfg "github.com/apache/dubbo-go/common/config"
 	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
 	"github.com/apache/dubbo-go/config"
 	"github.com/apache/dubbo-go/config_center"
-	"github.com/apache/dubbo-go/metadata"
+	"github.com/apache/dubbo-go/metadata/mapping"
 )
 
 const (
-	defaultGroup = config_center.DEFAULT_GROUP
+	defaultGroup = "mapping"

Review comment:
       Once you change this constant, will it cause compatibility issues?

##########
File path: registry/servicediscovery/synthesizer/subscribed_urls_synthesizer_factory.go
##########
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package synthesizer
+
+var (
+	synthesizers []SubscribedURLsSynthesizer
+)
+
+// nolint
+func AddSynthesizer(synthesizer SubscribedURLsSynthesizer) {
+	synthesizers = append(synthesizers, synthesizer)

Review comment:
       Should you add a lock here?

##########
File path: remoting/etcdv3/client.go
##########
@@ -130,6 +131,26 @@ func ValidateClient(container clientFacade, opts ...Option) error {
 	return nil
 }
 
+//  NewServiceDiscoveryClient

Review comment:
       ```suggestion
   //  nolint
   ```

##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+	"bytes"
+	"encoding/json"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+import (
+	cm "github.com/Workiva/go-datastructures/common"
+	gxset "github.com/dubbogo/gost/container/set"
+	gxnet "github.com/dubbogo/gost/net"
+	perrors "github.com/pkg/errors"
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/common/observer"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/metadata/mapping"
+	"github.com/apache/dubbo-go/metadata/service"
+	"github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/registry/event"
+	"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+const (
+	// protocolName is the name under which this registry is registered in init.
+	protocolName = "service-discovery"
+)
+
+// init registers newServiceDiscoveryRegistry with the extension package
+// under protocolName.
+func init() {
+	extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations:
+// this implementation is based on the ServiceDiscovery abstraction and ServiceNameMapping.
+// NOTE(review): the original comment ended mid-sentence ("In order to keep compatible
+// with interface-level registry, this implementation is"); complete or drop that remark.
+type serviceDiscoveryRegistry struct {
+	// lock is held while serviceRevisionExportedURLsCache is expunged and re-initialized.
+	lock sync.RWMutex
+	// url is the registry URL handed to newServiceDiscoveryRegistry.
+	url              *common.URL
+	serviceDiscovery registry.ServiceDiscovery
+	// subscribedServices is parsed from the SUBSCRIBED_SERVICE_NAMES_KEY parameter of url.
+	subscribedServices         *gxset.HashSet
+	serviceNameMapping         mapping.ServiceNameMapping
+	metaDataService            service.MetadataService
+	registeredListeners        *gxset.HashSet
+	subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+	// serviceRevisionExportedURLsCache maps service name -> revision -> exported URLs.
+	serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+// newServiceDiscoveryRegistry builds the application-level registry for url.
+// It returns an error when the service discovery cannot be created or the
+// metadata service for the configured metadata type cannot be obtained.
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+	tryInitMetadataService()
+
+	sd, err := creatServiceDiscovery(url)
+	if err != nil {
+		return nil, err
+	}
+
+	reg := &serviceDiscoveryRegistry{
+		url:                              url,
+		serviceDiscovery:                 sd,
+		subscribedServices:               parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")),
+		subscribedURLsSynthesizers:       synthesizer.GetAllSynthesizer(),
+		serviceNameMapping:               extension.GetGlobalServiceNameMapping(),
+		registeredListeners:              gxset.NewSet(),
+		serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+	}
+
+	reg.metaDataService, err = extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "could not init metadata service")
+	}
+	return reg, nil
+}
+
+// UnRegister un-exports url from the metadata service.
+// URLs that shouldRegister rejects are ignored and nil is returned.
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+	if shouldRegister(url) {
+		return s.metaDataService.UnexportURL(url)
+	}
+	return nil
+}
+
+// UnSubscribe removes the subscription of url from the metadata service.
+// URLs that shouldSubscribe rejects are ignored and nil is returned.
+// The listener argument is not used by this implementation.
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+	if shouldSubscribe(*url) {
+		return s.metaDataService.UnsubscribeURL(*url)
+	}
+	return nil
+}
+
+// creatServiceDiscovery looks up the service discovery configuration named by
+// the SERVICE_DISCOVERY_KEY parameter of url, obtains the matching
+// ServiceDiscovery from the extension package and wraps it with
+// event.NewEventPublishingServiceDiscovery.
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+	name := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+
+	sdConfig, found := config.GetBaseConfig().GetServiceDiscoveries(name)
+	if !found {
+		return nil, perrors.Errorf("The service discovery with name: %s is not found", name)
+	}
+
+	delegate, err := extension.GetServiceDiscovery(sdConfig.Protocol, name)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "Create service discovery fialed")
+	}
+
+	return event.NewEventPublishingServiceDiscovery(delegate), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+	set := gxset.NewSet()
+	if len(literalServices) == 0 {
+		return set
+	}
+	var splitServices = strings.Split(literalServices, ",")
+	for _, s := range splitServices {
+		if len(s) != 0 {
+			set.Add(s)
+		}
+	}
+	return set
+}
+
+// GetServiceDiscovery returns the ServiceDiscovery this registry was created with.
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+	return s.serviceDiscovery
+}
+
+// GetUrl returns a copy of the registry URL.
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+	return *s.url
+}
+
+// IsAvailable currently always reports true; see the TODO below.
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+	// TODO(whether available depends on metadata service and service discovery)
+	return true
+}
+
+// Destroy destroys the underlying service discovery.
+// A failure is logged and not propagated.
+func (s *serviceDiscoveryRegistry) Destroy() {
+	if err := s.serviceDiscovery.Destroy(); err != nil {
+		logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+	}
+}
+
+// Register exports url through the metadata service, registers a service
+// instance built from url with the service discovery, publishes the service
+// definition and finally records the interface -> service name mapping.
+// URLs that shouldRegister rejects are ignored and nil is returned.
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	ok, err := s.metaDataService.ExportURL(url)
+
+	if err != nil {
+		logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+		return err
+	}
+	if !ok {
+		logger.Warnf("The URL[%s] has been registry!", url.String())
+	}
+
+	// we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+	// But we don't want to design a similar bootstrap class.
+	ins, err := createInstance(url)
+	if err != nil {
+		return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+	}
+
+	err = s.serviceDiscovery.Register(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "register the service failed")
+	}
+
+	err = s.metaDataService.PublishServiceDefinition(url)
+	if err != nil {
+		return perrors.WithMessage(err, "publish the service definition failed. ")
+	}
+	// The version must be read with the same parameter key that findMappedServices
+	// uses on the lookup side (constant.VERSION_KEY), otherwise the mapping written
+	// here cannot be matched by subscribers.
+	return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+		url.GetParam(constant.GROUP_KEY, ""),
+		url.GetParam(constant.VERSION_KEY, ""),
+		url.Protocol)
+}
+
+// createInstance builds the service instance that represents this application
+// for url. The port is parsed from url.Port; when url.Ip is empty the host
+// falls back to gxnet.GetLocalIP. An error is returned for an invalid port or
+// when the local IP cannot be determined.
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+	appConfig := config.GetApplicationConfig()
+
+	port, err := strconv.ParseInt(url.Port, 10, 32)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+	}
+
+	host := url.Ip
+	if host == "" {
+		if host, err = gxnet.GetLocalIP(); err != nil {
+			return nil, perrors.WithMessage(err, "could not get the local Ip")
+		}
+	}
+
+	// usually we will add more metadata
+	metadata := make(map[string]string, 8)
+	metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+	instance := &registry.DefaultServiceInstance{
+		Id:          host + constant.KEY_SEPARATOR + url.Port,
+		ServiceName: appConfig.Name,
+		Host:        host,
+		Port:        int(port),
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    metadata,
+	}
+	return instance, nil
+}
+
+// shouldRegister reports whether url is a provider-side URL, i.e. whether its
+// SIDE_KEY parameter equals constant.PROVIDER_PROTOCOL. Other URLs are logged
+// at debug level and rejected.
+func shouldRegister(url common.URL) bool {
+	side := url.GetParam(constant.SIDE_KEY, "")
+	if side == constant.PROVIDER_PROTOCOL {
+		return true
+	}
+	// The format string needs a verb for the URL argument; without it the
+	// logger emits a "%!(EXTRA ...)" suffix instead of a readable message.
+	logger.Debugf("The URL[%s] should not be register.", url.String())
+	return false
+}
+
+// Subscribe subscribes url through the metadata service, resolves the service
+// names that serve it and, for each of them, notifies notify with the
+// currently known instances and registers an instances-changed listener.
+// URLs that shouldSubscribe rejects are ignored and nil is returned.
+// An error is returned when no service name can be resolved for url.
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	if _, err := s.metaDataService.SubscribeURL(*url); err != nil {
+		return perrors.WithMessage(err, "subscribe url error: "+url.String())
+	}
+
+	services := s.getServices(*url)
+	if services.Empty() {
+		return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+			"subscription url:%s", url.String())
+	}
+
+	for _, value := range services.Values() {
+		name := value.(string)
+		// push the current instances first, then listen for later changes
+		s.subscribe(url, notify, name, s.serviceDiscovery.GetInstances(name))
+		changeNotify := &InstanceChangeNotify{
+			notify:                   notify,
+			serviceDiscoveryRegistry: s,
+		}
+		s.registerServiceInstancesChangedListener(*url, &registry.ServiceInstancesChangedListener{
+			ServiceName:   name,
+			ChangedNotify: changeNotify,
+		})
+	}
+	return nil
+}
+
+// registerServiceInstancesChangedListener adds listener to the service
+// discovery unless a listener with the same id (service name + URL key) has
+// already been registered. A failure to add the listener is logged and the id
+// is not recorded, so a later call can retry.
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+	listenerId := listener.ServiceName + ":" + getUrlKey(url)
+	// Listener ids are tracked in registeredListeners; subscribedServices only
+	// holds plain service names and therefore never contains a listener id.
+	if s.registeredListeners.Contains(listenerId) {
+		return
+	}
+	if err := s.serviceDiscovery.AddListener(listener); err != nil {
+		logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+		return
+	}
+	s.registeredListeners.Add(listenerId)
+}
+
+// getUrlKey builds a key for url from its protocol, location, port and path
+// followed by "?" and the version, group and nacos-protocol parameters.
+func getUrlKey(url common.URL) string {
+	var bf bytes.Buffer
+	if len(url.Protocol) != 0 {
+		bf.WriteString(url.Protocol)
+		bf.WriteString("://")
+	}
+	if len(url.Location) != 0 {
+		bf.WriteString(url.Location)
+		bf.WriteString(":")
+		bf.WriteString(url.Port)
+	}
+	if len(url.Path) != 0 {
+		bf.WriteString("/")
+		bf.WriteString(url.Path)
+	}
+	bf.WriteString("?")
+	// The buffer is passed by pointer: passing bytes.Buffer by value makes the
+	// callee write into a copy, so the parameters would be lost from the key.
+	appendParam(&bf, constant.VERSION_KEY, url)
+	appendParam(&bf, constant.GROUP_KEY, url)
+	appendParam(&bf, constant.NACOS_PROTOCOL_KEY, url)
+	return bf.String()
+}
+
+// appendParam writes "paramKey=value" to buffer, where value is the paramKey
+// parameter of url (empty when absent).
+func appendParam(buffer *bytes.Buffer, paramKey string, url common.URL) {
+	buffer.WriteString(paramKey)
+	buffer.WriteString("=")
+	buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+// subscribe notifies notify with an "add" event for every URL derived from
+// serviceInstances. The URLs come from getExportedUrls; when that yields
+// nothing they are synthesized instead. With no instances only a warning is
+// logged.
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+	serviceName string, serviceInstances []registry.ServiceInstance) {
+	if len(serviceInstances) == 0 {
+		logger.Warnf("here is no instance in service[name : %s]", serviceName)
+		return
+	}
+
+	var urls []common.URL
+	urls = append(urls, s.getExportedUrls(*url, serviceInstances)...)
+	if len(urls) == 0 {
+		urls = s.synthesizeSubscribedURLs(url, serviceInstances)
+	}
+
+	for i := range urls {
+		event := &registry.ServiceEvent{
+			Action:  remoting.EventTypeAdd,
+			Service: urls[i],
+		}
+		notify.Notify(event)
+	}
+}
+
+// synthesizeSubscribedURLs collects the URLs produced by every configured
+// synthesizer that supports subscribedURL.
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var result []common.URL
+	for _, candidate := range s.subscribedURLsSynthesizers {
+		if !candidate.Support(subscribedURL) {
+			continue
+		}
+		result = append(result, candidate.Synthesize(subscribedURL, serviceInstances)...)
+	}
+	return result
+}
+
+// shouldSubscribe is the negation of shouldRegister: every URL that should
+// not be registered should be subscribed.
+func shouldSubscribe(url common.URL) bool {
+	return !shouldRegister(url)
+}
+
+// getServices resolves the service names for url, trying in order:
+// the PROVIDER_BY parameter of url, the service name mapping, and finally the
+// services configured on the registry (subscribedServices).
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+	if names := url.GetParam(constant.PROVIDER_BY, ""); len(names) != 0 {
+		if specified := parseServices(names); !specified.Empty() {
+			return specified
+		}
+	}
+	if mapped := s.findMappedServices(url); !mapped.Empty() {
+		return mapped
+	}
+	return s.subscribedServices
+}
+
+// findMappedServices asks the service name mapping for the service names that
+// provide the interface/group/version/protocol described by url. The interface
+// falls back to url.Path when INTERFACE_KEY is absent. On error the failure is
+// logged and an empty set is returned.
+func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet {
+	serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path)
+	group := url.GetParam(constant.GROUP_KEY, "")
+	version := url.GetParam(constant.VERSION_KEY, "")
+	protocol := url.Protocol
+	serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol)
+	if err != nil {
+		// every verb in the format needs its argument, not only the error
+		logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+
+			"serviceNameMap error:%s", serviceInterface, group, version, protocol, err.Error())
+		return gxset.NewSet()
+	}
+	return serviceNames
+}
+
+// getExportedUrls returns the exported URLs for subscribedURL from the
+// instances that are enabled, healthy and carry at least one of the metadata
+// service URL properties. It returns an empty slice when no instance qualifies.
+func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var usable []registry.ServiceInstance
+	for _, instance := range serviceInstances {
+		if instance.IsEnable() && instance.IsHealthy() {
+			metaData := instance.GetMetadata()
+			_, hasURLParams := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]
+			_, hasURLs := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME]
+			if hasURLParams || hasURLs {
+				usable = append(usable, instance)
+			}
+		}
+	}
+	if len(usable) == 0 {
+		return []common.URL{}
+	}
+
+	// refresh the revision cache before cloning from it
+	s.prepareServiceRevisionExportedURLs(usable)
+	return s.cloneExportedURLs(subscribedURL, usable)
+}
+
+// comparator is a common.URL that can be compared through cm.Comparator
+// (declared for use with a skip list, per the original author).
+type comparator common.URL
+
+// Compare orders two comparators by the String() form of their URLs and
+// returns 1, -1 or 0. comp must be a comparator; any other type panics.
+func (c comparator) Compare(comp cm.Comparator) int {
+	self, other := common.URL(c).String(), common.URL(comp.(comparator)).String()
+	if self > other {
+		return 1
+	}
+	if self < other {
+		return -1
+	}
+	return 0
+}
+
+// getExportedUrlsByInst fetches every exported URL from the metadata service
+// of serviceInstance and parses them into common.URL values.
+// It returns nil when no proxy factory or proxy is available or when the
+// remote call fails; entries that are not strings or cannot be parsed are
+// logged and skipped.
+func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+	var urls []common.URL
+	metadataStorageType := getExportedStoreType(serviceInstance)
+	proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType)
+	if proxyFactory == nil {
+		return urls
+	}
+	metadataService := proxyFactory.GetProxy(serviceInstance)
+	if metadataService == nil {
+		return urls
+	}
+	result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+	if err != nil {
+		logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
+		return urls
+	}
+
+	ret := make([]common.URL, 0, len(result))
+	for _, ui := range result {
+		// a checked assertion: a non-string entry must not panic the subscriber
+		raw, ok := ui.(string)
+		if !ok {
+			logger.Errorf("the exported url is not a string: %v", ui)
+			continue
+		}
+
+		u, err := common.NewURL(raw)
+		if err != nil {
+			logger.Errorf("could not parse the url string to URL structure: %s, error: %v", raw, err)
+			continue
+		}
+		ret = append(ret, u)
+	}
+	return ret
+}
+
+// prepareServiceRevisionExportedURLs refreshes the revision cache for
+// serviceInstances while holding s.lock: stale revisions are removed first,
+// then the revisions of the given instances are initialized.
+func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	s.lock.Lock()
+	// deferred so the lock is released even if one of the steps panics
+	defer s.lock.Unlock()
+	// 1. expunge stale
+	s.expungeStaleRevisionExportedURLs(serviceInstances)
+	// 2. Initialize
+	s.initRevisionExportedURLs(serviceInstances)
+}
+
+// expungeStaleRevisionExportedURLs drops cached revisions of the service that
+// no instance in serviceInstances reports any more. The service name is taken
+// from the first instance. Nothing happens for an empty slice or when the
+// service has no cache entry yet.
+func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	// guard the serviceInstances[0] access below
+	if len(serviceInstances) == 0 {
+		return
+	}
+	serviceName := serviceInstances[0].GetServiceName()
+	revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+	if !exist {
+		return
+	}
+	currentRevision := gxset.NewSet()
+	for _, instance := range serviceInstances {
+		rv := getExportedServicesRevision(instance)
+		if len(rv) != 0 {
+			currentRevision.Add(rv)
+		}
+	}
+	// staleRevisions = existedRevisions - currentRevisions;
+	// deleting from a map while ranging over it is permitted in Go.
+	for revision := range revisionExportedURLsMap {
+		if !currentRevision.Contains(revision) {
+			delete(revisionExportedURLsMap, revision)
+		}
+	}
+}
+
+// initRevisionExportedURLs fills the revision cache: first through a selected
+// instance, then for every instance in serviceInstances.
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	// initialize the revision exported URLs that the selected service instance exported
+	s.initSelectedRevisionExportedURLs(serviceInstances)
+
+	// initialize the revision exported URLs that other service instances exported
+	for i := range serviceInstances {
+		s.initRevisionExportedURLsByInst(serviceInstances[i])
+	}
+}
+
+// initSelectedRevisionExportedURLs selects an instance and initializes its
+// revision cache entry, retrying at most len(serviceInstances) times until a
+// selection yields at least one exported URL.
+func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	for attempt := 0; attempt < len(serviceInstances); attempt++ {
+		selected := s.selectServiceInstance(serviceInstances)
+		if len(s.initRevisionExportedURLsByInst(selected)) != 0 {
+			// If the result is valid,break
+			return
+		}
+	}
+}
+
+// selectServiceInstance picks one instance from serviceInstances.
+// It returns nil for an empty slice, the only element for a single-element
+// slice, and otherwise delegates to the selector named by the
+// SERVICE_INSTANCE_SELECTOR parameter of the registry URL ("random" by
+// default). nil is returned when that selector cannot be obtained.
+func (s *serviceDiscoveryRegistry) selectServiceInstance(serviceInstances []registry.ServiceInstance) registry.ServiceInstance {
+	switch len(serviceInstances) {
+	case 0:
+		return nil
+	case 1:
+		return serviceInstances[0]
+	}
+
+	name := s.url.GetParam(constant.SERVICE_INSTANCE_SELECTOR, "random")
+	chooser, err := extension.GetServiceInstanceSelector(name)
+	if err != nil {
+		logger.Errorf("get service instance selector cathe error:%s", err.Error())
+		return nil
+	}
+	return chooser.Select(*s.url, serviceInstances)
+}
+
+// initRevisionExportedURLsByInst returns the exported URLs for the revision of
+// serviceInstance, loading them from the instance and caching them under
+// service name -> revision when they are not cached yet.
+// A nil instance yields an empty slice. A nil result from the instance is
+// returned as is and not cached.
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+	if serviceInstance == nil {
+		return []common.URL{}
+	}
+	serviceName := serviceInstance.GetServiceName()
+	revision := getExportedServicesRevision(serviceInstance)
+	revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName]
+	if revisionExportedURLsMap == nil {
+		revisionExportedURLsMap = make(map[string][]common.URL, 4)
+		s.serviceRevisionExportedURLsCache[serviceName] = revisionExportedURLsMap
+	}
+	revisionExportedURLs := revisionExportedURLsMap[revision]
+	firstGet := false
+	// len of a nil slice is 0, so no separate nil check is needed
+	if len(revisionExportedURLs) == 0 {
+		if len(revisionExportedURLsMap) > 0 {
+			// The case is that current ServiceInstance with the different revision
+			logger.Warnf("The ServiceInstance[id: %s, host : %s , port : %v] has different revision : %s"+
+				", please make sure the service [name : %s] is changing or not.", serviceInstance.GetId(),
+				serviceInstance.GetHost(), serviceInstance.GetPort(), revision, serviceInstance.GetServiceName())
+		} else {
+			firstGet = true
+		}
+		revisionExportedURLs = s.getExportedUrlsByInst(serviceInstance)
+		if revisionExportedURLs != nil {
+			revisionExportedURLsMap[revision] = revisionExportedURLs
+			// size is an int and first a bool, so they need %d / %t rather than %s
+			logger.Debugf("Get the exported URLs[size : %d, first : %t] from the target service "+
+				"instance [id: %s , service : %s , host : %s , port : %v , revision : %s]",
+				len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(),
+				serviceInstance.GetHost(), serviceInstance.GetPort(), revision)
+		}
+	} else {
+		// Else, The cache is hit
+		logger.Debugf("Get the exported URLs[size : %d] from cache, the instance"+
+			"[id: %s , service : %s , host : %s , port : %v , revision : %s]", len(revisionExportedURLs),
+			serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(),
+			serviceInstance.GetPort(), revision)
+	}
+	return revisionExportedURLs
+}
+
+// getExportedServicesRevision reads the exported-services revision from the
+// metadata of serviceInstance; it is empty when the property is absent.
+func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string {
+	return serviceInstance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
+}
+
+// getExportedStoreType reads the metadata storage type from the metadata of
+// serviceInstance, falling back to constant.DEFAULT_METADATA_STORAGE_TYPE
+// when the property is absent.
+func getExportedStoreType(serviceInstance registry.ServiceInstance) string {
+	if storeType, found := serviceInstance.GetMetadata()[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME]; found {
+		return storeType
+	}
+	return constant.DEFAULT_METADATA_STORAGE_TYPE
+}
+
+func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsances []registry.ServiceInstance) []common.URL {
+	if serviceInsances == nil || len(serviceInsances) == 0 {

Review comment:
       ```suggestion
   	if len(serviceInsances) == 0 {
   ```

##########
File path: remoting/zookeeper/curator_discovery/service_discovery.go
##########
@@ -0,0 +1,280 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package curator_discovery
+
+import (
+	"encoding/json"
+	"path"
+	"strings"
+	"sync"
+)
+
+import (
+	perrors "github.com/pkg/errors"
+)
+
+import (
+	"github.com/dubbogo/go-zookeeper/zk"

Review comment:
        - -?

##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+	"bytes"
+	"encoding/json"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+import (
+	cm "github.com/Workiva/go-datastructures/common"
+	gxset "github.com/dubbogo/gost/container/set"
+	gxnet "github.com/dubbogo/gost/net"
+	perrors "github.com/pkg/errors"
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/common/observer"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/metadata/mapping"
+	"github.com/apache/dubbo-go/metadata/service"
+	"github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/registry/event"
+	"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+const (
+	// protocolName is the name under which this registry is registered in init.
+	protocolName = "service-discovery"
+)
+
+// init registers newServiceDiscoveryRegistry with the extension package
+// under protocolName.
+func init() {
+	extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations:
+// this implementation is based on the ServiceDiscovery abstraction and ServiceNameMapping.
+// NOTE(review): the original comment ended mid-sentence ("In order to keep compatible
+// with interface-level registry, this implementation is"); complete or drop that remark.
+type serviceDiscoveryRegistry struct {
+	lock sync.RWMutex
+	// url is the registry URL handed to newServiceDiscoveryRegistry.
+	url              *common.URL
+	serviceDiscovery registry.ServiceDiscovery
+	// subscribedServices is parsed from the SUBSCRIBED_SERVICE_NAMES_KEY parameter of url.
+	subscribedServices               *gxset.HashSet
+	serviceNameMapping               mapping.ServiceNameMapping
+	metaDataService                  service.MetadataService
+	registeredListeners              *gxset.HashSet
+	subscribedURLsSynthesizers       []synthesizer.SubscribedURLsSynthesizer
+	serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+// newServiceDiscoveryRegistry builds the application-level registry for url.
+// It returns an error when the service discovery cannot be created or the
+// metadata service for the configured metadata type cannot be obtained.
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+	tryInitMetadataService()
+
+	sd, err := creatServiceDiscovery(url)
+	if err != nil {
+		return nil, err
+	}
+
+	reg := &serviceDiscoveryRegistry{
+		url:                              url,
+		serviceDiscovery:                 sd,
+		subscribedServices:               parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, "")),
+		subscribedURLsSynthesizers:       synthesizer.GetAllSynthesizer(),
+		serviceNameMapping:               extension.GetGlobalServiceNameMapping(),
+		registeredListeners:              gxset.NewSet(),
+		serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL),
+	}
+
+	reg.metaDataService, err = extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "could not init metadata service")
+	}
+	return reg, nil
+}
+
+// UnRegister un-exports url from the metadata service.
+// URLs that shouldRegister rejects are ignored and nil is returned.
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+	if shouldRegister(url) {
+		return s.metaDataService.UnexportURL(url)
+	}
+	return nil
+}
+
+// UnSubscribe removes the subscription of url from the metadata service.
+// URLs that shouldSubscribe rejects are ignored and nil is returned.
+// The listener argument is not used by this implementation.
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+	if shouldSubscribe(*url) {
+		return s.metaDataService.UnsubscribeURL(*url)
+	}
+	return nil
+}
+
+// creatServiceDiscovery looks up the service discovery configuration named by
+// the SERVICE_DISCOVERY_KEY parameter of url, obtains the matching
+// ServiceDiscovery from the extension package and wraps it with
+// event.NewEventPublishingServiceDiscovery.
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+	name := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+
+	sdConfig, found := config.GetBaseConfig().GetServiceDiscoveries(name)
+	if !found {
+		return nil, perrors.Errorf("The service discovery with name: %s is not found", name)
+	}
+
+	delegate, err := extension.GetServiceDiscovery(sdConfig.Protocol, name)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "Create service discovery fialed")
+	}
+
+	return event.NewEventPublishingServiceDiscovery(delegate), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+	set := gxset.NewSet()
+	if len(literalServices) == 0 {
+		return set
+	}
+	var splitServices = strings.Split(literalServices, ",")
+	for _, s := range splitServices {
+		if len(s) != 0 {
+			set.Add(s)
+		}
+	}
+	return set
+}
+
+// GetServiceDiscovery returns the ServiceDiscovery this registry was created with.
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+	return s.serviceDiscovery
+}
+
+// GetUrl returns a copy of the registry URL.
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+	return *s.url
+}
+
+// IsAvailable currently always reports true; see the TODO below.
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+	// TODO(whether available depends on metadata service and service discovery)
+	return true
+}
+
+// Destroy destroys the underlying service discovery.
+// A failure is logged and not propagated.
+func (s *serviceDiscoveryRegistry) Destroy() {
+	if err := s.serviceDiscovery.Destroy(); err != nil {
+		logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+	}
+}
+
+// Register exports url through the metadata service, registers a service
+// instance built from url with the service discovery, publishes the service
+// definition and finally records the interface -> service name mapping.
+// URLs that shouldRegister rejects are ignored and nil is returned.
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	ok, err := s.metaDataService.ExportURL(url)
+
+	if err != nil {
+		logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+		return err
+	}
+	if !ok {
+		logger.Warnf("The URL[%s] has been registry!", url.String())
+	}
+
+	// we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+	// But we don't want to design a similar bootstrap class.
+	ins, err := createInstance(url)
+	if err != nil {
+		return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+	}
+
+	err = s.serviceDiscovery.Register(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "register the service failed")
+	}
+
+	err = s.metaDataService.PublishServiceDefinition(url)
+	if err != nil {
+		return perrors.WithMessage(err, "publish the service definition failed. ")
+	}
+	// The version must be read with constant.VERSION_KEY, the same parameter key
+	// the other URL parameter reads of the version use, rather than constant.Version.
+	return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+		url.GetParam(constant.GROUP_KEY, ""),
+		url.GetParam(constant.VERSION_KEY, ""),
+		url.Protocol)
+}
+
+// createInstance builds the service instance that represents this application
+// for url. The port is parsed from url.Port; when url.Ip is empty the host
+// falls back to gxnet.GetLocalIP. An error is returned for an invalid port or
+// when the local IP cannot be determined.
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+	appConfig := config.GetApplicationConfig()
+
+	port, err := strconv.ParseInt(url.Port, 10, 32)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+	}
+
+	host := url.Ip
+	if host == "" {
+		if host, err = gxnet.GetLocalIP(); err != nil {
+			return nil, perrors.WithMessage(err, "could not get the local Ip")
+		}
+	}
+
+	// usually we will add more metadata
+	metadata := make(map[string]string, 8)
+	metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+	instance := &registry.DefaultServiceInstance{
+		Id:          host + constant.KEY_SEPARATOR + url.Port,
+		ServiceName: appConfig.Name,
+		Host:        host,
+		Port:        int(port),
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    metadata,
+	}
+	return instance, nil
+}
+
+// shouldRegister reports whether url is a provider-side URL, i.e. whether its
+// SIDE_KEY parameter equals constant.PROVIDER_PROTOCOL. Other URLs are logged
+// at debug level and rejected.
+func shouldRegister(url common.URL) bool {
+	side := url.GetParam(constant.SIDE_KEY, "")
+	if side == constant.PROVIDER_PROTOCOL {
+		return true
+	}
+	// The format string needs a verb for the URL argument; without it the
+	// logger emits a "%!(EXTRA ...)" suffix instead of a readable message.
+	logger.Debugf("The URL[%s] should not be register.", url.String())
+	return false
+}
+
+// Subscribe subscribes url through the metadata service, resolves the service
+// names that serve it and, for each of them, notifies notify with the
+// currently known instances and registers an instances-changed listener.
+// URLs that shouldSubscribe rejects are ignored and nil is returned.
+// An error is returned when no service name can be resolved for url.
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	if _, err := s.metaDataService.SubscribeURL(*url); err != nil {
+		return perrors.WithMessage(err, "subscribe url error: "+url.String())
+	}
+
+	services := s.getServices(*url)
+	if services.Empty() {
+		return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+			"subscription url:%s", url.String())
+	}
+
+	for _, value := range services.Values() {
+		name := value.(string)
+		// push the current instances first, then listen for later changes
+		s.subscribe(url, notify, name, s.serviceDiscovery.GetInstances(name))
+		changeNotify := &InstanceChangeNotify{
+			notify:                   notify,
+			serviceDiscoveryRegistry: s,
+		}
+		s.registerServiceInstancesChangedListener(*url, &registry.ServiceInstancesChangedListener{
+			ServiceName:   name,
+			ChangedNotify: changeNotify,
+		})
+	}
+
+	return nil
+}

Review comment:
       Where? He has already added a blank line between `Subscribe` and `registerServiceInstancesChangedListener`.

##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+	"bytes"
+	"encoding/json"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+import (
+	cm "github.com/Workiva/go-datastructures/common"
+	gxset "github.com/dubbogo/gost/container/set"
+	gxnet "github.com/dubbogo/gost/net"
+	perrors "github.com/pkg/errors"
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/common/observer"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/metadata/mapping"
+	"github.com/apache/dubbo-go/metadata/service"
+	"github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/registry/event"
+	"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+const (
+	protocolName = "service-discovery"
+)
+
+func init() {
+	extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+	lock                             sync.RWMutex
+	url                              *common.URL
+	serviceDiscovery                 registry.ServiceDiscovery
+	subscribedServices               *gxset.HashSet
+	serviceNameMapping               mapping.ServiceNameMapping
+	metaDataService                  service.MetadataService
+	registeredListeners              *gxset.HashSet
+	subscribedURLsSynthesizers       []synthesizer.SubscribedURLsSynthesizer
+	serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+	tryInitMetadataService()
+
+	serviceDiscovery, err := creatServiceDiscovery(url)
+	if err != nil {
+		return nil, err
+	}
+	subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+	subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+	serviceNameMapping := extension.GetGlobalServiceNameMapping()
+	metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "could not init metadata service")
+	}
+	return &serviceDiscoveryRegistry{
+		url:                              url,
+		serviceDiscovery:                 serviceDiscovery,
+		subscribedServices:               subscribedServices,
+		subscribedURLsSynthesizers:       subscribedURLsSynthesizers,
+		registeredListeners:              gxset.NewSet(),
+		serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+		serviceNameMapping:               serviceNameMapping,
+		metaDataService:                  metaDataService,
+	}, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+	sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+	if !ok {
+		return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+	}
+	originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "Create service discovery fialed")
+	}
+	return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+	set := gxset.NewSet()
+	if len(literalServices) == 0 {
+		return set
+	}
+	var splitServices = strings.Split(literalServices, ",")
+	for _, s := range splitServices {
+		if len(s) != 0 {
+			set.Add(s)
+		}
+	}
+	return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+	return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+	return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+	// TODO(whether available depends on metadata service and service discovery)
+	return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+	err := s.serviceDiscovery.Destroy()
+	if err != nil {
+		logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+	}
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	ok, err := s.metaDataService.ExportURL(url)
+
+	if err != nil {
+		logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+		return err
+	}
+	if !ok {
+		logger.Warnf("The URL[%s] has been registry!", url.String())
+	}
+
+	// we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+	// But we don't want to design a similar bootstrap class.
+	ins, err := createInstance(url)
+	if err != nil {
+		return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+	}
+
+	err = s.serviceDiscovery.Register(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "register the service failed")
+	}
+
+	err = s.metaDataService.PublishServiceDefinition(url)
+	if err != nil {
+		return perrors.WithMessage(err, "publish the service definition failed. ")
+	}
+	return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+		url.GetParam(constant.GROUP_KEY, ""),
+		url.GetParam(constant.Version, ""),
+		url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+	appConfig := config.GetApplicationConfig()
+	port, err := strconv.ParseInt(url.Port, 10, 32)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+	}
+
+	host := url.Ip
+	if len(host) == 0 {
+		host, err = gxnet.GetLocalIP()
+		if err != nil {
+			return nil, perrors.WithMessage(err, "could not get the local Ip")
+		}
+	}
+
+	// usually we will add more metadata
+	metadata := make(map[string]string, 8)
+	metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+	return &registry.DefaultServiceInstance{
+		ServiceName: appConfig.Name,
+		Host:        host,
+		Port:        int(port),
+		Id:          host + constant.KEY_SEPARATOR + url.Port,
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    metadata,
+	}, nil
+}
+
+// shouldRegister reports whether the given URL belongs to the provider side.
+// It returns true only when the URL's side parameter equals
+// constant.PROVIDER_PROTOCOL; otherwise it logs a debug message and returns false.
+func shouldRegister(url common.URL) bool {
+	side := url.GetParam(constant.SIDE_KEY, "")
+	if side == constant.PROVIDER_PROTOCOL {
+		return true
+	}
+	// The format string needs a verb for the URL; without one the argument is
+	// reported as "%!(EXTRA string=...)" instead of being part of the message.
+	logger.Debugf("The URL[%s] should not be registered.", url.String())
+	return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	_, err := s.metaDataService.SubscribeURL(*url)
+	if err != nil {
+		return perrors.WithMessage(err, "subscribe url error: "+url.String())
+	}
+	services := s.getServices(*url)
+	if services.Empty() {
+		return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+			"subscription url:%s", url.String())
+	}
+	for _, srv := range services.Values() {
+		serviceName := srv.(string)
+		serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+		s.subscribe(url, notify, serviceName, serviceInstances)
+		listener := &registry.ServiceInstancesChangedListener{
+			ServiceName: serviceName,
+			ChangedNotify: &InstanceChangeNotify{
+				notify:                   notify,
+				serviceDiscoveryRegistry: s,
+			},
+		}
+		s.registerServiceInstancesChangedListener(*url, listener)
+	}
+	return nil
+}
+
+// registerServiceInstancesChangedListener adds the listener to the service
+// discovery unless a listener with the same id (service name + ":" + URL key)
+// has already been added through this registry. Errors from AddListener are
+// logged, not returned.
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+	listenerId := listener.ServiceName + ":" + getUrlKey(url)
+	// De-duplicate on registeredListeners: subscribedServices is filled with
+	// service names parsed from the registry URL, so it never holds a listener id.
+	if s.registeredListeners.Contains(listenerId) {
+		return
+	}
+	if err := s.serviceDiscovery.AddListener(listener); err != nil {
+		logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+		return
+	}
+	// Record the id only after the listener was really added, so that a failed
+	// attempt can be retried by a later call.
+	s.registeredListeners.Add(listenerId)
+}
+
+// getUrlKey builds the key used to identify a URL inside this registry. The
+// key consists of the protocol, the location with port and the path (each only
+// when present), followed by "?" and the version, group and nacos protocol
+// parameters joined with "&".
+func getUrlKey(url common.URL) string {
+	var bf bytes.Buffer
+	if len(url.Protocol) != 0 {
+		bf.WriteString(url.Protocol)
+		bf.WriteString("://")
+	}
+	if len(url.Location) != 0 {
+		bf.WriteString(url.Location)
+		bf.WriteString(":")
+		bf.WriteString(url.Port)
+	}
+	if len(url.Path) != 0 {
+		bf.WriteString("/")
+		bf.WriteString(url.Path)
+	}
+	bf.WriteString("?")
+	// The buffer is handed over by pointer: a bytes.Buffer passed by value is
+	// copied, and whatever the callee writes never reaches this buffer.
+	appendParam(&bf, constant.VERSION_KEY, url)
+	bf.WriteString("&")
+	appendParam(&bf, constant.GROUP_KEY, url)
+	bf.WriteString("&")
+	appendParam(&bf, constant.NACOS_PROTOCOL_KEY, url)
+	return bf.String()
+}
+
+// appendParam writes "paramKey=value" to buffer, where value is the URL
+// parameter named paramKey, or the empty string when the parameter is absent.
+func appendParam(buffer *bytes.Buffer, paramKey string, url common.URL) {
+	buffer.WriteString(paramKey)
+	buffer.WriteString("=")
+	buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+	serviceName string, serviceInstances []registry.ServiceInstance) {
+	if len(serviceInstances) == 0 {
+		logger.Warnf("here is no instance in service[name : %s]", serviceName)
+		return
+	}
+	var subscribedURLs []common.URL
+	subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+	if len(subscribedURLs) == 0 {
+		subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+	}
+	for _, url := range subscribedURLs {
+		notify.Notify(&registry.ServiceEvent{
+			Action:  remoting.EventTypeAdd,
+			Service: url,
+		})
+	}
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var urls []common.URL
+	for _, syn := range s.subscribedURLsSynthesizers {
+		if syn.Support(subscribedURL) {
+			urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+		}
+	}
+	return urls
+}
+
+func shouldSubscribe(url common.URL) bool {
+	return !shouldRegister(url)
+}
+
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+	services := gxset.NewSet()
+	serviceNames := url.GetParam(constant.PROVIDER_BY, "")
+	if len(serviceNames) != 0 {

Review comment:
       ```suggestion
   	if len(serviceNames) > 0 {
   ```
   ?

##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+	"bytes"
+	"encoding/json"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+import (
+	cm "github.com/Workiva/go-datastructures/common"
+	gxset "github.com/dubbogo/gost/container/set"
+	gxnet "github.com/dubbogo/gost/net"
+	perrors "github.com/pkg/errors"
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/common/observer"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/metadata/mapping"
+	"github.com/apache/dubbo-go/metadata/service"
+	"github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/registry/event"
+	"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+const (
+	protocolName = "service-discovery"
+)
+
+func init() {
+	extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of application-level registry.
+// It's completely different from other registry implementations
+// This implementation is based on ServiceDiscovery abstraction and ServiceNameMapping
+// In order to keep compatible with interface-level registry,
+// this implementation is
+type serviceDiscoveryRegistry struct {
+	lock                             sync.RWMutex
+	url                              *common.URL
+	serviceDiscovery                 registry.ServiceDiscovery
+	subscribedServices               *gxset.HashSet
+	serviceNameMapping               mapping.ServiceNameMapping
+	metaDataService                  service.MetadataService
+	registeredListeners              *gxset.HashSet
+	subscribedURLsSynthesizers       []synthesizer.SubscribedURLsSynthesizer
+	serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+
+	tryInitMetadataService()
+
+	serviceDiscovery, err := creatServiceDiscovery(url)
+	if err != nil {
+		return nil, err
+	}
+	subscribedServices := parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+	subscribedURLsSynthesizers := synthesizer.GetAllSynthesizer()
+	serviceNameMapping := extension.GetGlobalServiceNameMapping()
+	metaDataService, err := extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "could not init metadata service")
+	}
+	return &serviceDiscoveryRegistry{
+		url:                              url,
+		serviceDiscovery:                 serviceDiscovery,
+		subscribedServices:               subscribedServices,
+		subscribedURLsSynthesizers:       subscribedURLsSynthesizers,
+		registeredListeners:              gxset.NewSet(),
+		serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+		serviceNameMapping:               serviceNameMapping,
+		metaDataService:                  metaDataService,
+	}, nil
+}
+
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	return s.metaDataService.UnexportURL(url)
+}
+
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	return s.metaDataService.UnsubscribeURL(*url)
+}
+
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+	sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+	if !ok {
+		return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+	}
+	originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "Create service discovery fialed")
+	}
+	return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+	set := gxset.NewSet()
+	if len(literalServices) == 0 {
+		return set
+	}
+	var splitServices = strings.Split(literalServices, ",")
+	for _, s := range splitServices {
+		if len(s) != 0 {
+			set.Add(s)
+		}
+	}
+	return set
+}
+
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+	return s.serviceDiscovery
+}
+
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+	return *s.url
+}
+
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+	// TODO(whether available depends on metadata service and service discovery)
+	return true
+}
+
+func (s *serviceDiscoveryRegistry) Destroy() {
+	err := s.serviceDiscovery.Destroy()
+	if err != nil {
+		logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+	}
+}
+
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	ok, err := s.metaDataService.ExportURL(url)
+
+	if err != nil {
+		logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+		return err
+	}
+	if !ok {
+		logger.Warnf("The URL[%s] has been registry!", url.String())
+	}
+
+	// we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+	// But we don't want to design a similar bootstrap class.
+	ins, err := createInstance(url)
+	if err != nil {
+		return perrors.WithMessage(err, "could not create servcie instance, please check your service url")
+	}
+
+	err = s.serviceDiscovery.Register(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "register the service failed")
+	}
+
+	err = s.metaDataService.PublishServiceDefinition(url)
+	if err != nil {
+		return perrors.WithMessage(err, "publish the service definition failed. ")
+	}
+	return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+		url.GetParam(constant.GROUP_KEY, ""),
+		url.GetParam(constant.Version, ""),
+		url.Protocol)
+}
+
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+	appConfig := config.GetApplicationConfig()
+	port, err := strconv.ParseInt(url.Port, 10, 32)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+	}
+
+	host := url.Ip
+	if len(host) == 0 {
+		host, err = gxnet.GetLocalIP()
+		if err != nil {
+			return nil, perrors.WithMessage(err, "could not get the local Ip")
+		}
+	}
+
+	// usually we will add more metadata
+	metadata := make(map[string]string, 8)
+	metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+	return &registry.DefaultServiceInstance{
+		ServiceName: appConfig.Name,
+		Host:        host,
+		Port:        int(port),
+		Id:          host + constant.KEY_SEPARATOR + url.Port,
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    metadata,
+	}, nil
+}
+
+// shouldRegister reports whether the given URL belongs to the provider side.
+// It returns true only when the URL's side parameter equals
+// constant.PROVIDER_PROTOCOL; otherwise it logs a debug message and returns false.
+func shouldRegister(url common.URL) bool {
+	side := url.GetParam(constant.SIDE_KEY, "")
+	if side == constant.PROVIDER_PROTOCOL {
+		return true
+	}
+	// The format string needs a verb for the URL; without one the argument is
+	// reported as "%!(EXTRA string=...)" instead of being part of the message.
+	logger.Debugf("The URL[%s] should not be registered.", url.String())
+	return false
+}
+
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	_, err := s.metaDataService.SubscribeURL(*url)
+	if err != nil {
+		return perrors.WithMessage(err, "subscribe url error: "+url.String())
+	}
+	services := s.getServices(*url)
+	if services.Empty() {
+		return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+			"subscription url:%s", url.String())
+	}
+	for _, srv := range services.Values() {
+		serviceName := srv.(string)
+		serviceInstances := s.serviceDiscovery.GetInstances(serviceName)
+		s.subscribe(url, notify, serviceName, serviceInstances)
+		listener := &registry.ServiceInstancesChangedListener{
+			ServiceName: serviceName,
+			ChangedNotify: &InstanceChangeNotify{
+				notify:                   notify,
+				serviceDiscoveryRegistry: s,
+			},
+		}
+		s.registerServiceInstancesChangedListener(*url, listener)
+	}
+	return nil
+}
+
+// registerServiceInstancesChangedListener adds the listener to the service
+// discovery unless a listener with the same id (service name + ":" + URL key)
+// has already been added through this registry. Errors from AddListener are
+// logged, not returned.
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+	listenerId := listener.ServiceName + ":" + getUrlKey(url)
+	// De-duplicate on registeredListeners: subscribedServices is filled with
+	// service names parsed from the registry URL, so it never holds a listener id.
+	if s.registeredListeners.Contains(listenerId) {
+		return
+	}
+	if err := s.serviceDiscovery.AddListener(listener); err != nil {
+		logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+		return
+	}
+	// Record the id only after the listener was really added, so that a failed
+	// attempt can be retried by a later call.
+	s.registeredListeners.Add(listenerId)
+}
+
+// getUrlKey builds the key used to identify a URL inside this registry. The
+// key consists of the protocol, the location with port and the path (each only
+// when present), followed by "?" and the version, group and nacos protocol
+// parameters joined with "&".
+func getUrlKey(url common.URL) string {
+	var bf bytes.Buffer
+	if len(url.Protocol) != 0 {
+		bf.WriteString(url.Protocol)
+		bf.WriteString("://")
+	}
+	if len(url.Location) != 0 {
+		bf.WriteString(url.Location)
+		bf.WriteString(":")
+		bf.WriteString(url.Port)
+	}
+	if len(url.Path) != 0 {
+		bf.WriteString("/")
+		bf.WriteString(url.Path)
+	}
+	bf.WriteString("?")
+	// The buffer is handed over by pointer: a bytes.Buffer passed by value is
+	// copied, and whatever the callee writes never reaches this buffer.
+	appendParam(&bf, constant.VERSION_KEY, url)
+	bf.WriteString("&")
+	appendParam(&bf, constant.GROUP_KEY, url)
+	bf.WriteString("&")
+	appendParam(&bf, constant.NACOS_PROTOCOL_KEY, url)
+	return bf.String()
+}
+
+// appendParam writes "paramKey=value" to buffer, where value is the URL
+// parameter named paramKey, or the empty string when the parameter is absent.
+func appendParam(buffer *bytes.Buffer, paramKey string, url common.URL) {
+	buffer.WriteString(paramKey)
+	buffer.WriteString("=")
+	buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+	serviceName string, serviceInstances []registry.ServiceInstance) {
+	if len(serviceInstances) == 0 {
+		logger.Warnf("here is no instance in service[name : %s]", serviceName)
+		return
+	}
+	var subscribedURLs []common.URL
+	subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+	if len(subscribedURLs) == 0 {
+		subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+	}
+	for _, url := range subscribedURLs {
+		notify.Notify(&registry.ServiceEvent{
+			Action:  remoting.EventTypeAdd,
+			Service: url,
+		})
+	}
+
+}
+
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var urls []common.URL
+	for _, syn := range s.subscribedURLsSynthesizers {
+		if syn.Support(subscribedURL) {
+			urls = append(urls, syn.Synthesize(subscribedURL, serviceInstances)...)
+		}
+	}
+	return urls
+}
+
+func shouldSubscribe(url common.URL) bool {
+	return !shouldRegister(url)
+}
+
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+	services := gxset.NewSet()
+	serviceNames := url.GetParam(constant.PROVIDER_BY, "")
+	if len(serviceNames) != 0 {
+		services = parseServices(serviceNames)
+	}
+	if services.Empty() {
+		services = s.findMappedServices(url)
+		if services.Empty() {
+			return s.subscribedServices
+		}
+	}
+	return services
+}
+
+// findMappedServices asks the service name mapping for the service names that
+// are mapped to the interface, group, version and protocol of the given URL.
+// The interface falls back to the URL path when the interface parameter is
+// missing. When the lookup fails, the error is logged and an empty set is returned.
+func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet {
+	serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path)
+	group := url.GetParam(constant.GROUP_KEY, "")
+	version := url.GetParam(constant.VERSION_KEY, "")
+	protocol := url.Protocol
+	serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol)
+	if err != nil {
+		// every verb in the format string needs its own argument
+		logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+
+			"serviceNameMap error:%s", serviceInterface, group, version, protocol, err.Error())
+		return gxset.NewSet()
+	}
+	return serviceNames
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var filterInstances []registry.ServiceInstance
+	for _, s := range serviceInstances {
+		if !s.IsEnable() || !s.IsHealthy() {
+			continue
+		}
+		metaData := s.GetMetadata()
+		_, ok1 := metaData[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]
+		_, ok2 := metaData[constant.METADATA_SERVICE_URLS_PROPERTY_NAME]
+		if !ok1 && !ok2 {
+			continue
+		}
+		filterInstances = append(filterInstances, s)
+	}
+	if len(filterInstances) == 0 {
+		return []common.URL{}
+	}
+	s.prepareServiceRevisionExportedURLs(filterInstances)
+	subscribedURLs := s.cloneExportedURLs(subscribedURL, filterInstances)
+	return subscribedURLs
+}
+
+// comparator is defined as Comparator for skip list to compare the URL
+type comparator common.URL
+
+// Compare is defined as Comparator for skip list to compare the URL
+func (c comparator) Compare(comp cm.Comparator) int {
+	a := common.URL(c).String()
+	b := common.URL(comp.(comparator)).String()
+	switch {
+	case a > b:
+		return 1
+	case a < b:
+		return -1
+	default:
+		return 0
+	}
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+	var urls []common.URL
+	metadataStorageType := getExportedStoreType(serviceInstance)
+	proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType)
+	if proxyFactory == nil {
+		return urls
+	}
+	metadataService := proxyFactory.GetProxy(serviceInstance)
+	if metadataService == nil {
+		return urls
+	}
+	result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+	if err != nil {
+		logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
+		return urls
+	}
+
+	ret := make([]common.URL, 0, len(result))
+	for _, ui := range result {
+
+		u, err := common.NewURL(ui.(string))
+
+		if err != nil {
+			logger.Errorf("could not parse the url string to URL structure: %s", ui.(string), err)
+			continue
+		}
+		ret = append(ret, u)
+	}
+	return ret
+}
+
+// prepareServiceRevisionExportedURLs refreshes the revision -> exported URLs
+// cache for the given service instances while holding the registry's write
+// lock: it first expunges stale revisions and then initializes the current ones.
+func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	s.lock.Lock()
+	// release through defer so the lock is not left held if one of the steps
+	// below panics
+	defer s.lock.Unlock()
+	// 1. expunge stale
+	s.expungeStaleRevisionExportedURLs(serviceInstances)
+	// 2. Initialize
+	s.initRevisionExportedURLs(serviceInstances)
+}
+
+func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	serviceName := serviceInstances[0].GetServiceName()
+	revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+	if !exist {
+		return
+	}
+	existRevision := gxset.NewSet()
+	for k := range revisionExportedURLsMap {
+		existRevision.Add(k)
+	}
+	currentRevision := gxset.NewSet()
+	for _, s := range serviceInstances {
+		rv := getExportedServicesRevision(s)
+		if len(rv) != 0 {
+			currentRevision.Add(rv)
+		}
+	}
+	// staleRevisions = existedRevisions(copy) - currentRevisions
+	staleRevision := gxset.NewSet(existRevision.Values()...)
+	staleRevision.Remove(currentRevision.Values()...)
+	// remove exported URLs if staled
+	for _, s := range staleRevision.Values() {
+		delete(revisionExportedURLsMap, s.(string))
+	}
+}
+
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	// initialize the revision exported URLs that the selected service instance exported
+	s.initSelectedRevisionExportedURLs(serviceInstances)
+	// initialize the revision exported URLs that other service instances exported
+	for _, serviceInstance := range serviceInstances {
+		s.initRevisionExportedURLsByInst(serviceInstance)
+	}
+}
+
+func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	for range serviceInstances {
+		selectServiceInstance := s.selectServiceInstance(serviceInstances)
+		revisionExportedURLs := s.initRevisionExportedURLsByInst(selectServiceInstance)
+		if len(revisionExportedURLs) != 0 {

Review comment:
       ```suggestion
   		if len(revisionExportedURLs) > 0 {
   ```
   
   ?

##########
File path: registry/servicediscovery/service_discovery_registry.go
##########
@@ -0,0 +1,713 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package servicediscovery
+
+import (
+	"bytes"
+	"encoding/json"
+	"strconv"
+	"strings"
+	"sync"
+)
+
+import (
+	cm "github.com/Workiva/go-datastructures/common"
+	gxset "github.com/dubbogo/gost/container/set"
+	gxnet "github.com/dubbogo/gost/net"
+	perrors "github.com/pkg/errors"
+	"go.uber.org/atomic"
+)
+
+import (
+	"github.com/apache/dubbo-go/common"
+	"github.com/apache/dubbo-go/common/constant"
+	"github.com/apache/dubbo-go/common/extension"
+	"github.com/apache/dubbo-go/common/logger"
+	"github.com/apache/dubbo-go/common/observer"
+	"github.com/apache/dubbo-go/config"
+	"github.com/apache/dubbo-go/metadata/mapping"
+	"github.com/apache/dubbo-go/metadata/service"
+	"github.com/apache/dubbo-go/metadata/service/exporter/configurable"
+	"github.com/apache/dubbo-go/registry"
+	"github.com/apache/dubbo-go/registry/event"
+	"github.com/apache/dubbo-go/registry/servicediscovery/synthesizer"
+	"github.com/apache/dubbo-go/remoting"
+)
+
+// protocolName is the name under which this registry is registered as an extension.
+const protocolName = "service-discovery"
+
+// init registers the service discovery registry factory under protocolName.
+func init() {
+	extension.SetRegistry(protocolName, newServiceDiscoveryRegistry)
+}
+
+// serviceDiscoveryRegistry is the implementation of the application-level registry.
+// It is completely different from the other registry implementations: it is
+// built on top of the ServiceDiscovery abstraction and the ServiceNameMapping.
+// To stay compatible with the interface-level registry, instances are handed
+// out as registry.Registry by newServiceDiscoveryRegistry.
+type serviceDiscoveryRegistry struct {
+	// url is the registry URL this instance was created from
+	url *common.URL
+
+	serviceDiscovery   registry.ServiceDiscovery
+	serviceNameMapping mapping.ServiceNameMapping
+	metaDataService    service.MetadataService
+
+	// subscribedServices holds the service names parsed from the
+	// SUBSCRIBED_SERVICE_NAMES_KEY parameter of url
+	subscribedServices         *gxset.HashSet
+	registeredListeners        *gxset.HashSet
+	subscribedURLsSynthesizers []synthesizer.SubscribedURLsSynthesizer
+
+	// lock guards serviceRevisionExportedURLsCache
+	lock sync.RWMutex
+	// serviceRevisionExportedURLsCache maps service name -> revision -> exported URLs
+	serviceRevisionExportedURLsCache map[string]map[string][]common.URL
+}
+
+// newServiceDiscoveryRegistry builds the application-level registry for url.
+// It returns an error when the service discovery cannot be created or the
+// metadata service cannot be obtained.
+func newServiceDiscoveryRegistry(url *common.URL) (registry.Registry, error) {
+	tryInitMetadataService()
+
+	sd, err := creatServiceDiscovery(url)
+	if err != nil {
+		return nil, err
+	}
+
+	reg := &serviceDiscoveryRegistry{
+		url:                              url,
+		serviceDiscovery:                 sd,
+		registeredListeners:              gxset.NewSet(),
+		serviceRevisionExportedURLsCache: make(map[string]map[string][]common.URL, 8),
+	}
+	reg.subscribedServices = parseServices(url.GetParam(constant.SUBSCRIBED_SERVICE_NAMES_KEY, ""))
+	reg.subscribedURLsSynthesizers = synthesizer.GetAllSynthesizer()
+	reg.serviceNameMapping = extension.GetGlobalServiceNameMapping()
+
+	reg.metaDataService, err = extension.GetMetadataService(config.GetApplicationConfig().MetadataType)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "could not init metadata service")
+	}
+	return reg, nil
+}
+
+// UnRegister un-exports url from the metadata service.
+// URLs for which shouldRegister is false are ignored and nil is returned.
+func (s *serviceDiscoveryRegistry) UnRegister(url common.URL) error {
+	if shouldRegister(url) {
+		return s.metaDataService.UnexportURL(url)
+	}
+	return nil
+}
+
+// UnSubscribe removes the subscription of url from the metadata service.
+// URLs for which shouldSubscribe is false are ignored and nil is returned.
+// The listener argument is not used.
+func (s *serviceDiscoveryRegistry) UnSubscribe(url *common.URL, listener registry.NotifyListener) error {
+	if shouldSubscribe(*url) {
+		return s.metaDataService.UnsubscribeURL(*url)
+	}
+	return nil
+}
+
+// creatServiceDiscovery looks up the service discovery configuration named by
+// the SERVICE_DISCOVERY_KEY parameter of url, creates the matching
+// ServiceDiscovery extension and wraps it with NewEventPublishingServiceDiscovery.
+// It returns an error when the configuration is missing or the extension
+// cannot be created.
+func creatServiceDiscovery(url *common.URL) (registry.ServiceDiscovery, error) {
+	sdcName := url.GetParam(constant.SERVICE_DISCOVERY_KEY, "")
+	sdc, ok := config.GetBaseConfig().GetServiceDiscoveries(sdcName)
+	if !ok {
+		return nil, perrors.Errorf("The service discovery with name: %s is not found", sdcName)
+	}
+	originServiceDiscovery, err := extension.GetServiceDiscovery(sdc.Protocol, sdcName)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "Create service discovery failed")
+	}
+	return event.NewEventPublishingServiceDiscovery(originServiceDiscovery), nil
+}
+
+func parseServices(literalServices string) *gxset.HashSet {
+	set := gxset.NewSet()
+	if len(literalServices) == 0 {
+		return set
+	}
+	var splitServices = strings.Split(literalServices, ",")
+	for _, s := range splitServices {
+		if len(s) != 0 {
+			set.Add(s)
+		}
+	}
+	return set
+}
+
+// GetServiceDiscovery returns the ServiceDiscovery this registry was created with.
+func (s *serviceDiscoveryRegistry) GetServiceDiscovery() registry.ServiceDiscovery {
+	return s.serviceDiscovery
+}
+
+// GetUrl returns a copy of the registry URL.
+func (s *serviceDiscoveryRegistry) GetUrl() common.URL {
+	return *s.url
+}
+
+// IsAvailable currently always reports true; see the TODO below.
+func (s *serviceDiscoveryRegistry) IsAvailable() bool {
+	// TODO(whether available depends on metadata service and service discovery)
+	return true
+}
+
+// Destroy destroys the underlying service discovery.
+// A failure is only logged because the method has no error result.
+func (s *serviceDiscoveryRegistry) Destroy() {
+	if err := s.serviceDiscovery.Destroy(); err != nil {
+		logger.Errorf("destroy serviceDiscovery catch error:%s", err.Error())
+	}
+}
+
+// Register publishes a provider URL through the application-level model:
+//  1. export the URL to the metadata service,
+//  2. register a service instance built from the URL in the service discovery,
+//  3. publish the service definition,
+//  4. record the interface -> application mapping.
+// URLs for which shouldRegister is false are ignored and nil is returned.
+// The first failing step aborts the registration and its error is returned.
+func (s *serviceDiscoveryRegistry) Register(url common.URL) error {
+	if !shouldRegister(url) {
+		return nil
+	}
+	ok, err := s.metaDataService.ExportURL(url)
+
+	if err != nil {
+		logger.Errorf("The URL[%s] registry catch error:%s!", url.String(), err.Error())
+		return err
+	}
+	if !ok {
+		logger.Warnf("The URL[%s] has been registry!", url.String())
+	}
+
+	// we try to register this instance. Dubbo do this in org.apache.dubbo.config.bootstrap.DubboBootstrap
+	// But we don't want to design a similar bootstrap class.
+	ins, err := createInstance(url)
+	if err != nil {
+		return perrors.WithMessage(err, "could not create service instance, please check your service url")
+	}
+
+	err = s.serviceDiscovery.Register(ins)
+	if err != nil {
+		return perrors.WithMessage(err, "register the service failed")
+	}
+
+	err = s.metaDataService.PublishServiceDefinition(url)
+	if err != nil {
+		return perrors.WithMessage(err, "publish the service definition failed. ")
+	}
+	// the version is read with VERSION_KEY, the same key findMappedServices uses
+	// when it looks the mapping up again
+	return s.serviceNameMapping.Map(url.GetParam(constant.INTERFACE_KEY, ""),
+		url.GetParam(constant.GROUP_KEY, ""),
+		url.GetParam(constant.VERSION_KEY, ""),
+		url.Protocol)
+}
+
+// createInstance builds the service instance that represents this application
+// for the given provider URL. The host is url.Ip, or the local IP when that is
+// empty; the instance id is "<host><KEY_SEPARATOR><port>".
+// It returns an error when the port is not a valid number or the local IP
+// cannot be determined.
+func createInstance(url common.URL) (registry.ServiceInstance, error) {
+	appConfig := config.GetApplicationConfig()
+
+	port, err := strconv.ParseInt(url.Port, 10, 32)
+	if err != nil {
+		return nil, perrors.WithMessage(err, "invalid port: "+url.Port)
+	}
+
+	host := url.Ip
+	if host == "" {
+		if host, err = gxnet.GetLocalIP(); err != nil {
+			return nil, perrors.WithMessage(err, "could not get the local Ip")
+		}
+	}
+
+	// usually we will add more metadata
+	metadata := make(map[string]string, 8)
+	metadata[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME] = appConfig.MetadataType
+
+	instance := &registry.DefaultServiceInstance{
+		Id:          host + constant.KEY_SEPARATOR + url.Port,
+		ServiceName: appConfig.Name,
+		Host:        host,
+		Port:        int(port),
+		Enable:      true,
+		Healthy:     true,
+		Metadata:    metadata,
+	}
+	return instance, nil
+}
+
+// shouldRegister reports whether url is a provider side URL, i.e. whether its
+// SIDE_KEY parameter equals constant.PROVIDER_PROTOCOL.
+func shouldRegister(url common.URL) bool {
+	side := url.GetParam(constant.SIDE_KEY, "")
+	if side == constant.PROVIDER_PROTOCOL {
+		return true
+	}
+	// the format needs a verb, otherwise the URL is reported as an extra argument
+	logger.Debugf("The URL[%s] should not be registered.", url.String())
+	return false
+}
+
+// Subscribe subscribes url for the consumer side. For every service name that
+// getServices resolves for url it notifies the current instances once and then
+// registers a listener for later instance changes.
+// URLs for which shouldSubscribe is false are ignored and nil is returned.
+// An error is returned when the metadata service rejects the URL or when no
+// service name can be resolved.
+func (s *serviceDiscoveryRegistry) Subscribe(url *common.URL, notify registry.NotifyListener) error {
+	if !shouldSubscribe(*url) {
+		return nil
+	}
+	if _, err := s.metaDataService.SubscribeURL(*url); err != nil {
+		return perrors.WithMessage(err, "subscribe url error: "+url.String())
+	}
+
+	services := s.getServices(*url)
+	if services.Empty() {
+		return perrors.Errorf("Should has at least one way to know which services this interface belongs to, "+
+			"subscription url:%s", url.String())
+	}
+
+	for _, item := range services.Values() {
+		name := item.(string)
+		// initial notification with the instances known right now
+		s.subscribe(url, notify, name, s.serviceDiscovery.GetInstances(name))
+		// follow-up notifications on instance changes
+		s.registerServiceInstancesChangedListener(*url, &registry.ServiceInstancesChangedListener{
+			ServiceName: name,
+			ChangedNotify: &InstanceChangeNotify{
+				notify:                   notify,
+				serviceDiscoveryRegistry: s,
+			},
+		})
+	}
+	return nil
+}
+
+// registerServiceInstancesChangedListener adds listener to the service
+// discovery once per "<service name>:<url key>" combination.
+// Registered ids are tracked in s.registeredListeners. A failure to add the
+// listener is logged and the id is not recorded, so a later call may retry.
+func (s *serviceDiscoveryRegistry) registerServiceInstancesChangedListener(url common.URL, listener *registry.ServiceInstancesChangedListener) {
+	listenerId := listener.ServiceName + ":" + getUrlKey(url)
+	// subscribedServices only holds service names, so the listener id has to be
+	// tracked in registeredListeners to make the de-duplication effective
+	if s.registeredListeners.Contains(listenerId) {
+		return
+	}
+	if err := s.serviceDiscovery.AddListener(listener); err != nil {
+		logger.Errorf("add listener[%s] catch error,url:%s err:%s", listenerId, url.String(), err.Error())
+		return
+	}
+	s.registeredListeners.Add(listenerId)
+}
+
+// getUrlKey builds the key that identifies url for listener registration:
+// "<protocol>://<location>:<port>/<path>?" followed by the version, group and
+// nacos protocol parameters in "key=value" form. Empty protocol, location or
+// path parts are left out.
+func getUrlKey(url common.URL) string {
+	var bf bytes.Buffer
+	if len(url.Protocol) != 0 {
+		bf.WriteString(url.Protocol)
+		bf.WriteString("://")
+	}
+	if len(url.Location) != 0 {
+		bf.WriteString(url.Location)
+		bf.WriteString(":")
+		bf.WriteString(url.Port)
+	}
+	if len(url.Path) != 0 {
+		bf.WriteString("/")
+		bf.WriteString(url.Path)
+	}
+	bf.WriteString("?")
+	// pass the buffer by pointer: a bytes.Buffer passed by value is copied and
+	// everything the helper writes would be lost
+	appendParam(&bf, constant.VERSION_KEY, url)
+	appendParam(&bf, constant.GROUP_KEY, url)
+	appendParam(&bf, constant.NACOS_PROTOCOL_KEY, url)
+	return bf.String()
+}
+
+// appendParam writes "<paramKey>=<value>" to buffer, where value is the
+// parameter of url named paramKey (empty when it is not set).
+func appendParam(buffer *bytes.Buffer, paramKey string, url common.URL) {
+	buffer.WriteString(paramKey)
+	buffer.WriteString("=")
+	buffer.WriteString(url.GetParam(paramKey, ""))
+}
+
+func (s *serviceDiscoveryRegistry) subscribe(url *common.URL, notify registry.NotifyListener,
+	serviceName string, serviceInstances []registry.ServiceInstance) {
+	if len(serviceInstances) == 0 {
+		logger.Warnf("here is no instance in service[name : %s]", serviceName)
+		return
+	}
+	var subscribedURLs []common.URL
+	subscribedURLs = append(subscribedURLs, s.getExportedUrls(*url, serviceInstances)...)
+	if len(subscribedURLs) == 0 {
+		subscribedURLs = s.synthesizeSubscribedURLs(url, serviceInstances)
+	}
+	for _, url := range subscribedURLs {
+		notify.Notify(&registry.ServiceEvent{
+			Action:  remoting.EventTypeAdd,
+			Service: url,
+		})
+	}
+
+}
+
+// synthesizeSubscribedURLs collects the URLs produced by every synthesizer
+// that supports subscribedURL. The result is nil when none applies.
+func (s *serviceDiscoveryRegistry) synthesizeSubscribedURLs(subscribedURL *common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var synthesized []common.URL
+	for _, candidate := range s.subscribedURLsSynthesizers {
+		if !candidate.Support(subscribedURL) {
+			continue
+		}
+		synthesized = append(synthesized, candidate.Synthesize(subscribedURL, serviceInstances)...)
+	}
+	return synthesized
+}
+
+// shouldSubscribe reports whether url should be subscribed, which is the case
+// for every URL that shouldRegister rejects.
+func shouldSubscribe(url common.URL) bool {
+	return !shouldRegister(url)
+}
+
+// getServices resolves the service names that url should be subscribed to.
+// The sources are tried in this order and the first non-empty one wins:
+//  1. the PROVIDER_BY parameter of url,
+//  2. the service name mapping,
+//  3. the services configured on the registry (subscribedServices).
+func (s *serviceDiscoveryRegistry) getServices(url common.URL) *gxset.HashSet {
+	if byProvider := parseServices(url.GetParam(constant.PROVIDER_BY, "")); !byProvider.Empty() {
+		return byProvider
+	}
+	if mapped := s.findMappedServices(url); !mapped.Empty() {
+		return mapped
+	}
+	return s.subscribedServices
+}
+
+// findMappedServices asks the service name mapping for the service names that
+// provide the interface described by url (interface, group, version, protocol).
+// The interface falls back to url.Path when the INTERFACE_KEY parameter is not
+// set. On a lookup error the error is logged and an empty set is returned.
+func (s *serviceDiscoveryRegistry) findMappedServices(url common.URL) *gxset.HashSet {
+	serviceInterface := url.GetParam(constant.INTERFACE_KEY, url.Path)
+	group := url.GetParam(constant.GROUP_KEY, "")
+	version := url.GetParam(constant.VERSION_KEY, "")
+	protocol := url.Protocol
+	serviceNames, err := s.serviceNameMapping.Get(serviceInterface, group, version, protocol)
+	if err != nil {
+		// every verb of the format needs its argument
+		logger.Errorf("get serviceInterface:[%s] group:[%s] version:[%s] protocol:[%s] from "+
+			"serviceNameMap error:%s", serviceInterface, group, version, protocol, err.Error())
+		return gxset.NewSet()
+	}
+	return serviceNames
+}
+
+// getExportedUrls returns the exported URLs of the usable instances, adapted to
+// subscribedURL. An instance is usable when it is enabled and healthy and its
+// metadata carries METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME or
+// METADATA_SERVICE_URLS_PROPERTY_NAME. Without usable instances an empty slice
+// is returned.
+func (s *serviceDiscoveryRegistry) getExportedUrls(subscribedURL common.URL, serviceInstances []registry.ServiceInstance) []common.URL {
+	var usable []registry.ServiceInstance
+	for _, instance := range serviceInstances {
+		if !(instance.IsEnable() && instance.IsHealthy()) {
+			continue
+		}
+		md := instance.GetMetadata()
+		_, hasParams := md[constant.METADATA_SERVICE_URL_PARAMS_PROPERTY_NAME]
+		_, hasURLs := md[constant.METADATA_SERVICE_URLS_PROPERTY_NAME]
+		if hasParams || hasURLs {
+			usable = append(usable, instance)
+		}
+	}
+	if len(usable) == 0 {
+		return []common.URL{}
+	}
+	// refresh the revision cache before reading the URLs from it
+	s.prepareServiceRevisionExportedURLs(usable)
+	return s.cloneExportedURLs(subscribedURL, usable)
+}
+
+// comparator adapts common.URL to the Comparator interface used by the skip list.
+type comparator common.URL
+
+// Compare orders two URLs by their string form. It returns 1 when c sorts
+// after comp, -1 when it sorts before and 0 when both are equal.
+// comp must be a comparator; any other type makes the type assertion panic.
+func (c comparator) Compare(comp cm.Comparator) int {
+	mine := common.URL(c).String()
+	other := common.URL(comp.(comparator)).String()
+	return strings.Compare(mine, other)
+}
+
+func (s *serviceDiscoveryRegistry) getExportedUrlsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+	var urls []common.URL
+	metadataStorageType := getExportedStoreType(serviceInstance)
+	proxyFactory := extension.GetMetadataServiceProxyFactory(metadataStorageType)
+	if proxyFactory == nil {
+		return urls
+	}
+	metadataService := proxyFactory.GetProxy(serviceInstance)
+	if metadataService == nil {
+		return urls
+	}
+	result, err := metadataService.GetExportedURLs(constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE, constant.ANY_VALUE)
+	if err != nil {
+		logger.Errorf("get exported urls catch error:%s,instance:%+v", err.Error(), serviceInstance)
+		return urls
+	}
+
+	ret := make([]common.URL, 0, len(result))
+	for _, ui := range result {
+
+		u, err := common.NewURL(ui.(string))
+
+		if err != nil {
+			logger.Errorf("could not parse the url string to URL structure: %s", ui.(string), err)
+			continue
+		}
+		ret = append(ret, u)
+	}
+	return ret
+}
+
+// prepareServiceRevisionExportedURLs brings the revision cache up to date for
+// the given instances while holding the write lock: stale revisions are
+// removed first, then missing revisions are loaded.
+func (s *serviceDiscoveryRegistry) prepareServiceRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	s.lock.Lock()
+	// deferred so that the lock is released even if one of the steps panics
+	defer s.lock.Unlock()
+	// 1. expunge stale
+	s.expungeStaleRevisionExportedURLs(serviceInstances)
+	// 2. Initialize
+	s.initRevisionExportedURLs(serviceInstances)
+}
+
+// expungeStaleRevisionExportedURLs removes, from the cache entry of one service,
+// every revision that none of the given service instances reports any more.
+// The service name is taken from the first instance, so all instances are
+// expected to belong to the same service.
+// It mutates serviceRevisionExportedURLsCache without locking;
+// prepareServiceRevisionExportedURLs calls it with s.lock held.
+func (s *serviceDiscoveryRegistry) expungeStaleRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	// nothing to compare against; this also protects the index access below
+	if len(serviceInstances) == 0 {
+		return
+	}
+	serviceName := serviceInstances[0].GetServiceName()
+	revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+	if !exist {
+		return
+	}
+	// revisions that are still alive according to the instances
+	currentRevisions := gxset.NewSet()
+	for _, instance := range serviceInstances {
+		if rv := getExportedServicesRevision(instance); len(rv) != 0 {
+			currentRevisions.Add(rv)
+		}
+	}
+	// staleRevisions = cachedRevisions - currentRevisions;
+	// deleting entries while ranging over a map is safe in Go
+	for revision := range revisionExportedURLsMap {
+		if !currentRevisions.Contains(revision) {
+			delete(revisionExportedURLsMap, revision)
+		}
+	}
+}
+
+// initRevisionExportedURLs fills the revision cache for the given instances:
+// first through one selected instance, then through every instance in turn.
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	// the selected instance goes first
+	s.initSelectedRevisionExportedURLs(serviceInstances)
+	// then every instance, so that each reported revision gets a cache entry
+	for i := range serviceInstances {
+		s.initRevisionExportedURLsByInst(serviceInstances[i])
+	}
+}
+
+// initSelectedRevisionExportedURLs selects an instance and loads its exported
+// URLs into the cache. It stops at the first selection that yields at least
+// one URL and makes at most len(serviceInstances) attempts.
+func (s *serviceDiscoveryRegistry) initSelectedRevisionExportedURLs(serviceInstances []registry.ServiceInstance) {
+	for attempt := 0; attempt < len(serviceInstances); attempt++ {
+		selected := s.selectServiceInstance(serviceInstances)
+		if urls := s.initRevisionExportedURLsByInst(selected); len(urls) > 0 {
+			// a valid result has been found
+			return
+		}
+	}
+}
+
+// selectServiceInstance picks one of the given instances. An empty slice gives
+// nil and a single instance is returned as is; otherwise the choice is left to
+// the selector named by the SERVICE_INSTANCE_SELECTOR parameter of the registry
+// URL ("random" by default). It returns nil when that selector cannot be
+// obtained.
+func (s *serviceDiscoveryRegistry) selectServiceInstance(serviceInstances []registry.ServiceInstance) registry.ServiceInstance {
+	switch len(serviceInstances) {
+	case 0:
+		return nil
+	case 1:
+		return serviceInstances[0]
+	}
+	selectorName := s.url.GetParam(constant.SERVICE_INSTANCE_SELECTOR, "random")
+	selector, err := extension.GetServiceInstanceSelector(selectorName)
+	if err != nil {
+		logger.Errorf("get service instance selector catch error:%s", err.Error())
+		return nil
+	}
+	return selector.Select(*s.url, serviceInstances)
+}
+
+// initRevisionExportedURLsByInst returns the exported URLs for the revision
+// reported by serviceInstance, loading them from the instance when the cache
+// has no URLs for that revision yet. A non-nil result of the load is stored in
+// the cache. A nil instance gives an empty slice.
+// It reads and writes serviceRevisionExportedURLsCache without locking;
+// the call chain from prepareServiceRevisionExportedURLs holds s.lock.
+func (s *serviceDiscoveryRegistry) initRevisionExportedURLsByInst(serviceInstance registry.ServiceInstance) []common.URL {
+	if serviceInstance == nil {
+		return []common.URL{}
+	}
+	serviceName := serviceInstance.GetServiceName()
+	revision := getExportedServicesRevision(serviceInstance)
+	revisionExportedURLsMap := s.serviceRevisionExportedURLsCache[serviceName]
+	if revisionExportedURLsMap == nil {
+		revisionExportedURLsMap = make(map[string][]common.URL, 4)
+		s.serviceRevisionExportedURLsCache[serviceName] = revisionExportedURLsMap
+	}
+
+	revisionExportedURLs := revisionExportedURLsMap[revision]
+	if len(revisionExportedURLs) > 0 {
+		// The cache is hit
+		logger.Debugf("Get the exported URLs[size : %d] from cache, the instance"+
+			"[id: %s , service : %s , host : %s , port : %v , revision : %s]", len(revisionExportedURLs),
+			serviceInstance.GetId(), serviceInstance.GetServiceName(), serviceInstance.GetHost(),
+			serviceInstance.GetPort(), revision)
+		return revisionExportedURLs
+	}
+
+	// first load for this service when no revision has been cached so far
+	firstGet := len(revisionExportedURLsMap) == 0
+	if !firstGet {
+		// The case is that current ServiceInstance with the different revision
+		logger.Warnf("The ServiceInstance[id: %s, host : %s , port : %v] has different revision : %s"+
+			", please make sure the service [name : %s] is changing or not.", serviceInstance.GetId(),
+			serviceInstance.GetHost(), serviceInstance.GetPort(), revision, serviceInstance.GetServiceName())
+	}
+	revisionExportedURLs = s.getExportedUrlsByInst(serviceInstance)
+	if revisionExportedURLs != nil {
+		revisionExportedURLsMap[revision] = revisionExportedURLs
+		logger.Debugf("Get the exported URLs[size : %d, first : %t] from the target service "+
+			"instance [id: %s , service : %s , host : %s , port : %v , revision : %s]",
+			len(revisionExportedURLs), firstGet, serviceInstance.GetId(), serviceInstance.GetServiceName(),
+			serviceInstance.GetHost(), serviceInstance.GetPort(), revision)
+	}
+	return revisionExportedURLs
+}
+
+// getExportedServicesRevision reads the exported services revision from the
+// instance metadata. The result is empty when the metadata has no such entry.
+func getExportedServicesRevision(serviceInstance registry.ServiceInstance) string {
+	return serviceInstance.GetMetadata()[constant.EXPORTED_SERVICES_REVISION_PROPERTY_NAME]
+}
+
+// getExportedStoreType reads the metadata storage type from the instance
+// metadata and falls back to DEFAULT_METADATA_STORAGE_TYPE when it is not set.
+func getExportedStoreType(serviceInstance registry.ServiceInstance) string {
+	if storeType, ok := serviceInstance.GetMetadata()[constant.METADATA_STORAGE_TYPE_PROPERTY_NAME]; ok {
+		return storeType
+	}
+	return constant.DEFAULT_METADATA_STORAGE_TYPE
+}
+
+// cloneExportedURLs turns the cached template URLs of every instance into
+// concrete URLs for that instance: host and port are replaced by the
+// instance's host and its port for the URL's protocol, and the PID_KEY and
+// TIMESTAMP_KEY parameters are dropped. Without instances an empty slice is
+// returned.
+func (s *serviceDiscoveryRegistry) cloneExportedURLs(url common.URL, serviceInsances []registry.ServiceInstance) []common.URL {
+	if len(serviceInsances) == 0 {
+		return []common.URL{}
+	}
+	// parameters that are left out of the clones
+	excludedParams := gxset.NewSet(constant.PID_KEY, constant.TIMESTAMP_KEY)
+
+	var cloned []common.URL
+	for _, instance := range serviceInsances {
+		templates := s.getTemplateExportedURLs(url, instance)
+		host := instance.GetHost()
+		for _, tpl := range templates {
+			port := strconv.Itoa(getProtocolPort(instance, tpl.Protocol))
+			if !(tpl.Location == host && tpl.Port == port) {
+				tpl.Port = port                  // reset port
+				tpl.Location = host + ":" + port // reset host
+			}
+			cloned = append(cloned, *tpl.CloneExceptParams(excludedParams))
+		}
+	}
+	return cloned
+}
+
+// endpoint is one entry of the SERVICE_INSTANCE_ENDPOINTS metadata value:
+// the port an instance serves a given protocol on.
+// The tag options must follow the comma without a space, otherwise
+// encoding/json does not recognise omitempty.
+type endpoint struct {
+	Port     int    `json:"port,omitempty"`
+	Protocol string `json:"protocol,omitempty"`
+}
+
+// getProtocolPort looks up the port that serviceInstance serves protocol on in
+// the JSON list stored under SERVICE_INSTANCE_ENDPOINTS in its metadata.
+// It returns -1 when the metadata entry is missing or empty, cannot be decoded,
+// or has no endpoint for protocol.
+func getProtocolPort(serviceInstance registry.ServiceInstance, protocol string) int {
+	const notFound = -1
+
+	rawEndpoints := serviceInstance.GetMetadata()[constant.SERVICE_INSTANCE_ENDPOINTS]
+	if rawEndpoints == "" {
+		return notFound
+	}
+	var endpoints []endpoint
+	if err := json.Unmarshal([]byte(rawEndpoints), &endpoints); err != nil {
+		logger.Errorf("json umarshal rawEndpoints[%s] catch error:%s", rawEndpoints, err.Error())
+		return notFound
+	}
+	for i := range endpoints {
+		if endpoints[i].Protocol == protocol {
+			return endpoints[i].Port
+		}
+	}
+	return notFound
+}
+// getTemplateExportedURLs returns the cached exported URLs of serviceInstance
+// that match the subscribed url. The result is an empty slice when nothing is
+// cached for the instance.
+func (s *serviceDiscoveryRegistry) getTemplateExportedURLs(url common.URL, serviceInstance registry.ServiceInstance) []common.URL {
+	if exported := s.getRevisionExportedURLs(serviceInstance); len(exported) > 0 {
+		return filterSubscribedURLs(url, exported)
+	}
+	return []common.URL{}
+}
+
+// getRevisionExportedURLs returns a copy of the cached exported URLs for the
+// service name and revision of serviceInstance. The result is an empty slice
+// when the instance is nil or nothing is cached for it.
+func (s *serviceDiscoveryRegistry) getRevisionExportedURLs(serviceInstance registry.ServiceInstance) []common.URL {
+	if serviceInstance == nil {
+		return []common.URL{}
+	}
+	serviceName := serviceInstance.GetServiceName()
+	revision := getExportedServicesRevision(serviceInstance)
+
+	s.lock.RLock()
+	// deferred so that the early returns below release the read lock as well;
+	// otherwise the next writer would block forever
+	defer s.lock.RUnlock()
+	revisionExportedURLsMap, exist := s.serviceRevisionExportedURLsCache[serviceName]
+	if !exist {
+		return []common.URL{}
+	}
+	exportedURLs, exist := revisionExportedURLsMap[revision]
+	if !exist {
+		return []common.URL{}
+	}
+	// Get a copy from source in order to prevent the caller trying to change the cached data
+	cloneExportedURLs := make([]common.URL, len(exportedURLs))
+	copy(cloneExportedURLs, exportedURLs)
+	return cloneExportedURLs
+}
+
+// filterSubscribedURLs keeps the exported URLs that match subscribedURL in
+// interface, version and group, and in protocol when subscribedURL has one.
+// The interface of a URL is its INTERFACE_KEY parameter or, when that is not
+// set, its own path. URLs that do not match are skipped; they do not end the
+// scan.
+func filterSubscribedURLs(subscribedURL common.URL, exportedURLs []common.URL) []common.URL {
+	// the subscriber side is the same for every candidate
+	wantInterface := subscribedURL.GetParam(constant.INTERFACE_KEY, subscribedURL.Path)
+	wantVersion := subscribedURL.GetParam(constant.VERSION_KEY, "")
+	wantGroup := subscribedURL.GetParam(constant.GROUP_KEY, "")
+
+	var filterExportedURLs []common.URL
+	for _, url := range exportedURLs {
+		if url.GetParam(constant.INTERFACE_KEY, url.Path) != wantInterface {
+			continue
+		}
+		if url.GetParam(constant.VERSION_KEY, "") != wantVersion {
+			continue
+		}
+		if url.GetParam(constant.GROUP_KEY, "") != wantGroup {
+			continue
+		}
+		if len(subscribedURL.Protocol) != 0 && subscribedURL.Protocol != url.Protocol {
+			continue
+		}
+		filterExportedURLs = append(filterExportedURLs, url)
+	}
+	return filterExportedURLs
+}
+
+// InstanceChangeNotify forwards service instance changes to a NotifyListener
+// by re-running the subscription of its registry.
+type InstanceChangeNotify struct {
+	notify                   registry.NotifyListener
+	serviceDiscoveryRegistry *serviceDiscoveryRegistry
+}
+
+// Notify handles ServiceInstancesChangedEvent by subscribing the registry URL
+// again with the instances carried by the event. Other events are ignored.
+func (icn *InstanceChangeNotify) Notify(event observer.Event) {
+	changed, ok := event.(*registry.ServiceInstancesChangedEvent)
+	if !ok {
+		return
+	}
+	reg := icn.serviceDiscoveryRegistry
+	reg.subscribe(reg.url, icn.notify, changed.ServiceName, changed.Instances)
+}
+
+var (
+	exporting = &atomic.Bool{}
+)
+
+// tryInitMetadataService will try to initialize metadata service
+// TODO (move to somewhere)

Review comment:
       move to where?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
users@infra.apache.org



---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@dubbo.apache.org
For additional commands, e-mail: notifications-help@dubbo.apache.org