You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/09/22 08:20:47 UTC
[dubbo] branch 3.0 updated: [3.0] ZookeeperRegistry CountDownLatch
should be release and add some UT (#8870)
This is an automated email from the ASF dual-hosted git repository.
albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new ff9cfa1 [3.0] ZookeeperRegistry CountDownLatch should be release and add some UT (#8870)
ff9cfa1 is described below
commit ff9cfa16f8bd36b7a2d94352833333e60d13744e
Author: Wang Chengming <63...@qq.com>
AuthorDate: Wed Sep 22 16:20:35 2021 +0800
[3.0] ZookeeperRegistry CountDownLatch should be release and add some UT (#8870)
2. add some unit tests
---
.../registry/zookeeper/ZookeeperRegistry.java | 43 ++++++++++--------
.../registry/zookeeper/ZookeeperRegistryTest.java | 53 +++++++++++++++++++++-
2 files changed, 75 insertions(+), 21 deletions(-)
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
index 904e451..e14f237 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistry.java
@@ -82,8 +82,8 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
zkClient.addStateListener((state) -> {
if (state == StateListener.RECONNECTED) {
logger.warn("Trying to fetch the latest urls, in case there're provider changes during connection loss.\n" +
- " Since ephemeral ZNode will not get deleted for a connection lose, " +
- "there's no need to re-register url of this instance.");
+ " Since ephemeral ZNode will not get deleted for a connection lose, " +
+ "there's no need to re-register url of this instance.");
ZookeeperRegistry.this.fetchLatestAddresses();
} else if (state == StateListener.NEW_SESSION_CREATED) {
logger.warn("Trying to re-register urls and re-subscribe listeners of this instance to registry...");
@@ -94,7 +94,7 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
}
} else if (state == StateListener.SESSION_LOST) {
logger.warn("Url of this instance will be deleted from registry soon. " +
- "Dubbo client will try to re-register once a new session is created.");
+ "Dubbo client will try to re-register once a new session is created.");
} else if (state == StateListener.SUSPENDED) {
} else if (state == StateListener.CONNECTED) {
@@ -148,7 +148,7 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
if (!anyServices.contains(child)) {
anyServices.add(child);
subscribe(url.setPath(child).addParameters(INTERFACE_KEY, child,
- Constants.CHECK_KEY, String.valueOf(false)), k);
+ Constants.CHECK_KEY, String.valueOf(false)), k);
}
}
});
@@ -159,27 +159,30 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
service = URL.decode(service);
anyServices.add(service);
subscribe(url.setPath(service).addParameters(INTERFACE_KEY, service,
- Constants.CHECK_KEY, String.valueOf(false)), listener);
+ Constants.CHECK_KEY, String.valueOf(false)), listener);
}
}
} else {
CountDownLatch latch = new CountDownLatch(1);
- List<URL> urls = new ArrayList<>();
- for (String path : toCategoriesPath(url)) {
- ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
- ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
- if (zkListener instanceof RegistryChildListenerImpl) {
- ((RegistryChildListenerImpl) zkListener).setLatch(latch);
- }
- zkClient.create(path, false);
- List<String> children = zkClient.addChildListener(path, zkListener);
- if (children != null) {
- urls.addAll(toUrlsWithEmpty(url, path, children));
+ try {
+ List<URL> urls = new ArrayList<>();
+ for (String path : toCategoriesPath(url)) {
+ ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.computeIfAbsent(url, k -> new ConcurrentHashMap<>());
+ ChildListener zkListener = listeners.computeIfAbsent(listener, k -> new RegistryChildListenerImpl(url, path, k, latch));
+ if (zkListener instanceof RegistryChildListenerImpl) {
+ ((RegistryChildListenerImpl) zkListener).setLatch(latch);
+ }
+ zkClient.create(path, false);
+ List<String> children = zkClient.addChildListener(path, zkListener);
+ if (children != null) {
+ urls.addAll(toUrlsWithEmpty(url, path, children));
+ }
}
+ notify(url, listener, urls);
+ } finally {
+ // tells the listener to run only after the sync notification of main thread finishes.
+ latch.countDown();
}
- notify(url, listener, urls);
- // tells the listener to run only after the sync notification of main thread finishes.
- latch.countDown();
}
} catch (Throwable e) {
throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
@@ -202,7 +205,7 @@ public class ZookeeperRegistry extends CacheableFailbackRegistry {
}
}
- if(listeners.isEmpty()){
+ if (listeners.isEmpty()) {
zkListeners.remove(url);
}
}
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryTest.java
index 23d5af9..2052c3b 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryTest.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/test/java/org/apache/dubbo/registry/zookeeper/ZookeeperRegistryTest.java
@@ -16,14 +16,16 @@
*/
package org.apache.dubbo.registry.zookeeper;
-import org.apache.curator.test.TestingServer;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.status.Status;
import org.apache.dubbo.common.utils.NetUtils;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.status.RegistryStatusChecker;
+import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.model.ApplicationModel;
+
+import org.apache.curator.test.TestingServer;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
@@ -39,6 +41,8 @@ import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsString;
+import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.Mockito.mock;
public class ZookeeperRegistryTest {
@@ -49,6 +53,7 @@ public class ZookeeperRegistryTest {
private URL anyUrl = URL.valueOf("zookeeper://zookeeper/*");
private URL registryUrl;
private ZookeeperRegistryFactory zookeeperRegistryFactory;
+ private NotifyListener listener;
@BeforeEach
public void setUp() throws Exception {
@@ -122,6 +127,23 @@ public class ZookeeperRegistryTest {
assertThat(lookup.size(), is(1));
}
+ @Test
+ public void testLookupIllegalUrl() {
+ try {
+ zookeeperRegistry.lookup(null);
+ fail();
+ } catch (IllegalArgumentException expected) {
+ assertThat(expected.getMessage(),
+ containsString("lookup url == null"));
+ }
+ }
+
+ @Test
+ public void testLookupWithException() {
+ URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
+ Assertions.assertThrows(RpcException.class, () -> zookeeperRegistry.lookup(errorUrl));
+ }
+
@Disabled
@Test
/*
@@ -152,4 +174,33 @@ public class ZookeeperRegistryTest {
zookeeperRegistry.register(serviceUrl);
latch.await();
}
+
+ @Test
+ public void testDestroy() {
+ zookeeperRegistry.destroy();
+ assertThat(zookeeperRegistry.isAvailable(), is(false));
+ }
+
+
+ @Test
+ public void testDoRegisterWithException() {
+ Assertions.assertThrows(RpcException.class, () -> {
+ URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
+ zookeeperRegistry.doRegister(errorUrl);
+ });
+ }
+
+ @Test
+ public void testDoUnregisterWithException() {
+ Assertions.assertThrows(RpcException.class, () -> {
+ URL errorUrl = URL.valueOf("multicast://0.0.0.0/");
+ zookeeperRegistry.doUnregister(errorUrl);
+ });
+ }
+
+ @Test
+ public void testDoSubscribeWithException() {
+ Assertions.assertThrows(RpcException.class,
+ () -> zookeeperRegistry.doSubscribe(anyUrl, listener));
+ }
}