You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/05/24 06:42:21 UTC
[dubbo] branch 3.0 updated: fixes #10079, address notification issue with service discovery multi subscription (#10080)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new d42b93dfda fixes #10079, address notification issue with service discovery multi subscription (#10080)
d42b93dfda is described below
commit d42b93dfda471b448cf6ccea364ba402e58615ce
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue May 24 14:42:09 2022 +0800
fixes #10079, address notification issue with service discovery multi subscription (#10080)
---
.../src/main/java/org/apache/dubbo/common/URL.java | 4 +-
.../listener/ServiceInstancesChangedListener.java | 109 +++++++++---------
.../ServiceInstancesChangedListenerTest.java | 123 ++++++++++++++++++++-
3 files changed, 178 insertions(+), 58 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index c8078430d3..fb9f1b65a5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -1402,8 +1402,8 @@ class URL implements Serializable {
}
this.protocolServiceKey = getServiceKey();
/*
- Special treatment if this is a consumer subscription url instance with no protocol specified - starts with 'consumer://'
- If the specific protocol is specified on the consumer side, then this method will return as normal.
+ Special treatment for urls begins with 'consumer://', that is, a consumer subscription url instance with no protocol specified.
+ If protocol is specified on the consumer side, then this method will return as normal.
*/
if (!CONSUMER.equals(getProtocol())) {
this.protocolServiceKey += (GROUP_CHAR_SEPARATOR + getProtocol());
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 3604b18d43..3142f5f202 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -204,33 +204,30 @@ public class ServiceInstancesChangedListener {
return;
}
- Set<String> protocolServiceKeys = getProtocolServiceKeyList(serviceKey, listener);
- for (String protocolServiceKey : protocolServiceKeys) {
- // Add to global listeners
- if (!this.listeners.containsKey(serviceKey)) {
- // synchronized method, no need to use DCL
- this.listeners.put(serviceKey, new ConcurrentHashSet<>());
- }
- Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
- notifyListeners.add(new NotifyListenerWithKey(protocolServiceKey, listener));
- }
-
+ Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
+ // {@code protocolServiceKeysToConsume} will be specific protocols configured in reference config or default protocols supported by framework.
+ Set<String> protocolServiceKeysToConsume = getProtocolServiceKeyList(serviceKey, listener);
+ // Add current listener to serviceKey set, there will have more than one listener when multiple references of one same service is configured.
+ NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
+ notifyListeners.add(listenerWithKey);
+
+ // Aggregate address and notify on subscription.
List<URL> urls;
- if (protocolServiceKeys.size() > 1) {
+ if (protocolServiceKeysToConsume.size() > 1) {
urls = new ArrayList<>();
- for (NotifyListenerWithKey notifyListenerWithKey : this.listeners.get(serviceKey)) {
- String protocolKey = notifyListenerWithKey.getProtocolServiceKey();
- List<URL> urlsOfProtocol = getAddresses(protocolKey, listener.getConsumerUrl());
+ for (String protocolServiceKey : protocolServiceKeysToConsume) {
+ List<URL> urlsOfProtocol = getAddresses(protocolServiceKey, listener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
+ logger.info(String.format("Found %s urls of protocol service key %s ", urlsOfProtocol.size(), protocolServiceKey));
urls.addAll(urlsOfProtocol);
}
}
} else {
- String protocolKey = this.listeners.get(serviceKey).iterator().next().getProtocolServiceKey();
- urls = getAddresses(protocolKey, listener.getConsumerUrl());
+ urls = getAddresses(protocolServiceKeysToConsume.iterator().next(), listener.getConsumerUrl());
}
if (CollectionUtils.isNotEmpty(urls)) {
+ logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", serviceKey, listener, urls.size()));
listener.notify(urls);
}
}
@@ -240,18 +237,16 @@ public class ServiceInstancesChangedListener {
return;
}
- for (String protocolServiceKey : getProtocolServiceKeyList(serviceKey, notifyListener)) {
- // synchronized method, no need to use DCL
- Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
- if (notifyListeners != null) {
- NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, notifyListener);
- // Remove from global listeners
- notifyListeners.remove(listenerWithKey);
+ // synchronized method, no need to use DCL
+ Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
+ if (notifyListeners != null) {
+ NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, notifyListener);
+ // Remove from global listeners
+ notifyListeners.remove(listenerWithKey);
- // ServiceKey has no listener, remove set
- if (notifyListeners.size() == 0) {
- this.listeners.remove(serviceKey);
- }
+ // ServiceKey has no listener, remove set
+ if (notifyListeners.size() == 0) {
+ this.listeners.remove(serviceKey);
}
}
}
@@ -385,32 +380,32 @@ public class ServiceInstancesChangedListener {
* race condition is protected by onEvent/doOnEvent
*/
protected void notifyAddressChanged() {
+ // 1 different services
listeners.forEach((serviceKey, listenerSet) -> {
- if (listenerSet != null) {
- if (listenerSet.size() == 1) {
- NotifyListenerWithKey listenerWithKey = listenerSet.iterator().next();
- String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
- NotifyListener notifyListener = listenerWithKey.getNotifyListener();
+ // 2 multiple subscription listener of the same service
+ for (NotifyListenerWithKey listenerWithKey : listenerSet) {
+ NotifyListener notifyListener = listenerWithKey.getNotifyListener();
+ if (listenerWithKey.getProtocolServiceKeys().size() == 1) {// 2.1 if one specific protocol is specified
+ String protocolServiceKey = listenerWithKey.getProtocolServiceKeys().iterator().next();
//FIXME, group wildcard match
List<URL> urls = toUrlsWithEmpty(getAddresses(protocolServiceKey, notifyListener.getConsumerUrl()));
- logger.info("Notify service " + serviceKey + " with urls " + urls.size());
+ logger.info("Notify service " + protocolServiceKey + " with urls " + urls.size());
notifyListener.notify(urls);
- } else {
+ } else {// 2.2 multiple protocols or no protocol(using default protocols) set
List<URL> urls = new ArrayList<>();
- NotifyListener notifyListener = null;
- for (NotifyListenerWithKey listenerWithKey : listenerSet) {
- String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
- notifyListener = listenerWithKey.getNotifyListener();
+ int effectiveProtocolNum = 0;
+ for (String protocolServiceKey : listenerWithKey.getProtocolServiceKeys()) {
List<URL> tmpUrls = getAddresses(protocolServiceKey, notifyListener.getConsumerUrl());
if (CollectionUtils.isNotEmpty(tmpUrls)) {
+ logger.info("Found " + urls.size() + " urls of protocol service key " + protocolServiceKey);
+ effectiveProtocolNum++;
urls.addAll(tmpUrls);
}
}
- if (notifyListener != null) {
- logger.info("Notify service " + serviceKey + " with urls " + urls.size());
- urls = toUrlsWithEmpty(urls);
- notifyListener.notify(urls);
- }
+
+ logger.info("Notify service " + serviceKey + " with " + urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
+ urls = toUrlsWithEmpty(urls);
+ notifyListener.notify(urls);
}
}
});
@@ -477,7 +472,7 @@ public class ServiceInstancesChangedListener {
* Calculate the protocol list that the consumer cares about.
*
* @param serviceKey possible input serviceKey includes
- * 1. {group}/{interface}:{version}:consumer
+ * 1. {group}/{interface}:{version}, if protocol is not specified
* 2. {group}/{interface}:{version}:{user specified protocols}
* @param listener listener also contains the user specified protocols
* @return protocol list with the format {group}/{interface}:{version}:{protocol}
@@ -530,16 +525,26 @@ public class ServiceInstancesChangedListener {
}
public static class NotifyListenerWithKey {
- private final String protocolServiceKey;
+ private final String serviceKey;
+ private final Set<String> protocolServiceKeys;
private final NotifyListener notifyListener;
- public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
- this.protocolServiceKey = protocolServiceKey;
+ public NotifyListenerWithKey(String protocolServiceKey, Set<String> protocolServiceKeys, NotifyListener notifyListener) {
+ this.serviceKey = protocolServiceKey;
+ this.protocolServiceKeys = (protocolServiceKeys == null ? new ConcurrentHashSet<>() : protocolServiceKeys);
this.notifyListener = notifyListener;
}
- public String getProtocolServiceKey() {
- return protocolServiceKey;
+ public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
+ this(protocolServiceKey, null, notifyListener);
+ }
+
+ public String getServiceKey() {
+ return serviceKey;
+ }
+
+ public Set<String> getProtocolServiceKeys() {
+ return protocolServiceKeys;
}
public NotifyListener getNotifyListener() {
@@ -555,12 +560,12 @@ public class ServiceInstancesChangedListener {
return false;
}
NotifyListenerWithKey that = (NotifyListenerWithKey) o;
- return Objects.equals(protocolServiceKey, that.protocolServiceKey) && Objects.equals(notifyListener, that.notifyListener);
+ return Objects.equals(serviceKey, that.serviceKey) && Objects.equals(notifyListener, that.notifyListener);
}
@Override
public int hashCode() {
- return Objects.hash(protocolServiceKey, notifyListener);
+ return Objects.hash(serviceKey, notifyListener);
}
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index 3c0734237c..c4a06f2131 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -82,6 +82,7 @@ public class ServiceInstancesChangedListenerTest {
static List<ServiceInstance> app1FailedInstances;
static List<ServiceInstance> app1FailedInstances2;
static List<ServiceInstance> app1InstancesWithNoRevision;
+ static List<ServiceInstance> app1InstancesMultipleProtocols;
static String metadata_111 = "{\"app\":\"app1\",\"revision\":\"111\",\"services\":{"
+ "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\ [...]
@@ -99,6 +100,10 @@ public class ServiceInstancesChangedListenerTest {
static String metadata_444 = "{\"app\":\"app1\",\"revision\":\"444\",\"services\":{"
+ "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\ [...]
+ "}}";
+ // only triple protocol enabled
+ static String metadata_555_triple = "{\"app\":\"app1\",\"revision\":\"555\",\"services\":{"
+ + "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"tri\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\": [...]
+ + "}}";
static String service1 = "org.apache.dubbo.demo.DemoService";
static String service2 = "org.apache.dubbo.demo.DemoService2";
@@ -107,12 +112,16 @@ public class ServiceInstancesChangedListenerTest {
static URL consumerURL = URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=dubbo&registry_cluster=default");
static URL consumerURL2 = URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService2?interface=org.apache.dubbo.demo.DemoService2&protocol=dubbo&registry_cluster=default");
static URL consumerURL3 = URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService3?interface=org.apache.dubbo.demo.DemoService3&protocol=dubbo&registry_cluster=default");
+ static URL multipleProtocolsConsumerURL = URL.valueOf("dubbo,tri://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=dubbo,tri&registry_cluster=default");
+ static URL noProtocolConsumerURL = URL.valueOf("consumer://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&registry_cluster=default");
+ static URL singleProtocolsConsumerURL = URL.valueOf("tri://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=tri&registry_cluster=default");
static URL registryURL = URL.valueOf("dubbo://127.0.0.1:2181/org.apache.dubbo.demo.RegistryService");
static MetadataInfo metadataInfo_111;
static MetadataInfo metadataInfo_222;
static MetadataInfo metadataInfo_333;
static MetadataInfo metadataInfo_444;
+ static MetadataInfo metadataInfo_555_tri;
static MetadataService metadataService;
@@ -149,16 +158,22 @@ public class ServiceInstancesChangedListenerTest {
List<Object> urlsWithoutRevision = new ArrayList<>();
urlsWithoutRevision.add("30.10.0.1:20880");
+ List<Object> urlsMultipleProtocols = new ArrayList<>();
+ urlsMultipleProtocols.add("30.10.0.1:20880?revision=555");//triple
+ urlsMultipleProtocols.addAll(urlsSameRevision);// dubbo
+
app1Instances = buildInstances(urlsSameRevision);
app2Instances = buildInstances(urlsDifferentRevision);
app1FailedInstances = buildInstances(urlsFailedRevision);
app1FailedInstances2 = buildInstances(urlsFailedRevision2);
app1InstancesWithNoRevision = buildInstances(urlsWithoutRevision);
+ app1InstancesMultipleProtocols = buildInstances(urlsMultipleProtocols);
metadataInfo_111 = gson.fromJson(metadata_111, MetadataInfo.class);
metadataInfo_222 = gson.fromJson(metadata_222, MetadataInfo.class);
metadataInfo_333 = gson.fromJson(metadata_333, MetadataInfo.class);
metadataInfo_444 = gson.fromJson(metadata_444, MetadataInfo.class);
+ metadataInfo_555_tri = gson.fromJson(metadata_555_triple, MetadataInfo.class);
serviceDiscovery = Mockito.mock(ServiceDiscovery.class);
when(serviceDiscovery.getUrl()).thenReturn(registryURL);
@@ -167,6 +182,7 @@ public class ServiceInstancesChangedListenerTest {
when(serviceDiscovery.getRemoteMetadata(eq("222"), anyList())).thenReturn(metadataInfo_222);
when(serviceDiscovery.getRemoteMetadata(eq("333"), anyList())).thenReturn(metadataInfo_333);
when(serviceDiscovery.getRemoteMetadata(eq("444"), anyList())).thenReturn(MetadataInfo.EMPTY);
+ when(serviceDiscovery.getRemoteMetadata(eq("555"), anyList())).thenReturn(metadataInfo_555_tri);
}
@@ -367,9 +383,106 @@ public class ServiceInstancesChangedListenerTest {
Mockito.verify(demoService3Listener, Mockito.times(1)).notify(Mockito.anyList());
}
- // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
@Test
@Order(6)
+ public void testMultiServiceListenerNotification() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ serviceNames.add("app2");
+ listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+ NotifyListener demoServiceListener1 = Mockito.mock(NotifyListener.class);
+ when(demoServiceListener1.getConsumerUrl()).thenReturn(consumerURL);
+ NotifyListener demoServiceListener2 = Mockito.mock(NotifyListener.class);
+ when(demoServiceListener2.getConsumerUrl()).thenReturn(consumerURL);
+ NotifyListener demoService2Listener1 = Mockito.mock(NotifyListener.class);
+ when(demoService2Listener1.getConsumerUrl()).thenReturn(consumerURL2);
+ NotifyListener demoService2Listener2 = Mockito.mock(NotifyListener.class);
+ when(demoService2Listener2.getConsumerUrl()).thenReturn(consumerURL2);
+ listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener1);
+ listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener2);
+ listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener1);
+ listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener2);
+ // notify app1 instance change
+ ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
+ listener.onEvent(app1_event);
+
+ // check
+ ArgumentCaptor<List<URL>> captor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener1, Mockito.times(1)).notify(captor.capture());
+ List<URL> notifiedUrls = captor.getValue();
+ Assertions.assertEquals(3, notifiedUrls.size());
+ ArgumentCaptor<List<URL>> captor2 = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoService2Listener1, Mockito.times(1)).notify(captor2.capture());
+ List<URL> notifiedUrls2 = captor2.getValue();
+ Assertions.assertEquals(0, notifiedUrls2.size());
+
+ // notify app2 instance change
+ ServiceInstancesChangedEvent app2_event = new ServiceInstancesChangedEvent("app2", app2Instances);
+ listener.onEvent(app2_event);
+
+ // check
+ ArgumentCaptor<List<URL>> app2_captor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener1, Mockito.times(2)).notify(app2_captor.capture());
+ List<URL> app2_notifiedUrls = app2_captor.getValue();
+ Assertions.assertEquals(7, app2_notifiedUrls.size());
+ ArgumentCaptor<List<URL>> app2_captor2 = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoService2Listener1, Mockito.times(2)).notify(app2_captor2.capture());
+ List<URL> app2_notifiedUrls2 = app2_captor2.getValue();
+ Assertions.assertEquals(4, app2_notifiedUrls2.size());
+
+ // test service listener still get notified when added after instance notification.
+ NotifyListener demoService3Listener = Mockito.mock(NotifyListener.class);
+ when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
+ listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(), demoService3Listener);
+ Mockito.verify(demoService3Listener, Mockito.times(1)).notify(Mockito.anyList());
+ }
+
+ /**
+ * Test subscribe multiple protocols
+ */
+ @Test
+ @Order(7)
+ public void testSubscribeMultipleProtocols() {
+ Set<String> serviceNames = new HashSet<>();
+ serviceNames.add("app1");
+ listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+ // no protocol specified, consume all instances
+ NotifyListener demoServiceListener1 = Mockito.mock(NotifyListener.class);
+ when(demoServiceListener1.getConsumerUrl()).thenReturn(noProtocolConsumerURL);
+ listener.addListenerAndNotify(noProtocolConsumerURL.getProtocolServiceKey(), demoServiceListener1);
+ // multiple protocols specified
+ NotifyListener demoServiceListener2 = Mockito.mock(NotifyListener.class);
+ when(demoServiceListener2.getConsumerUrl()).thenReturn(multipleProtocolsConsumerURL);
+ listener.addListenerAndNotify(multipleProtocolsConsumerURL.getProtocolServiceKey(), demoServiceListener2);
+ // one protocol specified
+ NotifyListener demoServiceListener3 = Mockito.mock(NotifyListener.class);
+ when(demoServiceListener3.getConsumerUrl()).thenReturn(singleProtocolsConsumerURL);
+ listener.addListenerAndNotify(singleProtocolsConsumerURL.getProtocolServiceKey(), demoServiceListener3);
+
+ // notify app1 instance change
+ ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1InstancesMultipleProtocols);
+ listener.onEvent(app1_event);
+
+ // check instances expose framework supported default protocols(currently dubbo, triple and rest) are notified
+ ArgumentCaptor<List<URL>> default_protocol_captor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener1, Mockito.times(1)).notify(default_protocol_captor.capture());
+ List<URL> default_protocol_notifiedUrls = default_protocol_captor.getValue();
+ Assertions.assertEquals(4, default_protocol_notifiedUrls.size());
+ // check instances expose protocols in consuming list(dubbo and triple) are notified
+ ArgumentCaptor<List<URL>> multi_protocols_captor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener2, Mockito.times(1)).notify(multi_protocols_captor.capture());
+ List<URL> multi_protocol_notifiedUrls = multi_protocols_captor.getValue();
+ Assertions.assertEquals(4, multi_protocol_notifiedUrls.size());
+ // check instances expose protocols in consuming list(only triple) are notified
+ ArgumentCaptor<List<URL>> single_protocols_captor = ArgumentCaptor.forClass(List.class);
+ Mockito.verify(demoServiceListener3, Mockito.times(1)).notify(single_protocols_captor.capture());
+ List<URL> single_protocol_notifiedUrls = single_protocols_captor.getValue();
+ Assertions.assertEquals(1, single_protocol_notifiedUrls.size());
+ }
+
+ // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
+ @Test
+ @Order(8)
public void testRevisionFailureOnStartup() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -387,7 +500,7 @@ public class ServiceInstancesChangedListenerTest {
// revision 异常场景。运行中地址通知,拿不到revision就用老版本revision
@Test
- @Order(7)
+ @Order(9)
public void testRevisionFailureOnNotification() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -431,10 +544,9 @@ public class ServiceInstancesChangedListenerTest {
}
-
// Abnormal case. Instance does not have revision
@Test
- @Order(9)
+ @Order(10)
public void testInstanceWithoutRevision() {
Set<String> serviceNames = new HashSet<>();
serviceNames.add("app1");
@@ -448,6 +560,9 @@ public class ServiceInstancesChangedListenerTest {
assertTrue(true);
}
+ /**
+ * Test calculation of subscription protocols
+ */
@Test
public void testGetProtocolServiceKeyList() {
NotifyListener listener = Mockito.mock(NotifyListener.class);