You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/28 09:50:56 UTC

[dubbo] branch master updated: Nacos unsub support and RegistryDirectory destroy (#8907)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/master by this push:
     new 01ccaf4  Nacos unsub support and RegistryDirectory destroy (#8907)
01ccaf4 is described below

commit 01ccaf46c0d0ea02395f5a7bffe443292ba06ae3
Author: Owen.Cai <89...@qq.com>
AuthorDate: Tue Sep 28 17:50:45 2021 +0800

    Nacos unsub support and RegistryDirectory destroy (#8907)
    
    * nacos_unsub_support & destroy RegistryDirectory
    
    * delete idea import use *
    
    * nacos_unsub_support & destroy RegistryDirectory(#8895)
    
    * delete idea import use *
    
    * change description for code
---
 .../support/wrapper/MockClusterInvoker.java        |  5 ++
 .../registry/integration/DynamicDirectory.java     |  3 +-
 .../registry/nacos/NacosNamingServiceWrapper.java  |  4 ++
 .../apache/dubbo/registry/nacos/NacosRegistry.java | 64 +++++++++++++++++-----
 4 files changed, 61 insertions(+), 15 deletions(-)

diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
index 51d0a49..46bab30 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
@@ -75,6 +75,11 @@ public class MockClusterInvoker<T> implements ClusterInvoker<T> {
 
     @Override
     public void destroy() {
+        // There is no central directory manager, so whoever holds the directory is responsible for destroying it.
+        // The directory must therefore support being destroyed multiple times,
+        // because other ClusterInvokers may hold the same directory and destroy it as well.
+        this.directory.destroy();
+
         this.invoker.destroy();
     }
 
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 84bb67a..f5b64ff 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -238,7 +238,8 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
         // unsubscribe.
         try {
             if (getSubscribeConsumerurl() != null && registry != null && registry.isAvailable()) {
-                registry.unsubscribe(getSubscribeConsumerurl(), this);
+                // unSubscribe() is overridden by subclasses, so call it rather than the registry directly.
+                unSubscribe(getSubscribeConsumerurl());
             }
         } catch (Throwable t) {
             logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
index 6aa2bdf..18c301b 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
@@ -49,6 +49,10 @@ public class NacosNamingServiceWrapper {
         namingService.subscribe(handleInnerSymbol(serviceName), group, eventListener);
     }
 
+    public void unsubscribe(String serviceName, String group, EventListener eventListener) throws NacosException {
+        namingService.unsubscribe(handleInnerSymbol(serviceName), group, eventListener);
+    }
+
     public List<Instance> getAllInstances(String serviceName, String group) throws NacosException {
         return namingService.getAllInstances(handleInnerSymbol(serviceName), group);
     }
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
index a235013..9987986 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
@@ -48,6 +48,8 @@ import java.util.Set;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.stream.Collectors;
 import java.util.Collections;
 
@@ -122,6 +124,8 @@ public class NacosRegistry extends FailbackRegistry {
 
     private final NacosNamingServiceWrapper namingService;
 
+    private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();
+
     public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
         super(url);
         this.namingService = namingService;
@@ -240,6 +244,19 @@ public class NacosRegistry extends FailbackRegistry {
         if (isAdminProtocol(url)) {
             shutdownServiceNamesLookup();
         }
+        else {
+            Set<String> serviceNames = getServiceNames(url, listener);
+
+            doUnsubscribe(url, listener, serviceNames);
+        }
+    }
+
+    private void doUnsubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
+        execute(namingService -> {
+            for (String serviceName : serviceNames) {
+                unsubscribeEventListener(serviceName, url, listener);
+            }
+        });
     }
 
     private void shutdownServiceNamesLookup() {
@@ -495,26 +512,45 @@ public class NacosRegistry extends FailbackRegistry {
 
     private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
             throws NacosException {
-        EventListener eventListener = event -> {
-            if (event instanceof NamingEvent) {
-                NamingEvent e = (NamingEvent) event;
-                List<Instance> instances = e.getInstances();
+        ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
+        EventListener nacosListener = listeners.computeIfAbsent(listener, k -> {
+            EventListener eventListener = event -> {
+                if (event instanceof NamingEvent) {
+                    NamingEvent e = (NamingEvent) event;
+                    List<Instance> instances = e.getInstances();
 
 
-                if (isServiceNamesWithCompatibleMode(url)) {
+                    if (isServiceNamesWithCompatibleMode(url)) {
 
-                    // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
-                    // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
-                    NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
-                    instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
-                }
+                        // Get all instances with corresponding serviceNames to avoid the instance-overwrite and empty-instance problems mentioned
+                        // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
+                        NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
+                        instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
+                    }
 
-                notifySubscriber(url, listener, instances);
-            }
-        };
+                    notifySubscriber(url, listener, instances);
+                }
+            };
+            return eventListener;
+        });
         namingService.subscribe(serviceName,
                 getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
-                eventListener);
+                nacosListener);
+    }
+
+    private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
+            throws NacosException {
+        ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
+        if(notifyListenerEventListenerConcurrentMap == null){
+            return;
+        }
+        EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener);
+        if(nacosListener == null){
+            return;
+        }
+        namingService.unsubscribe(serviceName,
+                getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
+                nacosListener);
     }
 
     /**