You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/28 09:50:56 UTC
[dubbo] branch master updated: Nacos unsub support and
RegistryDirectory destroy (#8907)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/master by this push:
new 01ccaf4 Nacos unsub support and RegistryDirectory destroy (#8907)
01ccaf4 is described below
commit 01ccaf46c0d0ea02395f5a7bffe443292ba06ae3
Author: Owen.Cai <89...@qq.com>
AuthorDate: Tue Sep 28 17:50:45 2021 +0800
Nacos unsub support and RegistryDirectory destroy (#8907)
* nacos_unsub_support & destroy RegistryDirectory
* delete idea import use *
* nacos_unsub_support & destroy RegistryDirectory(#8895)
* delete idea import use *
* change description for code
---
.../support/wrapper/MockClusterInvoker.java | 5 ++
.../registry/integration/DynamicDirectory.java | 3 +-
.../registry/nacos/NacosNamingServiceWrapper.java | 4 ++
.../apache/dubbo/registry/nacos/NacosRegistry.java | 64 +++++++++++++++++-----
4 files changed, 61 insertions(+), 15 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
index 51d0a49..46bab30 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
@@ -75,6 +75,11 @@ public class MockClusterInvoker<T> implements ClusterInvoker<T> {
@Override
public void destroy() {
+ // There is no directory manager, so whoever holds the directory is responsible for destroying it.
+ // The directory must therefore support destroy() being called multiple times.
+ // Other ClusterInvokers may also hold a directory and will destroy it as well.
+ this.directory.destroy();
+
this.invoker.destroy();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 84bb67a..f5b64ff 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -238,7 +238,8 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
// unsubscribe.
try {
if (getSubscribeConsumerurl() != null && registry != null && registry.isAvailable()) {
- registry.unsubscribe(getSubscribeConsumerurl(), this);
+ // unSubscribe() is overridden by subclasses, so call it instead of registry.unsubscribe() directly
+ unSubscribe(getSubscribeConsumerurl());
}
} catch (Throwable t) {
logger.warn("unexpected error when unsubscribe service " + serviceKey + "from registry" + registry.getUrl(), t);
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
index 6aa2bdf..18c301b 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosNamingServiceWrapper.java
@@ -49,6 +49,10 @@ public class NacosNamingServiceWrapper {
namingService.subscribe(handleInnerSymbol(serviceName), group, eventListener);
}
+ public void unsubscribe(String serviceName, String group, EventListener eventListener) throws NacosException {
+ namingService.unsubscribe(handleInnerSymbol(serviceName), group, eventListener);
+ }
+
public List<Instance> getAllInstances(String serviceName, String group) throws NacosException {
return namingService.getAllInstances(handleInnerSymbol(serviceName), group);
}
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
index a235013..9987986 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosRegistry.java
@@ -48,6 +48,8 @@ import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;
import java.util.Collections;
@@ -122,6 +124,8 @@ public class NacosRegistry extends FailbackRegistry {
private final NacosNamingServiceWrapper namingService;
+ private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, EventListener>> nacosListeners = new ConcurrentHashMap<>();
+
public NacosRegistry(URL url, NacosNamingServiceWrapper namingService) {
super(url);
this.namingService = namingService;
@@ -240,6 +244,19 @@ public class NacosRegistry extends FailbackRegistry {
if (isAdminProtocol(url)) {
shutdownServiceNamesLookup();
}
+ else {
+ Set<String> serviceNames = getServiceNames(url, listener);
+
+ doUnsubscribe(url, listener, serviceNames);
+ }
+ }
+
+ private void doUnsubscribe(final URL url, final NotifyListener listener, final Set<String> serviceNames) {
+ execute(namingService -> {
+ for (String serviceName : serviceNames) {
+ unsubscribeEventListener(serviceName, url, listener);
+ }
+ });
}
private void shutdownServiceNamesLookup() {
@@ -495,26 +512,45 @@ public class NacosRegistry extends FailbackRegistry {
private void subscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
throws NacosException {
- EventListener eventListener = event -> {
- if (event instanceof NamingEvent) {
- NamingEvent e = (NamingEvent) event;
- List<Instance> instances = e.getInstances();
+ ConcurrentMap<NotifyListener, EventListener> listeners = nacosListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
+ EventListener nacosListener = listeners.computeIfAbsent(listener, k -> {
+ EventListener eventListener = event -> {
+ if (event instanceof NamingEvent) {
+ NamingEvent e = (NamingEvent) event;
+ List<Instance> instances = e.getInstances();
- if (isServiceNamesWithCompatibleMode(url)) {
+ if (isServiceNamesWithCompatibleMode(url)) {
- // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
- // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
- NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
- instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
- }
+ // Get all instances with corresponding serviceNames to avoid instance overwrite and but with empty instance mentioned
+ // in https://github.com/apache/dubbo/issues/5885 and https://github.com/apache/dubbo/issues/5899
+ NacosInstanceManageUtil.initOrRefreshServiceInstanceList(serviceName, instances);
+ instances = NacosInstanceManageUtil.getAllCorrespondingServiceInstanceList(serviceName);
+ }
- notifySubscriber(url, listener, instances);
- }
- };
+ notifySubscriber(url, listener, instances);
+ }
+ };
+ return eventListener;
+ });
namingService.subscribe(serviceName,
getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
- eventListener);
+ nacosListener);
+ }
+
+ private void unsubscribeEventListener(String serviceName, final URL url, final NotifyListener listener)
+ throws NacosException {
+ ConcurrentMap<NotifyListener, EventListener> notifyListenerEventListenerConcurrentMap = nacosListeners.get(url);
+ if(notifyListenerEventListenerConcurrentMap == null){
+ return;
+ }
+ EventListener nacosListener = notifyListenerEventListenerConcurrentMap.get(listener);
+ if(nacosListener == null){
+ return;
+ }
+ namingService.unsubscribe(serviceName,
+ getUrl().getParameter(GROUP_KEY, Constants.DEFAULT_GROUP),
+ nacosListener);
}
/**