You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/05/24 06:42:21 UTC

[dubbo] branch 3.0 updated: fixes #10079, address notification issue with service discovery multi subscription (#10080)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new d42b93dfda fixes #10079, address notification issue with service discovery multi subscription (#10080)
d42b93dfda is described below

commit d42b93dfda471b448cf6ccea364ba402e58615ce
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue May 24 14:42:09 2022 +0800

    fixes #10079, address notification issue with service discovery multi subscription (#10080)
---
 .../src/main/java/org/apache/dubbo/common/URL.java |   4 +-
 .../listener/ServiceInstancesChangedListener.java  | 109 +++++++++---------
 .../ServiceInstancesChangedListenerTest.java       | 123 ++++++++++++++++++++-
 3 files changed, 178 insertions(+), 58 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index c8078430d3..fb9f1b65a5 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -1402,8 +1402,8 @@ class URL implements Serializable {
         }
         this.protocolServiceKey = getServiceKey();
         /*
-        Special treatment if this is a consumer subscription url instance with no protocol specified - starts with 'consumer://'
-        If the specific protocol is specified on the consumer side, then this method will return as normal.
+        Special treatment for urls begins with 'consumer://', that is, a consumer subscription url instance with no protocol specified.
+        If protocol is specified on the consumer side, then this method will return as normal.
         */
         if (!CONSUMER.equals(getProtocol())) {
             this.protocolServiceKey += (GROUP_CHAR_SEPARATOR + getProtocol());
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 3604b18d43..3142f5f202 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -204,33 +204,30 @@ public class ServiceInstancesChangedListener {
             return;
         }
 
-        Set<String> protocolServiceKeys = getProtocolServiceKeyList(serviceKey, listener);
-        for (String protocolServiceKey : protocolServiceKeys) {
-            // Add to global listeners
-            if (!this.listeners.containsKey(serviceKey)) {
-                // synchronized method, no need to use DCL
-                this.listeners.put(serviceKey, new ConcurrentHashSet<>());
-            }
-            Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
-            notifyListeners.add(new NotifyListenerWithKey(protocolServiceKey, listener));
-        }
-
+        Set<NotifyListenerWithKey> notifyListeners = this.listeners.computeIfAbsent(serviceKey, _k -> new ConcurrentHashSet<>());
+        // {@code protocolServiceKeysToConsume} will be specific protocols configured in reference config or default protocols supported by framework.
+        Set<String> protocolServiceKeysToConsume = getProtocolServiceKeyList(serviceKey, listener);
+        // Add current listener to serviceKey set, there will have more than one listener when multiple references of one same service is configured.
+        NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, protocolServiceKeysToConsume, listener);
+        notifyListeners.add(listenerWithKey);
+
+        // Aggregate address and notify on subscription.
         List<URL> urls;
-        if (protocolServiceKeys.size() > 1) {
+        if (protocolServiceKeysToConsume.size() > 1) {
             urls = new ArrayList<>();
-            for (NotifyListenerWithKey notifyListenerWithKey : this.listeners.get(serviceKey)) {
-                String protocolKey = notifyListenerWithKey.getProtocolServiceKey();
-                List<URL> urlsOfProtocol = getAddresses(protocolKey, listener.getConsumerUrl());
+            for (String protocolServiceKey : protocolServiceKeysToConsume) {
+                List<URL> urlsOfProtocol = getAddresses(protocolServiceKey, listener.getConsumerUrl());
                 if (CollectionUtils.isNotEmpty(urlsOfProtocol)) {
+                    logger.info(String.format("Found %s urls of protocol service key %s ", urlsOfProtocol.size(), protocolServiceKey));
                     urls.addAll(urlsOfProtocol);
                 }
             }
         } else {
-            String protocolKey = this.listeners.get(serviceKey).iterator().next().getProtocolServiceKey();
-            urls = getAddresses(protocolKey, listener.getConsumerUrl());
+            urls = getAddresses(protocolServiceKeysToConsume.iterator().next(), listener.getConsumerUrl());
         }
 
         if (CollectionUtils.isNotEmpty(urls)) {
+            logger.info(String.format("Notify serviceKey: %s, listener: %s with %s urls on subscription", serviceKey, listener, urls.size()));
             listener.notify(urls);
         }
     }
@@ -240,18 +237,16 @@ public class ServiceInstancesChangedListener {
             return;
         }
 
-        for (String protocolServiceKey : getProtocolServiceKeyList(serviceKey, notifyListener)) {
-            // synchronized method, no need to use DCL
-            Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
-            if (notifyListeners != null) {
-                NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(protocolServiceKey, notifyListener);
-                // Remove from global listeners
-                notifyListeners.remove(listenerWithKey);
+        // synchronized method, no need to use DCL
+        Set<NotifyListenerWithKey> notifyListeners = this.listeners.get(serviceKey);
+        if (notifyListeners != null) {
+            NotifyListenerWithKey listenerWithKey = new NotifyListenerWithKey(serviceKey, notifyListener);
+            // Remove from global listeners
+            notifyListeners.remove(listenerWithKey);
 
-                // ServiceKey has no listener, remove set
-                if (notifyListeners.size() == 0) {
-                    this.listeners.remove(serviceKey);
-                }
+            // ServiceKey has no listener, remove set
+            if (notifyListeners.size() == 0) {
+                this.listeners.remove(serviceKey);
             }
         }
     }
@@ -385,32 +380,32 @@ public class ServiceInstancesChangedListener {
      * race condition is protected by onEvent/doOnEvent
      */
     protected void notifyAddressChanged() {
+        // 1 different services
         listeners.forEach((serviceKey, listenerSet) -> {
-            if (listenerSet != null) {
-                if (listenerSet.size() == 1) {
-                    NotifyListenerWithKey listenerWithKey = listenerSet.iterator().next();
-                    String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
-                    NotifyListener notifyListener = listenerWithKey.getNotifyListener();
+            // 2 multiple subscription listener of the same service
+            for (NotifyListenerWithKey listenerWithKey : listenerSet) {
+                NotifyListener notifyListener = listenerWithKey.getNotifyListener();
+                if (listenerWithKey.getProtocolServiceKeys().size() == 1) {// 2.1 if one specific protocol is specified
+                    String protocolServiceKey = listenerWithKey.getProtocolServiceKeys().iterator().next();
                     //FIXME, group wildcard match
                     List<URL> urls = toUrlsWithEmpty(getAddresses(protocolServiceKey, notifyListener.getConsumerUrl()));
-                    logger.info("Notify service " + serviceKey + " with urls " + urls.size());
+                    logger.info("Notify service " + protocolServiceKey + " with urls " + urls.size());
                     notifyListener.notify(urls);
-                } else {
+                } else {// 2.2 multiple protocols or no protocol(using default protocols) set
                     List<URL> urls = new ArrayList<>();
-                    NotifyListener notifyListener = null;
-                    for (NotifyListenerWithKey listenerWithKey : listenerSet) {
-                        String protocolServiceKey = listenerWithKey.getProtocolServiceKey();
-                        notifyListener = listenerWithKey.getNotifyListener();
+                    int effectiveProtocolNum = 0;
+                    for (String protocolServiceKey : listenerWithKey.getProtocolServiceKeys()) {
                         List<URL> tmpUrls = getAddresses(protocolServiceKey, notifyListener.getConsumerUrl());
                         if (CollectionUtils.isNotEmpty(tmpUrls)) {
+                            logger.info("Found  " + urls.size() + " urls of protocol service key " + protocolServiceKey);
+                            effectiveProtocolNum++;
                             urls.addAll(tmpUrls);
                         }
                     }
-                    if (notifyListener != null) {
-                        logger.info("Notify service " + serviceKey + " with urls " + urls.size());
-                        urls = toUrlsWithEmpty(urls);
-                        notifyListener.notify(urls);
-                    }
+
+                    logger.info("Notify service " + serviceKey + " with " + urls.size() + " urls from " + effectiveProtocolNum + " different protocols");
+                    urls = toUrlsWithEmpty(urls);
+                    notifyListener.notify(urls);
                 }
             }
         });
@@ -477,7 +472,7 @@ public class ServiceInstancesChangedListener {
      * Calculate the protocol list that the consumer cares about.
      *
      * @param serviceKey possible input serviceKey includes
-     *                   1. {group}/{interface}:{version}:consumer
+     *                   1. {group}/{interface}:{version}, if protocol is not specified
      *                   2. {group}/{interface}:{version}:{user specified protocols}
      * @param listener   listener also contains the user specified protocols
      * @return protocol list with the format {group}/{interface}:{version}:{protocol}
@@ -530,16 +525,26 @@ public class ServiceInstancesChangedListener {
     }
 
     public static class NotifyListenerWithKey {
-        private final String protocolServiceKey;
+        private final String serviceKey;
+        private final Set<String> protocolServiceKeys;
         private final NotifyListener notifyListener;
 
-        public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
-            this.protocolServiceKey = protocolServiceKey;
+        public NotifyListenerWithKey(String protocolServiceKey, Set<String> protocolServiceKeys, NotifyListener notifyListener) {
+            this.serviceKey = protocolServiceKey;
+            this.protocolServiceKeys = (protocolServiceKeys == null ? new ConcurrentHashSet<>() : protocolServiceKeys);
             this.notifyListener = notifyListener;
         }
 
-        public String getProtocolServiceKey() {
-            return protocolServiceKey;
+        public NotifyListenerWithKey(String protocolServiceKey, NotifyListener notifyListener) {
+            this(protocolServiceKey, null, notifyListener);
+        }
+
+        public String getServiceKey() {
+            return serviceKey;
+        }
+
+        public Set<String> getProtocolServiceKeys() {
+            return protocolServiceKeys;
         }
 
         public NotifyListener getNotifyListener() {
@@ -555,12 +560,12 @@ public class ServiceInstancesChangedListener {
                 return false;
             }
             NotifyListenerWithKey that = (NotifyListenerWithKey) o;
-            return Objects.equals(protocolServiceKey, that.protocolServiceKey) && Objects.equals(notifyListener, that.notifyListener);
+            return Objects.equals(serviceKey, that.serviceKey) && Objects.equals(notifyListener, that.notifyListener);
         }
 
         @Override
         public int hashCode() {
-            return Objects.hash(protocolServiceKey, notifyListener);
+            return Objects.hash(serviceKey, notifyListener);
         }
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index 3c0734237c..c4a06f2131 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -82,6 +82,7 @@ public class ServiceInstancesChangedListenerTest {
     static List<ServiceInstance> app1FailedInstances;
     static List<ServiceInstance> app1FailedInstances2;
     static List<ServiceInstance> app1InstancesWithNoRevision;
+    static List<ServiceInstance> app1InstancesMultipleProtocols;
 
     static String metadata_111 = "{\"app\":\"app1\",\"revision\":\"111\",\"services\":{"
         + "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\ [...]
@@ -99,6 +100,10 @@ public class ServiceInstancesChangedListenerTest {
     static String metadata_444 = "{\"app\":\"app1\",\"revision\":\"444\",\"services\":{"
         + "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"dubbo\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\ [...]
         + "}}";
+    // only triple protocol enabled
+    static String metadata_555_triple = "{\"app\":\"app1\",\"revision\":\"555\",\"services\":{"
+        + "\"org.apache.dubbo.demo.DemoService:dubbo\":{\"name\":\"org.apache.dubbo.demo.DemoService\",\"protocol\":\"tri\",\"path\":\"org.apache.dubbo.demo.DemoService\",\"params\":{\"side\":\"provider\",\"release\":\"\",\"methods\":\"sayHello,sayHelloAsync\",\"deprecated\":\"false\",\"dubbo\":\"2.0.2\",\"pid\":\"72723\",\"interface\":\"org.apache.dubbo.demo.DemoService\",\"service-name-mapping\":\"true\",\"timeout\":\"3000\",\"generic\":\"false\",\"metadata-type\":\"remote\",\"delay\": [...]
+        + "}}";
 
     static String service1 = "org.apache.dubbo.demo.DemoService";
     static String service2 = "org.apache.dubbo.demo.DemoService2";
@@ -107,12 +112,16 @@ public class ServiceInstancesChangedListenerTest {
     static URL consumerURL = URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=dubbo&registry_cluster=default");
     static URL consumerURL2 = URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService2?interface=org.apache.dubbo.demo.DemoService2&protocol=dubbo&registry_cluster=default");
     static URL consumerURL3 = URL.valueOf("dubbo://127.0.0.1/org.apache.dubbo.demo.DemoService3?interface=org.apache.dubbo.demo.DemoService3&protocol=dubbo&registry_cluster=default");
+    static URL multipleProtocolsConsumerURL = URL.valueOf("dubbo,tri://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=dubbo,tri&registry_cluster=default");
+    static URL noProtocolConsumerURL = URL.valueOf("consumer://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&registry_cluster=default");
+    static URL singleProtocolsConsumerURL = URL.valueOf("tri://127.0.0.1/org.apache.dubbo.demo.DemoService?interface=org.apache.dubbo.demo.DemoService&protocol=tri&registry_cluster=default");
     static URL registryURL = URL.valueOf("dubbo://127.0.0.1:2181/org.apache.dubbo.demo.RegistryService");
 
     static MetadataInfo metadataInfo_111;
     static MetadataInfo metadataInfo_222;
     static MetadataInfo metadataInfo_333;
     static MetadataInfo metadataInfo_444;
+    static MetadataInfo metadataInfo_555_tri;
 
     static MetadataService metadataService;
 
@@ -149,16 +158,22 @@ public class ServiceInstancesChangedListenerTest {
         List<Object> urlsWithoutRevision = new ArrayList<>();
         urlsWithoutRevision.add("30.10.0.1:20880");
 
+        List<Object> urlsMultipleProtocols = new ArrayList<>();
+        urlsMultipleProtocols.add("30.10.0.1:20880?revision=555");//triple
+        urlsMultipleProtocols.addAll(urlsSameRevision);// dubbo
+
         app1Instances = buildInstances(urlsSameRevision);
         app2Instances = buildInstances(urlsDifferentRevision);
         app1FailedInstances = buildInstances(urlsFailedRevision);
         app1FailedInstances2 = buildInstances(urlsFailedRevision2);
         app1InstancesWithNoRevision = buildInstances(urlsWithoutRevision);
+        app1InstancesMultipleProtocols = buildInstances(urlsMultipleProtocols);
 
         metadataInfo_111 = gson.fromJson(metadata_111, MetadataInfo.class);
         metadataInfo_222 = gson.fromJson(metadata_222, MetadataInfo.class);
         metadataInfo_333 = gson.fromJson(metadata_333, MetadataInfo.class);
         metadataInfo_444 = gson.fromJson(metadata_444, MetadataInfo.class);
+        metadataInfo_555_tri = gson.fromJson(metadata_555_triple, MetadataInfo.class);
 
         serviceDiscovery = Mockito.mock(ServiceDiscovery.class);
         when(serviceDiscovery.getUrl()).thenReturn(registryURL);
@@ -167,6 +182,7 @@ public class ServiceInstancesChangedListenerTest {
         when(serviceDiscovery.getRemoteMetadata(eq("222"), anyList())).thenReturn(metadataInfo_222);
         when(serviceDiscovery.getRemoteMetadata(eq("333"), anyList())).thenReturn(metadataInfo_333);
         when(serviceDiscovery.getRemoteMetadata(eq("444"), anyList())).thenReturn(MetadataInfo.EMPTY);
+        when(serviceDiscovery.getRemoteMetadata(eq("555"), anyList())).thenReturn(metadataInfo_555_tri);
     }
 
 
@@ -367,9 +383,106 @@ public class ServiceInstancesChangedListenerTest {
         Mockito.verify(demoService3Listener, Mockito.times(1)).notify(Mockito.anyList());
     }
 
-    // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
     @Test
     @Order(6)
+    public void testMultiServiceListenerNotification() {
+        Set<String> serviceNames = new HashSet<>();
+        serviceNames.add("app1");
+        serviceNames.add("app2");
+        listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+        NotifyListener demoServiceListener1 = Mockito.mock(NotifyListener.class);
+        when(demoServiceListener1.getConsumerUrl()).thenReturn(consumerURL);
+        NotifyListener demoServiceListener2 = Mockito.mock(NotifyListener.class);
+        when(demoServiceListener2.getConsumerUrl()).thenReturn(consumerURL);
+        NotifyListener demoService2Listener1 = Mockito.mock(NotifyListener.class);
+        when(demoService2Listener1.getConsumerUrl()).thenReturn(consumerURL2);
+        NotifyListener demoService2Listener2 = Mockito.mock(NotifyListener.class);
+        when(demoService2Listener2.getConsumerUrl()).thenReturn(consumerURL2);
+        listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener1);
+        listener.addListenerAndNotify(consumerURL.getProtocolServiceKey(), demoServiceListener2);
+        listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener1);
+        listener.addListenerAndNotify(consumerURL2.getProtocolServiceKey(), demoService2Listener2);
+        // notify app1 instance change
+        ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
+        listener.onEvent(app1_event);
+
+        // check
+        ArgumentCaptor<List<URL>> captor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener1, Mockito.times(1)).notify(captor.capture());
+        List<URL> notifiedUrls = captor.getValue();
+        Assertions.assertEquals(3, notifiedUrls.size());
+        ArgumentCaptor<List<URL>> captor2 = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoService2Listener1, Mockito.times(1)).notify(captor2.capture());
+        List<URL> notifiedUrls2 = captor2.getValue();
+        Assertions.assertEquals(0, notifiedUrls2.size());
+
+        // notify app2 instance change
+        ServiceInstancesChangedEvent app2_event = new ServiceInstancesChangedEvent("app2", app2Instances);
+        listener.onEvent(app2_event);
+
+        // check
+        ArgumentCaptor<List<URL>> app2_captor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener1, Mockito.times(2)).notify(app2_captor.capture());
+        List<URL> app2_notifiedUrls = app2_captor.getValue();
+        Assertions.assertEquals(7, app2_notifiedUrls.size());
+        ArgumentCaptor<List<URL>> app2_captor2 = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoService2Listener1, Mockito.times(2)).notify(app2_captor2.capture());
+        List<URL> app2_notifiedUrls2 = app2_captor2.getValue();
+        Assertions.assertEquals(4, app2_notifiedUrls2.size());
+
+        // test service listener still get notified when added after instance notification.
+        NotifyListener demoService3Listener = Mockito.mock(NotifyListener.class);
+        when(demoService3Listener.getConsumerUrl()).thenReturn(consumerURL3);
+        listener.addListenerAndNotify(consumerURL3.getProtocolServiceKey(), demoService3Listener);
+        Mockito.verify(demoService3Listener, Mockito.times(1)).notify(Mockito.anyList());
+    }
+
+    /**
+     * Test subscribe multiple protocols
+     */
+    @Test
+    @Order(7)
+    public void testSubscribeMultipleProtocols() {
+        Set<String> serviceNames = new HashSet<>();
+        serviceNames.add("app1");
+        listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
+        // no protocol specified, consume all instances
+        NotifyListener demoServiceListener1 = Mockito.mock(NotifyListener.class);
+        when(demoServiceListener1.getConsumerUrl()).thenReturn(noProtocolConsumerURL);
+        listener.addListenerAndNotify(noProtocolConsumerURL.getProtocolServiceKey(), demoServiceListener1);
+        // multiple protocols specified
+        NotifyListener demoServiceListener2 = Mockito.mock(NotifyListener.class);
+        when(demoServiceListener2.getConsumerUrl()).thenReturn(multipleProtocolsConsumerURL);
+        listener.addListenerAndNotify(multipleProtocolsConsumerURL.getProtocolServiceKey(), demoServiceListener2);
+        // one protocol specified
+        NotifyListener demoServiceListener3 = Mockito.mock(NotifyListener.class);
+        when(demoServiceListener3.getConsumerUrl()).thenReturn(singleProtocolsConsumerURL);
+        listener.addListenerAndNotify(singleProtocolsConsumerURL.getProtocolServiceKey(), demoServiceListener3);
+
+        // notify app1 instance change
+        ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1InstancesMultipleProtocols);
+        listener.onEvent(app1_event);
+
+        // check instances expose framework supported default protocols(currently dubbo, triple and rest) are notified
+        ArgumentCaptor<List<URL>> default_protocol_captor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener1, Mockito.times(1)).notify(default_protocol_captor.capture());
+        List<URL> default_protocol_notifiedUrls = default_protocol_captor.getValue();
+        Assertions.assertEquals(4, default_protocol_notifiedUrls.size());
+        // check instances expose protocols in consuming list(dubbo and triple) are notified
+        ArgumentCaptor<List<URL>> multi_protocols_captor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener2, Mockito.times(1)).notify(multi_protocols_captor.capture());
+        List<URL> multi_protocol_notifiedUrls = multi_protocols_captor.getValue();
+        Assertions.assertEquals(4, multi_protocol_notifiedUrls.size());
+        // check instances expose protocols in consuming list(only triple) are notified
+        ArgumentCaptor<List<URL>> single_protocols_captor = ArgumentCaptor.forClass(List.class);
+        Mockito.verify(demoServiceListener3, Mockito.times(1)).notify(single_protocols_captor.capture());
+        List<URL> single_protocol_notifiedUrls = single_protocols_captor.getValue();
+        Assertions.assertEquals(1, single_protocol_notifiedUrls.size());
+    }
+
+    // revision 异常场景。第一次启动,完全拿不到metadata,只能通知部分地址
+    @Test
+    @Order(8)
     public void testRevisionFailureOnStartup() {
         Set<String> serviceNames = new HashSet<>();
         serviceNames.add("app1");
@@ -387,7 +500,7 @@ public class ServiceInstancesChangedListenerTest {
 
     // revision 异常场景。运行中地址通知,拿不到revision就用老版本revision
     @Test
-    @Order(7)
+    @Order(9)
     public void testRevisionFailureOnNotification() {
         Set<String> serviceNames = new HashSet<>();
         serviceNames.add("app1");
@@ -431,10 +544,9 @@ public class ServiceInstancesChangedListenerTest {
 
     }
 
-
     // Abnormal case. Instance does not have revision
     @Test
-    @Order(9)
+    @Order(10)
     public void testInstanceWithoutRevision() {
         Set<String> serviceNames = new HashSet<>();
         serviceNames.add("app1");
@@ -448,6 +560,9 @@ public class ServiceInstancesChangedListenerTest {
         assertTrue(true);
     }
 
+    /**
+     * Test calculation of subscription protocols
+     */
     @Test
     public void testGetProtocolServiceKeyList() {
         NotifyListener listener = Mockito.mock(NotifyListener.class);