You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by al...@apache.org on 2021/08/08 14:19:31 UTC

[dubbo] branch 3.0 updated: Refactor service change listener. (#8404)

This is an automated email from the ASF dual-hosted git repository.

albumenj pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new 33e373c  Refactor service change listener. (#8404)
33e373c is described below

commit 33e373c62d9e9eff24d490a081209a02518d64f2
Author: 赵延 <ho...@apache.org>
AuthorDate: Sun Aug 8 22:19:19 2021 +0800

    Refactor service change listener. (#8404)
    
    * refactor service change listener.
    
    * Remove the second parameter (ServiceDiscovery) from destroyMetadataServiceProxy.
    
    * Revert the change to the consumerURL parameter of getAddresses.
---
 .../client/SelfHostMetaServiceDiscovery.java       |  4 +--
 .../listener/ServiceInstancesChangedListener.java  | 37 +++++++++++++---------
 .../registry/client/metadata/MetadataUtils.java    |  5 ++-
 .../ServiceInstancesChangedListenerTest.java       | 12 +++----
 4 files changed, 32 insertions(+), 26 deletions(-)

diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
index 47794f5..43bb167 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/SelfHostMetaServiceDiscovery.java
@@ -223,7 +223,7 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
             serviceInstance.setMetadata(JSONObject.parseObject(metadataString, Map.class));
         } else {
             // refer from MetadataUtils, this proxy is different from the one used to refer exportedURL
-            MetadataService metadataService = MetadataUtils.getMetadataServiceProxy(serviceInstance, this);
+            MetadataService metadataService = MetadataUtils.getMetadataServiceProxy(serviceInstance);
 
             String consumerId = ApplicationModel.getName() + NetUtils.getLocalHost();
             String metadata = metadataService.getAndListenInstanceMetadata(
@@ -264,7 +264,7 @@ public abstract class SelfHostMetaServiceDiscovery implements ServiceDiscovery {
             allServiceInstances.removeAll(oldServiceInstances);
 
             allServiceInstances.forEach(removedServiceInstance -> {
-                MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance, this);
+                MetadataUtils.destroyMetadataServiceProxy(removedServiceInstance);
             });
 
             cachedServiceInstances.put(serviceName, instances);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 890e046..bae3450 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -98,10 +98,12 @@ public class ServiceInstancesChangedListener {
      * @param event {@link ServiceInstancesChangedEvent}
      */
     public synchronized void onEvent(ServiceInstancesChangedEvent event) {
-        if (destroyed.get() || this.isRetryAndExpired(event) || !accept(event)) {
+        if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
             return;
         }
 
+        refreshInstance(event);
+
         if (logger.isDebugEnabled()) {
             logger.debug(event.getServiceInstances().toString());
         }
@@ -118,7 +120,7 @@ public class ServiceInstancesChangedListener {
             for (ServiceInstance instance : instances) {
                 String revision = getExportedServicesRevision(instance);
                 if (EMPTY_REVISION.equals(revision)) {
-                    if(logger.isDebugEnabled()) {
+                    if (logger.isDebugEnabled()) {
                         logger.debug("Find instance without valid service metadata: " + instance.getAddress());
                     }
                     continue;
@@ -136,19 +138,19 @@ public class ServiceInstancesChangedListener {
             MetadataInfo metadata = getRemoteMetadata(revision, localServiceToRevisions, instance);
             // update metadata into each instance, in case new instance created.
             for (ServiceInstance tmpInstance : subInstances) {
-                ((DefaultServiceInstance)tmpInstance).setServiceMetadata(metadata);
+                ((DefaultServiceInstance) tmpInstance).setServiceMetadata(metadata);
             }
 //            ((DefaultServiceInstance) instance).setServiceMetadata(metadata);
             newRevisionToMetadata.putIfAbsent(revision, metadata);
         }
 
-        if(logger.isDebugEnabled()) {
+        if (logger.isDebugEnabled()) {
             logger.debug(newRevisionToMetadata.size() + " unique revisions: " + newRevisionToMetadata.keySet());
         }
 
         if (hasEmptyMetadata(newRevisionToMetadata)) {// retry every 10 seconds
             if (retryPermission.tryAcquire()) {
-                retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10000, TimeUnit.MILLISECONDS);
+                retryFuture = scheduler.schedule(new AddressRefreshRetryTask(retryPermission, event.getServiceName()), 10_000L, TimeUnit.MILLISECONDS);
                 logger.warn("Address refresh try task submitted.");
             }
             logger.error("Address refresh failed because of Metadata Server failure, wait for retry or new address refresh event.");
@@ -236,9 +238,6 @@ public class ServiceInstancesChangedListener {
     }
 
     protected boolean isRetryAndExpired(ServiceInstancesChangedEvent event) {
-        String appName = event.getServiceName();
-        List<ServiceInstance> appInstances = event.getServiceInstances();
-
         if (event instanceof RetryServiceInstancesChangedEvent) {
             RetryServiceInstancesChangedEvent retryEvent = (RetryServiceInstancesChangedEvent) event;
             logger.warn("Received address refresh retry event, " + retryEvent.getFailureRecordTime());
@@ -247,14 +246,21 @@ public class ServiceInstancesChangedListener {
                 return true;
             }
             logger.warn("Retrying address notification...");
-        } else {
-            logger.info("Received instance notification, serviceName: " + appName + ", instances: " + appInstances.size());
-            allInstances.put(appName, appInstances);
-            lastRefreshTime = System.currentTimeMillis();
         }
         return false;
     }
 
+    private void refreshInstance(ServiceInstancesChangedEvent event) {
+        if (event instanceof RetryServiceInstancesChangedEvent) {
+            return;
+        }
+        String appName = event.getServiceName();
+        List<ServiceInstance> appInstances = event.getServiceInstances();
+        logger.info("Received instance notification, serviceName: " + appName + ", instances: " + appInstances.size());
+        allInstances.put(appName, appInstances);
+        lastRefreshTime = System.currentTimeMillis();
+    }
+
     protected boolean hasEmptyMetadata(Map<String, MetadataInfo> revisionToMetadata) {
         if (revisionToMetadata == null) {
             return false;
@@ -289,11 +295,12 @@ public class ServiceInstancesChangedListener {
                 break;
             } else {// failed
                 logger.error("Failed to get MetadataInfo for instance " + instance.getAddress() + "?revision=" + revision
-                    + "&cluster=" + instance.getRegistryCluster() + ", wait for retry.");
+                        + "&cluster=" + instance.getRegistryCluster() + ", wait for retry.");
                 triedTimes++;
                 try {
                     Thread.sleep(1000);
-                } catch (InterruptedException e) {}
+                } catch (InterruptedException e) {
+                }
             }
         }
 
@@ -327,7 +334,7 @@ public class ServiceInstancesChangedListener {
                 metadataInfo = remoteMetadataService.getMetadata(instance);
             } else {
                 // change the instance used to communicate to avoid all requests route to the same instance
-                MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance, serviceDiscovery);
+                MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance);
                 metadataInfo = metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
             }
         } catch (Exception e) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
index fd0c855..e6f6d68 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
@@ -22,7 +22,6 @@ import org.apache.dubbo.common.utils.CollectionUtils;
 import org.apache.dubbo.common.utils.StringUtils;
 import org.apache.dubbo.metadata.MetadataService;
 import org.apache.dubbo.metadata.WritableMetadataService;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
 import org.apache.dubbo.rpc.Invoker;
@@ -90,7 +89,7 @@ public class MetadataUtils {
                 ServiceInstanceMetadataUtils.getExportedServicesRevision(serviceInstance);
     }
 
-    public static MetadataService getMetadataServiceProxy(ServiceInstance instance, ServiceDiscovery serviceDiscovery) {
+    public static MetadataService getMetadataServiceProxy(ServiceInstance instance) {
         String key = computeKey(instance);
         Lock lock = metadataServiceLocks.computeIfAbsent(key, k -> new ReentrantLock());
 
@@ -102,7 +101,7 @@ public class MetadataUtils {
         }
     }
 
-    public static void destroyMetadataServiceProxy(ServiceInstance instance, ServiceDiscovery serviceDiscovery) {
+    public static void destroyMetadataServiceProxy(ServiceInstance instance) {
         String key = computeKey(instance);
         Lock lock = metadataServiceLocks.computeIfAbsent(key, k -> new ReentrantLock());
 
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
index 1303a09..e685610 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListenerTest.java
@@ -182,7 +182,7 @@ public class ServiceInstancesChangedListenerTest {
         ServiceInstancesChangedListener listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
 
         try (MockedStatic<MetadataUtils> mockedMetadataUtils = Mockito.mockStatic(MetadataUtils.class)) {
-            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any(), Mockito.any())).thenReturn(metadataService);
+            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any())).thenReturn(metadataService);
             // notify instance change
             ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent("app1", app1Instances);
             listener.onEvent(event);
@@ -215,7 +215,7 @@ public class ServiceInstancesChangedListenerTest {
         ServiceInstancesChangedListener listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
 
         try (MockedStatic<MetadataUtils> mockedMetadataUtils = Mockito.mockStatic(MetadataUtils.class)) {
-            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any(), Mockito.any())).thenReturn(metadataService);
+            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any())).thenReturn(metadataService);
             // notify app1 instance change
             ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
             listener.onEvent(app1_event);
@@ -257,7 +257,7 @@ public class ServiceInstancesChangedListenerTest {
         ServiceInstancesChangedListener listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
 
         try (MockedStatic<MetadataUtils> mockedMetadataUtils = Mockito.mockStatic(MetadataUtils.class)) {
-            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any(), Mockito.any())).thenReturn(metadataService);
+            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any())).thenReturn(metadataService);
             // notify app1 instance change
             ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
             listener.onEvent(app1_event);
@@ -325,7 +325,7 @@ public class ServiceInstancesChangedListenerTest {
         listener.addListenerAndNotify(service2 + ":dubbo", demoService2Listener);
 
         try (MockedStatic<MetadataUtils> mockedMetadataUtils = Mockito.mockStatic(MetadataUtils.class)) {
-            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any(), Mockito.any())).thenReturn(metadataService);
+            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any())).thenReturn(metadataService);
             // notify app1 instance change
             ServiceInstancesChangedEvent app1_event = new ServiceInstancesChangedEvent("app1", app1Instances);
             listener.onEvent(app1_event);
@@ -369,7 +369,7 @@ public class ServiceInstancesChangedListenerTest {
         serviceNames.add("app1");
         ServiceInstancesChangedListener listener = new ServiceInstancesChangedListener(serviceNames, serviceDiscovery);
         try (MockedStatic<MetadataUtils> mockedMetadataUtils = Mockito.mockStatic(MetadataUtils.class)) {
-            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any(), Mockito.any())).thenReturn(metadataService);
+            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any())).thenReturn(metadataService);
             // notify app1 instance change
             ServiceInstancesChangedEvent failed_revision_event = new ServiceInstancesChangedEvent("app1", app1FailedInstances);
             listener.onEvent(failed_revision_event);
@@ -398,7 +398,7 @@ public class ServiceInstancesChangedListenerTest {
         ConcurrentMap tmpProxyMap = MetadataUtils.metadataServiceProxies;
 
         try (MockedStatic<MetadataUtils> mockedMetadataUtils = Mockito.mockStatic(MetadataUtils.class)) {
-            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any(), Mockito.any())).thenReturn(metadataService);
+            mockedMetadataUtils.when(() -> MetadataUtils.getMetadataServiceProxy(Mockito.any())).thenReturn(metadataService);
 
             // notify app1 instance change
             ServiceInstancesChangedEvent event = new ServiceInstancesChangedEvent("app1", app1Instances);