You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/04 07:09:39 UTC

[dubbo] 02/27: service instance subscription

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 20cf141f2e8588e1695c37c14b4d89a268b96452
Author: ken.lj <ke...@gmail.com>
AuthorDate: Thu Jun 4 11:52:08 2020 +0800

    service instance subscription
---
 .../org/apache/dubbo/metadata/MetadataInfo.java    | 181 +++++++
 .../org/apache/dubbo/registry/NotifyListener.java  |   6 +
 .../registry/client/DefaultServiceInstance.java    |  18 +
 .../dubbo/registry/client/InstanceAddressURL.java  | 132 +++++
 .../dubbo/registry/client/ServiceDiscovery.java    |   3 +-
 .../registry/client/ServiceDiscoveryRegistry.java  | 592 ++-------------------
 .../dubbo/registry/client/ServiceInstance.java     |   4 +
 .../client/event/ServiceInstancesChangedEvent.java |  12 +-
 .../listener/ServiceInstancesChangedListener.java  | 113 +++-
 .../registry/integration/RegistryDirectory.java    |  33 ++
 10 files changed, 544 insertions(+), 550 deletions(-)

diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
new file mode 100644
index 0000000..20933fb
--- /dev/null
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.metadata;
+
+import org.apache.dubbo.common.URL;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetadataInfo implements Serializable {
+    private String app;
+    private String revision;
+    private Map<String, ServiceInfo> services;
+
+    public MetadataInfo() {
+    }
+
+    public MetadataInfo(String app, String revision, Map<String, ServiceInfo> services) {
+        this.app = app;
+        this.revision = revision;
+        this.services = services == null ? new HashMap<>() : services;
+    }
+
+    public String getApp() {
+        return app;
+    }
+
+    public void setApp(String app) {
+        this.app = app;
+    }
+
+    public String getRevision() {
+        return revision;
+    }
+
+    public void setRevision(String revision) {
+        this.revision = revision;
+    }
+
+    public Map<String, ServiceInfo> getServices() {
+        return services;
+    }
+
+    public void setServices(Map<String, ServiceInfo> services) {
+        this.services = services;
+    }
+
+    public ServiceInfo getServiceInfo(String serviceKey) {
+        return services.get(serviceKey);
+    }
+
+    public String getParameter(String key, String serviceKey) {
+        ServiceInfo serviceInfo = services.get(serviceKey);
+        if (serviceInfo == null) {
+            return null;
+        }
+        return serviceInfo.getParameter(key);
+    }
+
+    public Map<String, String> getParameters(String serviceKey) {
+        ServiceInfo serviceInfo = services.get(serviceKey);
+        if (serviceInfo == null) {
+            return Collections.emptyMap();
+        }
+        return serviceInfo.getParams();
+    }
+
+    public static class ServiceInfo implements Serializable {
+        private String name;
+        private String group;
+        private String version;
+        private String protocol;
+        private String registry;
+        private Map<String, String> params;
+
+        private transient Map<String, Map<String, String>> methodParams;
+        private transient String serviceKey;
+
+        public ServiceInfo() {
+        }
+
+        public ServiceInfo(String name, String group, String version, String protocol, String registry, Map<String, String> params) {
+            this.name = name;
+            this.group = group;
+            this.version = version;
+            this.protocol = protocol;
+            this.registry = registry;
+            this.params = params == null ? new HashMap<>() : params;
+
+            this.serviceKey = URL.buildKey(name, group, version);
+        }
+
+        public String getServiceKey() {
+            return serviceKey;
+        }
+
+        public String getName() {
+            return name;
+        }
+
+        public void setName(String name) {
+            this.name = name;
+        }
+
+        public String getGroup() {
+            return group;
+        }
+
+        public void setGroup(String group) {
+            this.group = group;
+        }
+
+        public String getVersion() {
+            return version;
+        }
+
+        public void setVersion(String version) {
+            this.version = version;
+        }
+
+        public String getProtocol() {
+            return protocol;
+        }
+
+        public void setProtocol(String protocol) {
+            this.protocol = protocol;
+        }
+
+        public String getRegistry() {
+            return registry;
+        }
+
+        public void setRegistry(String registry) {
+            this.registry = registry;
+        }
+
+        public Map<String, String> getParams() {
+            if (params == null) {
+                return Collections.emptyMap();
+            }
+            return params;
+        }
+
+        public void setParams(Map<String, String> params) {
+            this.params = params;
+        }
+
+        public String getParameter(String key) {
+            return params.get(key);
+        }
+
+        public String getMethodParameter(String method, String key, String defaultValue) {
+            if (methodParams == null) {
+                methodParams = URL.toMethodParameters(params);
+            }
+
+            Map<String, String> keyMap = methodParams.get(method);
+            String value = null;
+            if (keyMap != null) {
+                value = keyMap.get(key);
+            }
+            return value == null ? defaultValue : value;
+        }
+    }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
index 0e41cc9..5b54000 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.registry;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 
 import java.util.List;
 
@@ -41,4 +42,9 @@ public interface NotifyListener {
      */
     void notify(List<URL> urls);
 
+    default void addServiceListener(ServiceInstancesChangedListener instanceListener) {
+    }
+
+    default void notifyServiceInstances() {
+    }
 }
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 366f607..6c88e8c 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -16,6 +16,9 @@
  */
 package org.apache.dubbo.registry.client;
 
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
+
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
@@ -43,6 +46,8 @@ public class DefaultServiceInstance implements ServiceInstance {
 
     private Map<String, String> metadata = new HashMap<>();
 
+    private MetadataInfo serviceMetadata;
+
     public DefaultServiceInstance() {
     }
 
@@ -125,6 +130,19 @@ public class DefaultServiceInstance implements ServiceInstance {
         this.metadata = metadata;
     }
 
+    public MetadataInfo getServiceMetadata() {
+        return serviceMetadata;
+    }
+
+    public void setServiceMetadata(MetadataInfo serviceMetadata) {
+        this.serviceMetadata = serviceMetadata;
+    }
+
+    @Override
+    public URL toURL(String protocol, String path, String interfaceName, String group, String version, String serviceKey) {
+        return new InstanceAddressURL(protocol, host, port, path, interfaceName, group, version, serviceKey);
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) return true;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
new file mode 100644
index 0000000..a4ece68
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.MetadataInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+public class InstanceAddressURL extends URL {
+
+    private MetadataInfo metadataInfo; // points to metadata of one revision
+    private String interfaceName;
+    private String group;
+    private String version;
+    private String serviceKey;
+
+    public InstanceAddressURL(String protocol, String host, int port, String path, String interfaceName, String group, String version, String serviceKey) {
+        super(protocol, host, port, path);
+        this.interfaceName = interfaceName;
+        this.group = group;
+        this.version = version;
+        this.serviceKey = serviceKey;
+    }
+
+    @Override
+    public String getServiceInterface() {
+        return interfaceName;
+    }
+
+    public String getGroup() {
+        return group;
+    }
+
+    public String getVersion() {
+        return version;
+    }
+
+    @Override
+    public String getServiceKey() {
+        return serviceKey;
+    }
+
+    @Override
+    public String getParameter(String key) {
+        if (VERSION_KEY.equals(key)) {
+            return getVersion();
+        } else if (GROUP_KEY.equals(key)) {
+            return getGroup();
+        } else if (INTERFACE_KEY.equals(key)) {
+            return getServiceInterface();
+        }
+
+        String value = super.getParameter(key);
+        if (StringUtils.isEmpty(value)) {
+            value = metadataInfo.getParameter(key, this.getServiceKey());
+        }
+        return value;
+    }
+
+    @Override
+    public String getParameter(String key, String defaultValue) {
+        if (VERSION_KEY.equals(key)) {
+            return getVersion();
+        } else if (GROUP_KEY.equals(key)) {
+            return getGroup();
+        } else if (INTERFACE_KEY.equals(key)) {
+            return getServiceInterface();
+        }
+
+        String value = getParameter(key);
+        if (StringUtils.isEmpty(value)) {
+            return defaultValue;
+        }
+        return value;
+    }
+
+    @Override
+    public String getMethodParameter(String method, String key) {
+        Map<String, Map<String, String>> instanceMethodParams = super.getMethodParameters();
+        Map<String, String> keyMap = instanceMethodParams.get(method);
+        String value = null;
+        if (keyMap != null) {
+            value = keyMap.get(key);
+        }
+
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey());
+        value = serviceInfo.getMethodParameter(method, key, value);
+        return value;
+    }
+
+    @Override
+    public Map<String, String> getParameters() {
+        Map<String, String> instanceParams = super.getParameters();
+        Map<String, String> metadataParams = metadataInfo.getParameters(getServiceKey());
+        int i = instanceParams == null ? 0 : instanceParams.size();
+        int j = metadataParams == null ? 0 : metadataParams.size();
+        Map<String, String> params = new HashMap<>((int) ((i + j) / 0.75) + 1);
+        if (instanceParams != null) {
+            params.putAll(instanceParams);
+        }
+        if (metadataParams != null) {
+            params.putAll(metadataParams);
+        }
+        return params;
+    }
+
+    public void setMetadata(MetadataInfo metadataInfo) {
+        this.metadataInfo = metadataInfo;
+    }
+
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index f2d428a..bc73cd0 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -27,7 +27,6 @@ import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 
-import java.util.Collection;
 import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
@@ -247,7 +246,7 @@ public interface ServiceDiscovery extends Prioritized {
      * @param serviceName      the name of service whose service instances have been changed
      * @param serviceInstances the service instances have been changed
      */
-    default void dispatchServiceInstancesChangedEvent(String serviceName, Collection<ServiceInstance> serviceInstances) {
+    default void dispatchServiceInstancesChangedEvent(String serviceName, List<ServiceInstance> serviceInstances) {
         dispatchServiceInstancesChangedEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index db03a3c..6232850 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -17,71 +17,54 @@
 package org.apache.dubbo.registry.client;
 
 import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
 import org.apache.dubbo.common.extension.ExtensionLoader;
 import org.apache.dubbo.common.extension.SPI;
 import org.apache.dubbo.common.logger.Logger;
 import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.ServiceNameMapping;
 import org.apache.dubbo.metadata.WritableMetadataService;
 import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.Registry;
-import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
 import org.apache.dubbo.registry.client.metadata.SubscribedURLsSynthesizer;
-import org.apache.dubbo.registry.client.metadata.proxy.MetadataServiceProxyFactory;
-import org.apache.dubbo.registry.client.selector.ServiceInstanceSelector;
 import org.apache.dubbo.registry.support.FailbackRegistry;
 
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
 import java.util.LinkedHashMap;
 import java.util.LinkedHashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Set;
-import java.util.SortedSet;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
 import java.util.stream.Collectors;
 
 import static java.lang.String.format;
-import static java.util.Collections.emptyList;
-import static org.apache.dubbo.common.URLBuilder.from;
-import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR_CHAR;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Stream.of;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
 import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
 import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
 import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
 import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
-import static org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader;
 import static org.apache.dubbo.common.function.ThrowableAction.execute;
 import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
-import static org.apache.dubbo.common.utils.CollectionUtils.isNotEmpty;
-import static org.apache.dubbo.common.utils.StringUtils.splitToSet;
-import static org.apache.dubbo.metadata.MetadataService.toURLs;
+import static org.apache.dubbo.common.utils.StringUtils.isBlank;
 import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
 import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getMetadataStorageType;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getProtocolPort;
 
 /**
  * Being different to the traditional registry, {@link ServiceDiscoveryRegistry} that is a new service-oriented
@@ -104,7 +87,7 @@ import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataU
  * {@link ServiceNameMapping} will help to figure out one or more services that exported correlative Dubbo services. If
  * the service names can be found, the exported {@link URL URLs} will be get from the remote {@link MetadataService}
  * being deployed on all {@link ServiceInstance instances} of services. The whole process runs under the
- * {@link #subscribeURLs(URL, List, String, Collection)} method. It's very expensive to invoke
+ * {@link #subscribeURLs(URL, NotifyListener, String, Collection)} method. It's very expensive to invoke
  * {@link MetadataService} for each {@link ServiceInstance service instance}, thus {@link ServiceDiscoveryRegistry}
  * introduces a cache to optimize the calculation with "revisions". If the revisions of N
  * {@link ServiceInstance service instances} are same, {@link MetadataService} is invoked just only once, and then it
@@ -126,7 +109,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
 
     private final ServiceDiscovery serviceDiscovery;
 
-    private Set<String> subscribedServices;
+    private final Set<String> subscribedServices;
 
     private final ServiceNameMapping serviceNameMapping;
 
@@ -134,9 +117,8 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
 
     private final Set<String> registeredListeners = new LinkedHashSet<>();
 
-    private final List<SubscribedURLsSynthesizer> subscribedURLsSynthesizers;
-
-    private final ReadWriteLock lock = new ReentrantReadWriteLock();
+    /* app - listener */
+    private final Map<String, ServiceInstancesChangedListener> serviceListeners = new HashMap<>();
 
     /**
      * A cache for all URLs of services that the subscribed services exported
@@ -148,10 +130,10 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
     public ServiceDiscoveryRegistry(URL registryURL) {
         super(registryURL);
         this.serviceDiscovery = createServiceDiscovery(registryURL);
+        this.subscribedServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
         this.serviceNameMapping = ServiceNameMapping.getDefaultExtension();
         String metadataStorageType = getMetadataStorageType(registryURL);
         this.writableMetadataService = WritableMetadataService.getExtension(metadataStorageType);
-        this.subscribedURLsSynthesizers = initSubscribedURLsSynthesizers();
     }
 
     public ServiceDiscovery getServiceDiscovery() {
@@ -159,16 +141,6 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
     }
 
     /**
-     * Get the subscribed services from the specified registry {@link URL url}
-     *
-     * @param registryURL the specified registry {@link URL url}
-     * @return non-null
-     */
-    public static Set<String> getSubscribedServices(URL registryURL) {
-        return parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
-    }
-
-    /**
      * Create the {@link ServiceDiscovery} from the registry {@link URL}
      *
      * @param registryURL the {@link URL} to connect the registry
@@ -317,73 +289,24 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
         writableMetadataService.subscribeURL(url);
 
         Set<String> serviceNames = getServices(url);
-
-        List<URL> subscribedURLs = new LinkedList<>();
-
-        serviceNames.forEach(serviceName -> {
-
-            subscribeURLs(url, subscribedURLs, serviceName);
-
-            // register ServiceInstancesChangedListener
-            registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
-
-                @Override
-                public void onEvent(ServiceInstancesChangedEvent event) {
-                    List<URL> subscribedURLs = new LinkedList<>();
-                    Set<String> others = new HashSet<>(serviceNames);
-                    others.remove(serviceName);
-
-                    // Collect the subscribedURLs
-                    subscribeURLs(url, subscribedURLs, serviceName, () -> event.getServiceInstances());
-                    subscribeURLs(url, subscribedURLs, others.toString(), () -> getServiceInstances(others));
-
-                    // Notify all
-                    notifyAllSubscribedURLs(url, subscribedURLs, listener);
-
-                }
-            });
-        });
-
-        // Notify all
-        notifyAllSubscribedURLs(url, subscribedURLs, listener);
-
-    }
-
-    private void notifyAllSubscribedURLs(URL url, List<URL> subscribedURLs, NotifyListener listener) {
-
-        if (subscribedURLs.isEmpty()) {
-            // Add the EMPTY_PROTOCOL URL
-            subscribedURLs.add(from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY).build());
+        if (CollectionUtils.isEmpty(serviceNames)) {
+            throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
         }
 
-        // Notify all
-        listener.notify(subscribedURLs);
+        serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
     }
 
-    private List<ServiceInstance> getServiceInstances(Set<String> serviceNames) {
-        if (isEmpty(serviceNames)) {
-            return emptyList();
-        }
-        List<ServiceInstance> allServiceInstances = new LinkedList<>();
-        for (String serviceName : serviceNames) {
-            List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
-            if (!isEmpty(serviceInstances)) {
-                allServiceInstances.addAll(serviceInstances);
-            }
-        }
-        return allServiceInstances;
-    }
-
-    protected void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs,
-                                 String serviceName, Supplier<Collection<ServiceInstance>> serviceInstancesSupplier) {
-        Collection<ServiceInstance> serviceInstances = serviceInstancesSupplier.get();
-        subscribeURLs(subscribedURL, subscribedURLs, serviceName, serviceInstances);
-    }
-
-
-    protected void subscribeURLs(URL url, List<URL> subscribedURLs, String serviceName) {
-        List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
-        subscribeURLs(url, subscribedURLs, serviceName, serviceInstances);
+    protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
+        // register ServiceInstancesChangedListener
+        ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceName,
+                k -> new ServiceInstancesChangedListener(serviceName) {
+                    @Override
+                    protected void notifyAddresses() {
+                        listener.notifyServiceInstances();
+                    }
+                });
+        listener.addServiceListener(serviceListener);
+        registerServiceInstancesChangedListener(url, serviceListener);
     }
 
     /**
@@ -403,432 +326,24 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
         return listener.getServiceName() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
     }
 
-    /**
-     * Subscribe the {@link URL URLs} that the specified service exported are
-     * {@link #getExportedURLs(ServiceInstance) get} from {@link MetadataService} if present, or try to
-     * be {@link #synthesizeSubscribedURLs(URL, Collection) synthesized} by
-     * the instances of {@link SubscribedURLsSynthesizer}
-     *
-     * @param subscribedURL    the subscribed {@link URL url}
-     * @param subscribedURLs   {@link NotifyListener}
-     * @param serviceName
-     * @param serviceInstances
-     * @see #getExportedURLs(URL, Collection)
-     * @see #synthesizeSubscribedURLs(URL, Collection)
-     */
-    protected void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs, String serviceName,
-                                 Collection<ServiceInstance> serviceInstances) {
-
-        if (isEmpty(serviceInstances)) {
-            logger.warn(format("There is no instance in service[name : %s]", serviceName));
-            return;
-        }
-
-        /**
-         * Add the exported URLs from {@link MetadataService}
-         */
-        subscribedURLs.addAll(getExportedURLs(subscribedURL, serviceInstances));
-
-        if (subscribedURLs.isEmpty()) { // If empty, try to synthesize
-            /**
-             * Add the subscribed URLs that were synthesized
-             */
-            subscribedURLs.addAll(synthesizeSubscribedURLs(subscribedURL, serviceInstances));
-        }
-    }
-
-    /**
-     * Get the exported {@link URL URLs} from the  {@link MetadataService} in the specified
-     * {@link ServiceInstance service instances}
-     *
-     * @param subscribedURL the subscribed {@link URL url}
-     * @param instances     {@link ServiceInstance service instances}
-     * @return the exported {@link URL URLs} if present, or <code>{@link Collections#emptyList() empty list}</code>
-     */
-    private List<URL> getExportedURLs(URL subscribedURL, Collection<ServiceInstance> instances) {
-
-        // local service instances could be mutable
-        List<ServiceInstance> serviceInstances = instances.stream()
-                .filter(ServiceInstance::isEnabled)
-                .filter(ServiceInstance::isHealthy)
-                .filter(ServiceInstanceMetadataUtils::isDubboServiceInstance)
-                .collect(Collectors.toList());
-
-        int size = serviceInstances.size();
-
-        if (size == 0) {
-            return emptyList();
-        }
-
-        // Prepare revision exported URLs
-        prepareServiceRevisionExportedURLs(serviceInstances);
-
-        // Clone the subscribed URLs from the template URLs
-        List<URL> subscribedURLs = cloneExportedURLs(subscribedURL, serviceInstances);
-
-        // clear local service instances
-        serviceInstances.clear();
-
-        return subscribedURLs;
-    }
-
-    /**
-     * Prepare the {@link #serviceRevisionExportedURLsCache} exclusively
-     *
-     * @param serviceInstances {@link ServiceInstance service instances}
-     * @see #expungeStaleRevisionExportedURLs(List)
-     * @see #initializeRevisionExportedURLs(List)
-     */
-    private void prepareServiceRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
-        executeExclusively(() -> {
-            // 1. expunge stale
-            expungeStaleRevisionExportedURLs(serviceInstances);
-            // 2. Initialize
-            initializeRevisionExportedURLs(serviceInstances);
-        });
-    }
-
-    /**
-     * Initialize the {@link URL URLs} that {@link ServiceInstance service instances} exported into
-     * {@link #serviceRevisionExportedURLsCache the cache}.
-     * <p>
-     * Typically, the {@link URL URLs} that one {@link ServiceInstance service instance} exported can be get from
-     * the same instances' {@link MetadataService}, but the cost is very expensive if there are a lot of instances
-     * in this service. Thus, the exported {@link URL URls} should be cached  and stored into
-     * {@link #serviceRevisionExportedURLsCache the cache}.
-     * <p>
-     * In most cases, {@link #serviceRevisionExportedURLsCache the cache} only holds a single list of exported URLs for
-     * each service because there is no difference on the Dubbo services(interfaces) between the service instances.
-     * However, if there are one or more upgrading or increasing Dubbo services that are deploying on the some of
-     * instances, other instances still maintain the previous ones, in this way, there are two versions of the services,
-     * they are called "revisions", in other words, one revision associates a list of exported URLs that can be reused
-     * for other instances with same revision, and one service allows one or more revisions.
-     *
-     * @param serviceInstances {@link ServiceInstance service instances}
-     */
-    private void initializeRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
-        // initialize the revision exported URLs that the selected service instance exported
-        initializeSelectedRevisionExportedURLs(serviceInstances);
-        // initialize the revision exported URLs that other service instances exported
-        serviceInstances.forEach(this::initializeRevisionExportedURLs);
-    }
-
-    /**
-     * Initialize the {@link URL URLs} that the {@link #selectServiceInstance(List) selected service instance} exported
-     * into {@link #serviceRevisionExportedURLsCache the cache}.
-     *
-     * @param serviceInstances {@link ServiceInstance service instances}
-     */
-    private void initializeSelectedRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
-        // Try to initialize revision exported URLs until success
-        for (int i = 0; i < serviceInstances.size(); i++) {
-            // select a instance of {@link ServiceInstance}
-            ServiceInstance selectedInstance = selectServiceInstance(serviceInstances);
-            List<URL> revisionExportedURLs = initializeRevisionExportedURLs(selectedInstance);
-            if (isNotEmpty(revisionExportedURLs)) {    // If the result is valid
-                break;
-            }
-        }
-    }
-
-    /**
-     * Expunge the revision exported {@link URL URLs} in {@link #serviceRevisionExportedURLsCache the cache} if
-     * some revisions of {@link ServiceInstance service instance} had been out of date possibly
-     *
-     * @param serviceInstances {@link ServiceInstance service instances}
-     */
-    private void expungeStaleRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
-
-        String serviceName = serviceInstances.get(0).getServiceName();
-        // revisionExportedURLsMap is mutable
-        Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
-
-        if (revisionExportedURLsMap.isEmpty()) { // if empty, return immediately
-            return;
-        }
-
-        Set<String> existedRevisions = revisionExportedURLsMap.keySet(); // read-only
-        Set<String> currentRevisions = serviceInstances.stream()
-                .map(ServiceInstanceMetadataUtils::getExportedServicesRevision)
-                .collect(Collectors.toSet());
-        // staleRevisions = existedRevisions(copy) - currentRevisions
-        Set<String> staleRevisions = new HashSet<>(existedRevisions);
-        staleRevisions.removeAll(currentRevisions);
-        // remove exported URLs if staled
-        staleRevisions.forEach(revisionExportedURLsMap::remove);
-    }
-
-    /**
-     * Clone the exported URLs that are based on {@link #getTemplateExportedURLs(URL, ServiceInstance) the template URLs}
-     * from the some of {@link ServiceInstance service instances} with different revisions
-     *
-     * @param subscribedURL    the subscribed {@link URL url}
-     * @param serviceInstances {@link ServiceInstance service instances}
-     * @return non-null
-     */
-    private List<URL> cloneExportedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
-
-        if (isEmpty(serviceInstances)) {
-            return emptyList();
-        }
-
-        List<URL> clonedExportedURLs = new LinkedList<>();
-
-        serviceInstances.forEach(serviceInstance -> {
-
-            String host = serviceInstance.getHost();
-
-            getTemplateExportedURLs(subscribedURL, serviceInstance)
-                    .stream()
-                    .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
-                    .map(templateURL -> templateURL.removeParameter(PID_KEY))
-                    .map(templateURL -> {
-                        String protocol = templateURL.getProtocol();
-                        int port = getProtocolPort(serviceInstance, protocol);
-                        if (Objects.equals(templateURL.getHost(), host)
-                                && Objects.equals(templateURL.getPort(), port)) { // use templateURL if equals
-                            return templateURL;
-                        }
-
-                        URLBuilder clonedURLBuilder = from(templateURL) // remove the parameters from the template URL
-                                .setHost(host)  // reset the host
-                                .setPort(port); // reset the port
-
-                        return clonedURLBuilder.build();
-                    })
-                    .forEach(clonedExportedURLs::add);
-        });
-        return clonedExportedURLs;
-    }
-
-
-    /**
-     * Select one {@link ServiceInstance} by {@link ServiceInstanceSelector the strategy} if there are more that one
-     * instances in order to avoid the hot spot appearing the some instance
-     *
-     * @param serviceInstances the {@link List list} of {@link ServiceInstance}
-     * @return <code>null</code> if <code>serviceInstances</code> is empty.
-     * @see ServiceInstanceSelector
-     */
-    private ServiceInstance selectServiceInstance(List<ServiceInstance> serviceInstances) {
-        int size = serviceInstances.size();
-        if (size == 0) {
-            return null;
-        } else if (size == 1) {
-            return serviceInstances.get(0);
-        }
-        ServiceInstanceSelector selector = getExtensionLoader(ServiceInstanceSelector.class).getAdaptiveExtension();
-        return selector.select(getUrl(), serviceInstances);
-    }
-
-    /**
-     * Get the template exported {@link URL urls} from the specified {@link ServiceInstance}.
-     * <p>
-     * First, put the revision {@link ServiceInstance service instance}
-     * associating {@link #getExportedURLs(ServiceInstance) exported URLs} into cache.
-     * <p>
-     * And then compare a new {@link ServiceInstance service instances'} revision with cached one,If they are equal,
-     * return the cached template {@link URL urls} immediately, or to get template {@link URL urls} that the provider
-     * {@link ServiceInstance instance} exported via executing {@link ##getExportedURLs(ServiceInstance) (ServiceInstance)}
-     * method.
-     * <p>
-     * Eventually, the retrieving result will be cached and returned.
-     *
-     * @param subscribedURL    the subscribed {@link URL url}
-     * @param selectedInstance the {@link ServiceInstance}
-     *                         associating with the {@link URL urls}
-     * @return non-null {@link List} of {@link URL urls}
-     */
-    private List<URL> getTemplateExportedURLs(URL subscribedURL, ServiceInstance selectedInstance) {
-
-        List<URL> exportedURLs = getRevisionExportedURLs(selectedInstance);
-
-        if (isEmpty(exportedURLs)) {
-            return emptyList();
-        }
-
-        return filterSubscribedURLs(subscribedURL, exportedURLs);
-    }
-
-    /**
-     * Initialize the URLs that the specified {@link ServiceInstance service instance} exported
-     *
-     * @param serviceInstance the {@link ServiceInstance} exports the Dubbo Services
-     * @return the {@link URL URLs} that the {@link ServiceInstance} exported, it's calculated from
-     * The invocation to remote {@link MetadataService}, or get from {@link #serviceRevisionExportedURLsCache cache} if
-     * {@link ServiceInstanceMetadataUtils#getExportedServicesRevision(ServiceInstance) revision} is hit
-     */
-    private List<URL> initializeRevisionExportedURLs(ServiceInstance serviceInstance) {
-
-        if (serviceInstance == null) {
-            return emptyList();
-        }
-
-        String serviceName = serviceInstance.getServiceName();
-        // get the revision from the specified {@link ServiceInstance}
-        String revision = getExportedServicesRevision(serviceInstance);
-
-        Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
-
-        List<URL> revisionExportedURLs = revisionExportedURLsMap.get(revision);
-
-        boolean firstGet = false;
-
-        if (revisionExportedURLs == null) { // The hit is missing in cache
-
-            if (!revisionExportedURLsMap.isEmpty()) { // The case is that current ServiceInstance with the different revision
-                if (logger.isWarnEnabled()) {
-                    logger.warn(format("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s" +
-                                    ", please make sure the service [name : %s] is changing or not.",
-                            serviceInstance.getId(),
-                            serviceInstance.getHost(),
-                            serviceInstance.getPort(),
-                            revision,
-                            serviceInstance.getServiceName()
-                    ));
-                }
-            } else { // Else, it's the first time to get the exported URLs
-                firstGet = true;
-            }
-
-            revisionExportedURLs = getExportedURLs(serviceInstance);
-
-            if (revisionExportedURLs != null) { // just allow the valid result into exportedURLsMap
-
-                revisionExportedURLsMap.put(revision, revisionExportedURLs);
-
-                if (logger.isDebugEnabled()) {
-                    logger.debug(format("Get the exported URLs[size : %s, first : %s] from the target service " +
-                                    "instance [id: %s , service : %s , host : %s , port : %s , revision : %s]",
-                            revisionExportedURLs.size(), firstGet,
-                            serviceInstance.getId(),
-                            serviceInstance.getServiceName(),
-                            serviceInstance.getHost(),
-                            serviceInstance.getPort(),
-                            revision
-                    ));
-                }
-            }
-        } else { // Else, The cache is hit
-            if (logger.isDebugEnabled()) {
-                logger.debug(format("Get the exported URLs[size : %s] from cache, the instance" +
-                                "[id: %s , service : %s , host : %s , port : %s , revision : %s]",
-                        revisionExportedURLs.size(),
-                        serviceInstance.getId(),
-                        serviceInstance.getServiceName(),
-                        serviceInstance.getHost(),
-                        serviceInstance.getPort(),
-                        revision
-                ));
-            }
-        }
-
-        return revisionExportedURLs;
-    }
-
     private Map<String, List<URL>> getRevisionExportedURLsMap(String serviceName) {
         return serviceRevisionExportedURLsCache.computeIfAbsent(serviceName, s -> new LinkedHashMap());
     }
 
     /**
-     * Get all services {@link URL URLs} that the specified {@link ServiceInstance service instance} exported from cache
-     *
-     * @param serviceInstance the {@link ServiceInstance} exports the Dubbo Services
-     * @return the same as {@link #getExportedURLs(ServiceInstance)}
-     */
-    private List<URL> getRevisionExportedURLs(ServiceInstance serviceInstance) {
-
-        if (serviceInstance == null) {
-            return emptyList();
-        }
-
-        String serviceName = serviceInstance.getServiceName();
-        // get the revision from the specified {@link ServiceInstance}
-        String revision = getExportedServicesRevision(serviceInstance);
-
-        return getRevisionExportedURLs(serviceName, revision);
-    }
-
-    private List<URL> getRevisionExportedURLs(String serviceName, String revision) {
-        return executeShared(() -> {
-            Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
-            List<URL> exportedURLs = revisionExportedURLsMap.get(revision);
-            // Get a copy from source in order to prevent the caller trying to change the cached data
-            return exportedURLs != null ? new ArrayList<>(exportedURLs) : emptyList();
-        });
-    }
-
-    /**
-     * Get all services {@link URL URLs} that the specified {@link ServiceInstance service instance} exported
-     * via the proxy to invoke the {@link MetadataService}
-     *
-     * @param providerServiceInstance the {@link ServiceInstance} exported the Dubbo services
-     * @return The possible result :
-     * <ol>
-     * <li>The normal result</li>
-     * <li>The empty result if the {@link ServiceInstance service instance} did not export yet</li>
-     * <li><code>null</code> if there is an invocation error on {@link MetadataService} proxy</li>
-     * </ol>
-     * @see MetadataServiceProxyFactory
-     * @see MetadataService
-     */
-    private List<URL> getExportedURLs(ServiceInstance providerServiceInstance) {
-
-        List<URL> exportedURLs = null;
-
-        String metadataStorageType = getMetadataStorageType(providerServiceInstance);
-
-        try {
-            MetadataService metadataService = MetadataServiceProxyFactory.getExtension(metadataStorageType)
-                    .getProxy(providerServiceInstance);
-            if (metadataService != null) {
-                SortedSet<String> urls = metadataService.getExportedURLs();
-                exportedURLs = toURLs(urls);
-            }
-        } catch (Throwable e) {
-            if (logger.isErrorEnabled()) {
-                logger.error(format("Failed to get the exported URLs from the target service instance[%s]",
-                        providerServiceInstance), e);
-            }
-            exportedURLs = null; // set the result to be null if failed to get
-        }
-        return exportedURLs;
-    }
-
-    private void executeExclusively(Runnable runnable) {
-        Lock writeLock = lock.writeLock();
-        writeLock.lock();
-        try {
-            runnable.run();
-        } finally {
-            writeLock.unlock();
-        }
-    }
-
-    private <T> T executeShared(Supplier<T> supplier) {
-        Lock readLock = lock.readLock();
-        readLock.lock();
-        try {
-            return supplier.get();
-        } finally {
-            readLock.unlock();
-        }
-    }
-
-    /**
      * Synthesize new subscribed {@link URL URLs} from old one
      *
      * @param subscribedURL
      * @param serviceInstances
      * @return non-null
      */
-    private Collection<? extends URL> synthesizeSubscribedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
-        return subscribedURLsSynthesizers.stream()
-                .filter(synthesizer -> synthesizer.supports(subscribedURL))
-                .map(synthesizer -> synthesizer.synthesize(subscribedURL, serviceInstances))
-                .flatMap(Collection::stream)
-                .collect(Collectors.toList());
-    }
+//    private Collection<? extends URL> synthesizeSubscribedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
+//        return subscribedURLsSynthesizers.stream()
+//                .filter(synthesizer -> synthesizer.supports(subscribedURL))
+//                .map(synthesizer -> synthesizer.synthesize(subscribedURL, serviceInstances))
+//                .flatMap(Collection::stream)
+//                .collect(Collectors.toList());
+//    }
 
     /**
      * 1.developer explicitly specifies the application name this interface belongs to
@@ -837,11 +352,9 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
      *
      * @param subscribedURL
      * @return
-     * @throws IllegalStateException If no service name is not found
      */
-    protected Set<String> getServices(URL subscribedURL) throws IllegalStateException {
-
-        Set<String> subscribedServices = null;
+    protected Set<String> getServices(URL subscribedURL) {
+        Set<String> subscribedServices = new LinkedHashSet<>();
 
         String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
         if (StringUtils.isNotEmpty(serviceNames)) {
@@ -850,21 +363,19 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
 
         if (isEmpty(subscribedServices)) {
             subscribedServices = findMappedServices(subscribedURL);
+            if (isEmpty(subscribedServices)) {
+                subscribedServices = getSubscribedServices();
+            }
         }
-
-        if (isEmpty(subscribedServices)) {
-            subscribedServices = getSubscribedServices();
-        }
-
-        if (isEmpty(subscribedServices)) {
-            throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + subscribedURL);
-        }
-
         return subscribedServices;
     }
 
     public static Set<String> parseServices(String literalServices) {
-        return splitToSet(literalServices, COMMA_SEPARATOR_CHAR, true);
+        return isBlank(literalServices) ? emptySet() :
+                unmodifiableSet(of(literalServices.split(","))
+                        .map(String::trim)
+                        .filter(StringUtils::isNotEmpty)
+                        .collect(toSet()));
     }
 
     /**
@@ -873,20 +384,21 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
      * @return non-null
      */
     public Set<String> getSubscribedServices() {
-        if (subscribedServices == null) {
-            subscribedServices = findMappedServices(getUrl());
-        }
         return subscribedServices;
     }
 
     /**
      * Get the mapped services name by the specified {@link URL}
      *
-     * @param url the specified {@link URL}
-     * @return empty {@link Set} if not found
+     * @param subscribedURL
+     * @return
      */
-    protected Set<String> findMappedServices(URL url) {
-        return serviceNameMapping.get(url);
+    protected Set<String> findMappedServices(URL subscribedURL) {
+        String serviceInterface = subscribedURL.getServiceInterface();
+        String group = subscribedURL.getParameter(GROUP_KEY);
+        String version = subscribedURL.getParameter(VERSION_KEY);
+        String protocol = subscribedURL.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL);
+        return serviceNameMapping.get(serviceInterface, group, version, protocol);
     }
 
     /**
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
index 896af9c..164d146 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
@@ -16,6 +16,8 @@
  */
 package org.apache.dubbo.registry.client;
 
+import org.apache.dubbo.common.URL;
+
 import java.io.Serializable;
 import java.util.Map;
 
@@ -115,4 +117,6 @@ public interface ServiceInstance extends Serializable {
      */
     boolean equals(Object another);
 
+    URL toURL(String protocol, String path, String interfaceName, String group, String version, String serviceKey);
+
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java
index 4b91265..b206eb7 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java
@@ -20,9 +20,9 @@ import org.apache.dubbo.event.Event;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 
-import java.util.Collection;
+import java.util.List;
 
-import static java.util.Collections.unmodifiableCollection;
+import static java.util.Collections.unmodifiableList;
 
 /**
  * An event raised after the {@link ServiceInstance instances} of one service has been changed.
@@ -34,17 +34,17 @@ public class ServiceInstancesChangedEvent extends Event {
 
     private final String serviceName;
 
-    private final Collection<ServiceInstance> serviceInstances;
+    private final List<ServiceInstance> serviceInstances;
 
     /**
      * @param serviceName      The name of service that was changed
      * @param serviceInstances all {@link ServiceInstance service instances}
      * @throws IllegalArgumentException if source is null.
      */
-    public ServiceInstancesChangedEvent(String serviceName, Collection<ServiceInstance> serviceInstances) {
+    public ServiceInstancesChangedEvent(String serviceName, List<ServiceInstance> serviceInstances) {
         super(serviceName);
         this.serviceName = serviceName;
-        this.serviceInstances = unmodifiableCollection(serviceInstances);
+        this.serviceInstances = unmodifiableList(serviceInstances);
     }
 
     /**
@@ -58,7 +58,7 @@ public class ServiceInstancesChangedEvent extends Event {
     /**
      * @return all {@link ServiceInstance service instances}
      */
-    public Collection<ServiceInstance> getServiceInstances() {
+    public List<ServiceInstance> getServiceInstances() {
         return serviceInstances;
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 1028301..33f0bcc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -16,11 +16,30 @@
  */
 package org.apache.dubbo.registry.client.event.listener;
 
+import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.event.ConditionalEventListener;
 import org.apache.dubbo.event.EventListener;
+import org.apache.dubbo.metadata.MetadataInfo;
+import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
+import org.apache.dubbo.metadata.MetadataService;
+import org.apache.dubbo.metadata.MetadataUtils;
+import org.apache.dubbo.metadata.store.RemoteMetadataServiceImpl;
+import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
 
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
 import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
 
 /**
  * The Service Discovery Changed {@link EventListener Event Listener}
@@ -32,6 +51,16 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
 
     private final String serviceName;
 
+    private List<ServiceInstance> instances;
+
+    private Map<String, List<ServiceInstance>> revisionToInstances;
+
+    private Map<String, MetadataInfo> revisionToMetadata;
+
+    private Map<String, Set<String>> serviceToRevisions;
+
+    private Map<String, List<ServiceInstance>> serviceToInstances;
+
     protected ServiceInstancesChangedListener(String serviceName) {
         this.serviceName = serviceName;
     }
@@ -41,7 +70,87 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
      *
      * @param event {@link ServiceInstancesChangedEvent}
      */
-    public abstract void onEvent(ServiceInstancesChangedEvent event);
+    public void onEvent(ServiceInstancesChangedEvent event) {
+        instances = event.getServiceInstances();
+
+        Map<String, List<ServiceInstance>> localRevisionToInstances = new HashMap<>();
+        Map<String, MetadataInfo> localRevisionToMetadata = new HashMap<>();
+        Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
+        for (ServiceInstance instance : instances) {
+            String revision = getExportedServicesRevision(instance);
+            Collection<ServiceInstance> rInstances = localRevisionToInstances.computeIfAbsent(revision, r -> new ArrayList<>());
+            rInstances.add(instance);
+
+            MetadataInfo metadata = revisionToMetadata.get(revision);
+            if (metadata != null) {
+                localRevisionToMetadata.put(revision, metadata);
+            } else {
+                metadata = getMetadataInfo(instance);
+                localRevisionToMetadata.put(revision, getMetadataInfo(instance));
+            }
+            parse(revision, metadata, localServiceToRevisions);
+        }
+
+        this.revisionToInstances = localRevisionToInstances;
+        this.revisionToMetadata = localRevisionToMetadata;
+        this.serviceToRevisions = localServiceToRevisions;
+
+        Map<String, List<ServiceInstance>> localServiceToInstances = new HashMap<>();
+        for (String serviceKey : localServiceToRevisions.keySet()) {
+            if (CollectionUtils.equals(localRevisionToInstances.keySet(), localServiceToRevisions.get(serviceKey))) {
+                localServiceToInstances.put(serviceKey, instances);
+            }
+        }
+
+        this.serviceToInstances = localServiceToInstances;
+    }
+
+    public List<ServiceInstance> getInstances(String serviceKey) {
+        if (serviceToInstances.containsKey(serviceKey)) {
+            return serviceToInstances.get(serviceKey);
+        }
+
+        Set<String> revisions = serviceToRevisions.get(serviceKey);
+        List<ServiceInstance> allInstances = new LinkedList<>();
+        for (String r : revisions) {
+            allInstances.addAll(revisionToInstances.get(r));
+        }
+        return allInstances;
+    }
+
+    private Map<String, Set<String>> parse(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
+        Map<String, ServiceInfo> serviceInfos = metadata.getServices();
+        for (Map.Entry<String, ServiceInfo> serviceInfo : serviceInfos.entrySet()) {
+            String serviceKey = serviceInfo.getValue().getServiceKey();
+            Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new HashSet<>());
+            set.add(revision);
+        }
+
+        return localServiceToRevisions;
+    }
+
+    private MetadataInfo getMetadataInfo(ServiceInstance instance) {
+        String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
+        String cluster = ServiceInstanceMetadataUtils.getRemoteCluster(instance);
+
+        MetadataInfo metadataInfo;
+        try {
+            if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
+                RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getConsumerRemoteMetadataService(consumerUrl.getServiceKey());
+                metadataInfo = remoteMetadataService.getMetadata(serviceName, revision);
+            } else {
+                MetadataService localMetadataService = MetadataUtils.getLocalMetadataService();
+                metadataInfo = localMetadataService.getMetadataInfo();
+            }
+        } catch (Exception e) {
+            // TODO, load metadata backup
+            metadataInfo = null;
+        }
+        return metadataInfo;
+    }
+
+
+    protected abstract void notifyAddresses();
 
     /**
      * Get the correlative service name
@@ -57,7 +166,7 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
      * @return If service name matches, return <code>true</code>, or <code>false</code>
      */
     public final boolean accept(ServiceInstancesChangedEvent event) {
-        return Objects.equals(getServiceName(), event.getServiceName());
+        return Objects.equals(serviceName, event.getServiceName());
     }
 
     @Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 01bde1d..16f5446 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -31,6 +31,8 @@ import org.apache.dubbo.common.utils.UrlUtils;
 import org.apache.dubbo.registry.AddressListener;
 import org.apache.dubbo.registry.NotifyListener;
 import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
 import org.apache.dubbo.remoting.Constants;
 import org.apache.dubbo.rpc.Invocation;
 import org.apache.dubbo.rpc.Invoker;
@@ -53,6 +55,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -68,6 +71,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.APP_DYNAMIC_CONFIGURATORS_CATEGORY;
 import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.COMPATIBLE_CONFIG_KEY;
@@ -133,6 +137,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
     private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
     private ReferenceConfigurationListener serviceConfigurationListener;
 
+    private Set<ServiceInstancesChangedListener> serviceListeners;
 
     public RegistryDirectory(Class<T> serviceType, URL url) {
         super(url);
@@ -151,6 +156,34 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
         this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
         String group = directoryUrl.getParameter(GROUP_KEY, "");
         this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
+
+        this.serviceListeners = new HashSet<>();
+    }
+
+    @Override
+    public void addServiceListener(ServiceInstancesChangedListener instanceListener) {
+        this.serviceListeners.add(instanceListener);
+    }
+
+    @Override
+    public void notifyServiceInstances() {
+        List<URL> urls = new LinkedList<>();
+        for (ServiceInstancesChangedListener listener : serviceListeners) {
+            List<ServiceInstance> instances = listener.getInstances(serviceKey);
+            for (ServiceInstance instance : instances) {
+                // FIXME, the right protocol? the right path?
+                urls.add(
+                        instance.toURL(
+                                "dubbo",
+                                serviceType.getName(),
+                                serviceType.getName(),
+                                queryMap.get(GROUP_KEY),
+                                queryMap.get(VERSION_KEY),
+                                serviceKey)
+                );
+            }
+        }
+        notify(urls);
     }
 
     private URL turnRegistryUrlToConsumerUrl(URL url) {