You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/01/29 06:55:33 UTC

[dubbo] branch 3.0 updated: [3.0] Fix Service Discovery related concurrency issue (#9642)

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git


The following commit(s) were added to refs/heads/3.0 by this push:
     new eddb773  [3.0] Fix Service Discovery related concurrency issue (#9642)
eddb773 is described below

commit eddb7731517f99f2e378ea50e3334a5ee69e79ad
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Sat Jan 29 14:55:23 2022 +0800

    [3.0] Fix Service Discovery related concurrency issue (#9642)
---
 .../dubbo/config/mock/MockServiceDiscovery.java    |  3 ++-
 .../spring/registry/MockServiceDiscovery.java      |  3 ++-
 .../org/apache/dubbo/metadata/MetadataInfo.java    | 22 ++++++++++++++++++
 .../registry/client/AbstractServiceDiscovery.java  | 26 ++++++++++++++--------
 .../listener/ServiceInstancesChangedListener.java  | 18 ++++++++++-----
 .../client/support/MockServiceDiscovery.java       |  3 ++-
 .../multicast/MulticastServiceDiscovery.java       |  3 ++-
 7 files changed, 60 insertions(+), 18 deletions(-)

diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java
index 11f584c..aa5514f 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.config.mock;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
 import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -48,7 +49,7 @@ public class MockServiceDiscovery extends AbstractServiceDiscovery {
     }
 
     @Override
-    public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+    public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
         this.serviceInstance = serviceInstance;
     }
 
diff --git a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java
index a5c5b3c..e27dd70 100644
--- a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java
+++ b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.config.spring.registry;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
 import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -47,7 +48,7 @@ public class MockServiceDiscovery extends AbstractServiceDiscovery {
     }
 
     @Override
-    public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+    public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
     }
 
     @Override
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 0d10238..00cae28 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -90,6 +90,23 @@ public class MetadataInfo implements Serializable {
         this.instanceParams = new ConcurrentHashMap<>();
     }
 
+    private MetadataInfo(String app, String revision, Map<String, ServiceInfo> services, AtomicBoolean initiated,
+                        Map<String, String> extendParams, Map<String, String> instanceParams, AtomicBoolean updated,
+                        ConcurrentNavigableMap<String, SortedSet<URL>> subscribedServiceURLs,
+                        ConcurrentNavigableMap<String, SortedSet<URL>> exportedServiceURLs,
+                        ExtensionLoader<MetadataParamsFilter> loader) {
+        this.app = app;
+        this.revision = revision;
+        this.services = new ConcurrentHashMap<>(services);
+        this.initiated = new AtomicBoolean(initiated.get());
+        this.extendParams = new ConcurrentHashMap<>(extendParams);
+        this.instanceParams = new ConcurrentHashMap<>(instanceParams);
+        this.updated = new AtomicBoolean(updated.get());
+        this.subscribedServiceURLs = subscribedServiceURLs == null ? null : new ConcurrentSkipListMap<>(subscribedServiceURLs);
+        this.exportedServiceURLs = exportedServiceURLs == null ? null : new ConcurrentSkipListMap<>(exportedServiceURLs);
+        this.loader = loader;
+    }
+
     /**
      * Initialize is needed when MetadataInfo is created from deserialization on the consumer side before being used for RPC call.
      */
@@ -344,6 +361,11 @@ public class MetadataInfo implements Serializable {
         return services.keySet().toString();
     }
 
+    @Override
+    public synchronized MetadataInfo clone() {
+        return new MetadataInfo(app, revision, services, initiated, extendParams, instanceParams, updated, subscribedServiceURLs, exportedServiceURLs, loader);
+    }
+
     public static class ServiceInfo implements Serializable {
         private String name;
         private String group;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
index e256e04..e75ef4e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
@@ -87,9 +87,13 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
             return;
         }
 
-        boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
+        // update the original metadataInfo's revision
+        this.metadataInfo.calAndGetRevision();
+        // clone metadataInfo so that it cannot change between `calOrUpdateInstanceRevision` and `reportMetadata`
+        MetadataInfo copyOfMetaInfo = this.metadataInfo.clone();
+        boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance, copyOfMetaInfo);
         if (revisionUpdated) {
-            reportMetadata(this.metadataInfo);
+            reportMetadata(copyOfMetaInfo);
             doRegister(this.serviceInstance);
         }
     }
@@ -115,10 +119,15 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
             return;
         }
 
-        boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
+
+        // update the original metadataInfo's revision
+        this.metadataInfo.calAndGetRevision();
+        // clone metadataInfo so that it cannot change between `calOrUpdateInstanceRevision` and `reportMetadata`
+        MetadataInfo copyOfMetaInfo = this.metadataInfo.clone();
+        boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance, copyOfMetaInfo);
         if (revisionUpdated) {
-            logger.info(String.format("Metadata of instance changed, updating instance with revision %s.", this.serviceInstance.getServiceMetadata().getRevision()));
-            doUpdate(this.serviceInstance);
+            logger.info(String.format("Metadata of instance changed, updating instance with revision %s.", copyOfMetaInfo.getRevision()));
+            doUpdate(this.serviceInstance, copyOfMetaInfo);
         }
     }
 
@@ -224,11 +233,11 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
        throw new UnsupportedOperationException("Service discovery implementation does not support lookup of url list.");
     }
 
-    protected void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+    protected void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
 
         this.unregister();
 
-        reportMetadata(serviceInstance.getServiceMetadata());
+        reportMetadata(metadataInfo);
         this.doRegister(serviceInstance);
     }
 
@@ -251,9 +260,8 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
         return instance;
     }
 
-    protected boolean calOrUpdateInstanceRevision(ServiceInstance instance) {
+    protected boolean calOrUpdateInstanceRevision(ServiceInstance instance, MetadataInfo metadataInfo) {
         String existingInstanceRevision = instance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
-        MetadataInfo metadataInfo = instance.getServiceMetadata();
         String newRevision = metadataInfo.calAndGetRevision();
         if (!newRevision.equals(existingInstanceRevision)) {
             if (EMPTY_REVISION.equals(newRevision)) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 13ecc9b..3a06e8b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -98,7 +98,14 @@ public class ServiceInstancesChangedListener {
      *
      * @param event {@link ServiceInstancesChangedEvent}
      */
-    public synchronized void onEvent(ServiceInstancesChangedEvent event) {
+    public void onEvent(ServiceInstancesChangedEvent event) {
+        if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
+            return;
+        }
+        doOnEvent(event);
+    }
+
+    private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
         if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
             return;
         }
@@ -222,10 +229,6 @@ public class ServiceInstancesChangedListener {
         }
 
         logger.info("Interface listener of interface " + serviceKey + " removed.");
-        if (listeners.isEmpty()) {
-            logger.info("No interface listeners exist, will stop instance listener for " + this.getServiceNames());
-            serviceDiscovery.removeServiceInstancesChangedListener(this);
-        }
     }
 
     public boolean hasListeners() {
@@ -375,6 +378,11 @@ public class ServiceInstancesChangedListener {
         if (!destroyed.get()) {
             if (CollectionUtils.isEmptyMap(listeners)) {
                 if (destroyed.compareAndSet(false, true)) {
+                    if (listeners.isEmpty()) {
+                        logger.info("No interface listeners exist, will stop instance listener for " + this.getServiceNames());
+                        serviceDiscovery.removeServiceInstancesChangedListener(this);
+                    }
+
                     allInstances.clear();
                     serviceUrls.clear();
                     if (retryFuture != null && !retryFuture.isDone()) {
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java
index 892acb0..fcd0383 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.registry.client.support;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
 import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -40,7 +41,7 @@ public class MockServiceDiscovery extends AbstractServiceDiscovery {
     }
 
     @Override
-    public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+    public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
 
     }
 
diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java
index 97f91a3..3967469 100644
--- a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java
@@ -17,6 +17,7 @@
 package org.apache.dubbo.registry.multicast;
 
 import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
 import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
 import org.apache.dubbo.registry.client.ServiceInstance;
 import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -47,7 +48,7 @@ public class MulticastServiceDiscovery extends AbstractServiceDiscovery {
     }
 
     @Override
-    public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+    public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
     }
 
     @Override