You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2018/11/09 08:57:13 UTC

[incubator-dubbo] branch dev-metadata updated: Fix subscribe and parse problems with override protocol.

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch dev-metadata
in repository https://gitbox.apache.org/repos/asf/incubator-dubbo.git


The following commit(s) were added to refs/heads/dev-metadata by this push:
     new 42fac3e  Fix subscribe and parse problems with override protocol.
42fac3e is described below

commit 42fac3eae1cff284d873423f2fe3a0b8bdf582bb
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Nov 9 16:56:51 2018 +0800

    Fix subscribe and parse problems with override protocol.
---
 .../cluster/configurator/AbstractConfigurator.java |   4 +
 .../org/apache/dubbo/common/utils/UrlUtils.java    |   5 +-
 .../META-INF/spring/dubbo-demo-consumer.xml        |   3 +-
 .../registry/integration/RegistryProtocol.java     | 149 +++++++++++++--------
 .../registry/integration/parser/ConfigParser.java  |  11 +-
 .../parser/model/ConfiguratorConfig.java           |   9 ++
 .../registry/support/ProviderConsumerRegTable.java |  34 +++--
 .../registry/support/ProviderInvokerWrapper.java   |   9 ++
 8 files changed, 151 insertions(+), 73 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/configurator/AbstractConfigurator.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/configurator/AbstractConfigurator.java
index 1fdaec4..642b7da 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/configurator/AbstractConfigurator.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/configurator/AbstractConfigurator.java
@@ -79,6 +79,10 @@ public abstract class AbstractConfigurator implements Configurator {
                 conditionKeys.add(Constants.CHECK_KEY);
                 conditionKeys.add(Constants.DYNAMIC_KEY);
                 conditionKeys.add(Constants.ENABLED_KEY);
+                conditionKeys.add(Constants.GROUP_KEY);
+                conditionKeys.add(Constants.VERSION_KEY);
+                conditionKeys.add(Constants.APPLICATION_KEY);
+                conditionKeys.add(Constants.SIDE_KEY);
                 for (Map.Entry<String, String> entry : configuratorUrl.getParameters().entrySet()) {
                     String key = entry.getKey();
                     String value = entry.getValue();
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
index f9e6f4e..65381ab 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/utils/UrlUtils.java
@@ -361,7 +361,10 @@ public class UrlUtils {
     public static boolean isMatch(URL consumerUrl, URL providerUrl) {
         String consumerInterface = consumerUrl.getServiceInterface();
         String providerInterface = providerUrl.getServiceInterface();
-        if (!(Constants.ANY_VALUE.equals(consumerInterface) || StringUtils.isEquals(consumerInterface, providerInterface))) {
+        // FIXME: accept a providerUrl with '*' as its interface name; after carefully thinking through all possible scenarios, I think it's OK to add this condition.
+        if (!(Constants.ANY_VALUE.equals(consumerInterface)
+                || Constants.ANY_VALUE.equals(providerInterface)
+                || StringUtils.isEquals(consumerInterface, providerInterface))) {
             return false;
         }
 
diff --git a/dubbo-demo/dubbo-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml b/dubbo-demo/dubbo-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml
index 241e605..108b566 100644
--- a/dubbo-demo/dubbo-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml
+++ b/dubbo-demo/dubbo-demo-consumer/src/main/resources/META-INF/spring/dubbo-demo-consumer.xml
@@ -32,6 +32,7 @@
 
     <!-- generate proxy for the remote service, then demoService can be used in the same way as the
     local regular interface -->
-    <dubbo:reference id="demoService" check="false" interface="org.apache.dubbo.demo.DemoService" version="1.0.4" group="dd-test"/>
+    <dubbo:reference url="dubbo://127.0.0.1:20880/org.apache.dubbo.demo.DemoService" id="demoService" check="false"
+                     interface="org.apache.dubbo.demo.DemoService" version="1.0.4" group="dd-test"/>
 
 </beans>
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 4fbf5c9..6c0f8d1 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -148,17 +148,21 @@ public class RegistryProtocol implements Protocol {
         URL registryUrl = getRegistryUrl(originInvoker);
         // url to export locally
         URL providerUrl = getProviderUrl(originInvoker);
-        providerUrl = overrideUrlWithConfig(providerUrl);
 
+        // Subscribe the override data
+        // FIXME: Subscribing on the provider side affects the scenario where one JVM both exposes and calls the same service: subscriptions are cached under a key derived from the service name, so one subscription's information overwrites the other's.
+        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
+        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
+        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
+
+        providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
         //export invoker
         final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
 
         // url to registry
         final Registry registry = getRegistry(originInvoker);
         final URL registeredProviderUrl = getRegistedProviderUrl(providerUrl, registryUrl);
-
         ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
-
         //to judge if we need to delay publish
         boolean register = registeredProviderUrl.getParameter("register", true);
         if (register) {
@@ -166,29 +170,29 @@ public class RegistryProtocol implements Protocol {
             ProviderConsumerRegTable.getProviderWrapper(originInvoker).setReg(true);
         }
 
-        // Subscribe the override data
-        // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call the same service. Because the subscribed is cached key with the name of the service, it causes the subscription information to cover.
-        final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registeredProviderUrl);
-        final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
-        overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
 
-        dynamicConfiguration.addListener(overrideSubscribeUrl.getServiceKey() + Constants.CONFIGURATORS_SUFFIX, overrideSubscribeListener);
+        exporter.setRegisterUrl(registeredProviderUrl);
+        exporter.setSubscribeUrl(overrideSubscribeUrl);
         //Ensure that a new exporter instance is returned every time export
-        return new DestroyableExporter<T>(exporter, originInvoker, overrideSubscribeUrl, registeredProviderUrl);
+        return new DestroyableExporter<>(exporter);
     }
 
-    private <T> URL overrideUrlWithConfig(URL providerUrl) {
-        List<Configurator> dynamicConfigurators = new LinkedList<>();
-        String appRawConfig = dynamicConfiguration.getConfig(providerUrl.getParameter(Constants.APPLICATION_KEY) + Constants.CONFIGURATORS_SUFFIX);
+    private <T> URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
+        List<Configurator> configurators = new LinkedList<>();
+        String appRawConfig = dynamicConfiguration.getConfig(providerUrl.getParameter(Constants.APPLICATION_KEY) + Constants.CONFIGURATORS_SUFFIX, listener);
         if (!StringUtils.isEmpty(appRawConfig)) {
-            dynamicConfigurators.addAll(RegistryDirectory.configToConfiguratiors(appRawConfig));
+            List<Configurator> appDynamicConfigurators = RegistryDirectory.configToConfiguratiors(appRawConfig);
+            listener.setAppDynamicConfigurators(appDynamicConfigurators);
+            configurators.addAll(appDynamicConfigurators);
         }
-        String rawConfig = dynamicConfiguration.getConfig(providerUrl.getServiceKey() + Constants.CONFIGURATORS_SUFFIX);
+        String rawConfig = dynamicConfiguration.getConfig(providerUrl.getServiceKey() + Constants.CONFIGURATORS_SUFFIX, listener);
         if (!StringUtils.isEmpty(rawConfig)) {
-            dynamicConfigurators.addAll(RegistryDirectory.configToConfiguratiors(rawConfig));
+            List<Configurator> dynamicConfigurators = RegistryDirectory.configToConfiguratiors(rawConfig);
+            listener.setDynamicConfigurators(dynamicConfigurators);
+            configurators.addAll(dynamicConfigurators);
         }
-        providerUrl = getConfigedInvokerUrl(dynamicConfigurators, providerUrl);
+        providerUrl = getConfigedInvokerUrl(configurators, providerUrl);
         return providerUrl;
     }
 
@@ -210,6 +214,23 @@ public class RegistryProtocol implements Protocol {
         return exporter;
     }
 
+    public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
+        // update local exporter
+        ExporterChangeableWrapper exporter = doChangeLocalExport(originInvoker, newInvokerUrl);
+        // update registry
+        URL registryUrl = getRegistryUrl(originInvoker);
+        final URL registeredProviderUrl = getRegistedProviderUrl(newInvokerUrl, registryUrl);
+
+        //decide if we need to re-publish
+        boolean shouldReregister = ProviderConsumerRegTable.getProviderWrapper(originInvoker).isReg();
+        ProviderConsumerRegTable.registerProvider(originInvoker, registryUrl, registeredProviderUrl);
+        if (shouldReregister) {
+            register(registryUrl, registeredProviderUrl);
+        }
+
+        exporter.setRegisterUrl(registeredProviderUrl);
+    }
+
     /**
      * Reexport the invoker of the modified url
      *
@@ -217,7 +238,7 @@ public class RegistryProtocol implements Protocol {
      * @param newInvokerUrl
      */
     @SuppressWarnings("unchecked")
-    private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
+    private <T> ExporterChangeableWrapper doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
         String key = getCacheKey(originInvoker);
         final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
         if (exporter == null) {
@@ -226,6 +247,7 @@ public class RegistryProtocol implements Protocol {
             final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
             exporter.setExporter(protocol.export(invokerDelegete));
         }
+        return exporter;
     }
 
     /**
@@ -416,18 +438,10 @@ public class RegistryProtocol implements Protocol {
 
     static private class DestroyableExporter<T> implements Exporter<T> {
 
-        public static final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true));
-
         private Exporter<T> exporter;
-        private Invoker<T> originInvoker;
-        private URL subscribeUrl;
-        private URL registerUrl;
 
-        public DestroyableExporter(Exporter<T> exporter, Invoker<T> originInvoker, URL subscribeUrl, URL registerUrl) {
+        public DestroyableExporter(Exporter<T> exporter) {
             this.exporter = exporter;
-            this.originInvoker = originInvoker;
-            this.subscribeUrl = subscribeUrl;
-            this.registerUrl = registerUrl;
         }
 
         @Override
@@ -437,34 +451,7 @@ public class RegistryProtocol implements Protocol {
 
         @Override
         public void unexport() {
-            Registry registry = RegistryProtocol.INSTANCE.getRegistry(originInvoker);
-            try {
-                registry.unregister(registerUrl);
-            } catch (Throwable t) {
-                logger.warn(t.getMessage(), t);
-            }
-            try {
-                NotifyListener listener = RegistryProtocol.INSTANCE.overrideListeners.remove(subscribeUrl);
-                registry.unsubscribe(subscribeUrl, listener);
-            } catch (Throwable t) {
-                logger.warn(t.getMessage(), t);
-            }
-
-            executor.submit(new Runnable() {
-                @Override
-                public void run() {
-                    try {
-                        int timeout = ConfigUtils.getServerShutdownTimeout();
-                        if (timeout > 0) {
-                            logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. Usually, this is called when you use dubbo API");
-                            Thread.sleep(timeout);
-                        }
-                        exporter.unexport();
-                    } catch (Throwable t) {
-                        logger.warn(t.getMessage(), t);
-                    }
-                }
-            });
+            exporter.unexport();
         }
     }
 
@@ -487,6 +474,14 @@ public class RegistryProtocol implements Protocol {
             this.originInvoker = originalInvoker;
         }
 
+        public void setDynamicConfigurators(List<Configurator> dynamicConfigurators) {
+            this.dynamicConfigurators = dynamicConfigurators;
+        }
+
+        public void setAppDynamicConfigurators(List<Configurator> appDynamicConfigurators) {
+            this.appDynamicConfigurators = appDynamicConfigurators;
+        }
+
         /**
          * @param urls The list of registered information , is always not empty, The meaning is the same as the return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
          */
@@ -537,7 +532,7 @@ public class RegistryProtocol implements Protocol {
             newUrl = getConfigedInvokerUrl(appDynamicConfigurators, newUrl);
             newUrl = getConfigedInvokerUrl(dynamicConfigurators, newUrl);
             if (!currentUrl.equals(newUrl)) {
-                RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);
+                RegistryProtocol.this.reExport(originInvoker, newUrl);
                 logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
             }
         }
@@ -599,8 +594,12 @@ public class RegistryProtocol implements Protocol {
      */
     private class ExporterChangeableWrapper<T> implements Exporter<T> {
 
+        private final ExecutorService executor = Executors.newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true));
+
         private final Invoker<T> originInvoker;
         private Exporter<T> exporter;
+        private URL subscribeUrl;
+        private URL registerUrl;
 
         public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
             this.exporter = exporter;
@@ -624,7 +623,43 @@ public class RegistryProtocol implements Protocol {
         public void unexport() {
             String key = getCacheKey(this.originInvoker);
             bounds.remove(key);
-            exporter.unexport();
+
+            Registry registry = RegistryProtocol.INSTANCE.getRegistry(originInvoker);
+            try {
+                registry.unregister(registerUrl);
+            } catch (Throwable t) {
+                logger.warn(t.getMessage(), t);
+            }
+            try {
+                NotifyListener listener = RegistryProtocol.INSTANCE.overrideListeners.remove(subscribeUrl);
+                registry.unsubscribe(subscribeUrl, listener);
+            } catch (Throwable t) {
+                logger.warn(t.getMessage(), t);
+            }
+
+            executor.submit(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        int timeout = ConfigUtils.getServerShutdownTimeout();
+                        if (timeout > 0) {
+                            logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. Usually, this is called when you use dubbo API");
+                            Thread.sleep(timeout);
+                        }
+                        exporter.unexport();
+                    } catch (Throwable t) {
+                        logger.warn(t.getMessage(), t);
+                    }
+                }
+            });
+        }
+
+        public void setSubscribeUrl(URL subscribeUrl) {
+            this.subscribeUrl = subscribeUrl;
+        }
+
+        public void setRegisterUrl(URL registerUrl) {
+            this.registerUrl = registerUrl;
         }
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/ConfigParser.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/ConfigParser.java
index 961b106..5483e3e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/ConfigParser.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/ConfigParser.java
@@ -38,6 +38,7 @@ public class ConfigParser {
     public static List<URL> parseConfigurators(String rawConfig) {
         List<URL> urls = new ArrayList<>();
         ConfiguratorConfig configuratorConfig = parseObject(rawConfig, ConfiguratorConfig.class);
+
         String scope = configuratorConfig.getScope();
         List<ConfigItem> items = configuratorConfig.getConfigs();
 
@@ -46,10 +47,16 @@ public class ConfigParser {
                     appItemToUrls(item, configuratorConfig.getKey())
                             .stream()
                             .map(u -> u.addParameter(Constants.CATEGORY_KEY, Constants.APP_DYNAMIC_CONFIGURATORS_CATEGORY))
+                            .map(u -> u.addParameter(Constants.ENABLED_KEY, configuratorConfig.isEnabled()))
                             .collect(Collectors.toList())
             ));
         } else { // servcie scope by default.
-            items.forEach(item -> urls.addAll(serviceItemToUrls(item, configuratorConfig.getKey())));
+            items.forEach(item -> urls.addAll(
+                    serviceItemToUrls(item, configuratorConfig.getKey())
+                            .stream()
+                            .map(u -> u.addParameter(Constants.ENABLED_KEY, configuratorConfig.isEnabled()))
+                            .collect(Collectors.toList()))
+            );
         }
         return urls;
     }
@@ -105,7 +112,7 @@ public class ConfigParser {
                 services = new ArrayList<>();
             }
             if (services.size() == 0) {
-                services.add("*/*:*");
+                services.add("*");
             }
             for (String s : services) {
                 urls.add(URL.valueOf(urlStr + appendService(s) + toParameterString(item) + "&application=" + appKey));
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/model/ConfiguratorConfig.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/model/ConfiguratorConfig.java
index db7aab7..3265b21 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/model/ConfiguratorConfig.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/parser/model/ConfiguratorConfig.java
@@ -27,6 +27,7 @@ public class ConfiguratorConfig {
 
     private String scope;
     private String key;
+    private boolean enabled = true;
     private List<ConfigItem> configs;
 
     public String getScope() {
@@ -45,6 +46,14 @@ public class ConfiguratorConfig {
         this.key = key;
     }
 
+    public boolean isEnabled() {
+        return enabled;
+    }
+
+    public void setEnabled(boolean enabled) {
+        this.enabled = enabled;
+    }
+
     public List<ConfigItem> getConfigs() {
         return configs;
     }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderConsumerRegTable.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderConsumerRegTable.java
index e36e734..9bf1e3f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderConsumerRegTable.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderConsumerRegTable.java
@@ -23,33 +23,44 @@ import org.apache.dubbo.registry.integration.RegistryDirectory;
 import org.apache.dubbo.rpc.Invoker;
 
 import java.util.Collections;
+import java.util.HashSet;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
 
 /**
  * @date 2017/11/23
  */
 public class ProviderConsumerRegTable {
-    public static ConcurrentHashMap<String, Set<ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<String, Set<ProviderInvokerWrapper>>();
-    public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>>();
+    public static ConcurrentHashMap<String, ConcurrentMap<Invoker, ProviderInvokerWrapper>> providerInvokers = new ConcurrentHashMap<>();
+    public static ConcurrentHashMap<String, Set<ConsumerInvokerWrapper>> consumerInvokers = new ConcurrentHashMap<>();
 
     public static void registerProvider(Invoker invoker, URL registryUrl, URL providerUrl) {
         ProviderInvokerWrapper wrapperInvoker = new ProviderInvokerWrapper(invoker, registryUrl, providerUrl);
         String serviceUniqueName = providerUrl.getServiceKey();
-        Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
+        ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
         if (invokers == null) {
-            providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashSet<ProviderInvokerWrapper>());
+            providerInvokers.putIfAbsent(serviceUniqueName, new ConcurrentHashMap<>());
             invokers = providerInvokers.get(serviceUniqueName);
         }
-        invokers.add(wrapperInvoker);
+        invokers.put(invoker, wrapperInvoker);
     }
 
-    public static Set<ProviderInvokerWrapper> getProviderInvoker(String serviceUniqueName) {
+    /*public static ProviderInvokerWrapper removeProviderWrapper(Invoker invoker, URL providerUrl) {
+        String serviceUniqueName = providerUrl.getServiceKey();
         Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
         if (invokers == null) {
+            return null;
+        }
+        return invokers.remove(new ProviderInvokerWrapper(invoker, null, null));
+    }*/
+
+    public static Set<ProviderInvokerWrapper> getProviderInvoker(String serviceUniqueName) {
+        ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
+        if (invokers == null) {
             return Collections.emptySet();
         }
-        return invokers;
+        return new HashSet<>(invokers.values());
     }
 
     public static ProviderInvokerWrapper getProviderWrapper(Invoker invoker) {
@@ -58,15 +69,14 @@ public class ProviderConsumerRegTable {
             providerUrl = URL.valueOf(providerUrl.getParameterAndDecoded(Constants.EXPORT_KEY));
         }
         String serviceUniqueName = providerUrl.getServiceKey();
-        Set<ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
+        ConcurrentMap<Invoker, ProviderInvokerWrapper> invokers = providerInvokers.get(serviceUniqueName);
         if (invokers == null) {
             return null;
         }
 
-        for (ProviderInvokerWrapper providerWrapper : invokers) {
-            Invoker providerInvoker = providerWrapper.getInvoker();
-            if (providerInvoker == invoker) {
-                return providerWrapper;
+        for (Invoker inv : invokers.keySet()) {
+            if (inv == invoker) {
+                return invokers.get(inv);
             }
         }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderInvokerWrapper.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderInvokerWrapper.java
index 88177ab..ce079b7 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderInvokerWrapper.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/ProviderInvokerWrapper.java
@@ -87,4 +87,13 @@ public class ProviderInvokerWrapper<T> implements Invoker {
     public void setReg(boolean reg) {
         isReg = reg;
     }
+
+    @Override
+    public boolean equals(Object o) {
+        if (o == null || !(o instanceof ProviderInvokerWrapper)) {
+            return false;
+        }
+        ProviderInvokerWrapper other = (ProviderInvokerWrapper) o;
+        return other.getInvoker().equals(this.getInvoker());
+    }
 }