You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2022/01/29 06:55:33 UTC
[dubbo] branch 3.0 updated: [3.0] Fix Service Discovery related concurrency issue (#9642)
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
The following commit(s) were added to refs/heads/3.0 by this push:
new eddb773 [3.0] Fix Service Discovery related concurrency issue (#9642)
eddb773 is described below
commit eddb7731517f99f2e378ea50e3334a5ee69e79ad
Author: Albumen Kevin <jh...@gmail.com>
AuthorDate: Sat Jan 29 14:55:23 2022 +0800
[3.0] Fix Service Discovery related concurrency issue (#9642)
---
.../dubbo/config/mock/MockServiceDiscovery.java | 3 ++-
.../spring/registry/MockServiceDiscovery.java | 3 ++-
.../org/apache/dubbo/metadata/MetadataInfo.java | 22 ++++++++++++++++++
.../registry/client/AbstractServiceDiscovery.java | 26 ++++++++++++++--------
.../listener/ServiceInstancesChangedListener.java | 18 ++++++++++-----
.../client/support/MockServiceDiscovery.java | 3 ++-
.../multicast/MulticastServiceDiscovery.java | 3 ++-
7 files changed, 60 insertions(+), 18 deletions(-)
diff --git a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java
index 11f584c..aa5514f 100644
--- a/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java
+++ b/dubbo-config/dubbo-config-api/src/test/java/org/apache/dubbo/config/mock/MockServiceDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.config.mock;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -48,7 +49,7 @@ public class MockServiceDiscovery extends AbstractServiceDiscovery {
}
@Override
- public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+ public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
this.serviceInstance = serviceInstance;
}
diff --git a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java
index a5c5b3c..e27dd70 100644
--- a/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java
+++ b/dubbo-config/dubbo-config-spring/src/test/java/org/apache/dubbo/config/spring/registry/MockServiceDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.config.spring.registry;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -47,7 +48,7 @@ public class MockServiceDiscovery extends AbstractServiceDiscovery {
}
@Override
- public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+ public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
}
@Override
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 0d10238..00cae28 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -90,6 +90,23 @@ public class MetadataInfo implements Serializable {
this.instanceParams = new ConcurrentHashMap<>();
}
+ private MetadataInfo(String app, String revision, Map<String, ServiceInfo> services, AtomicBoolean initiated,
+ Map<String, String> extendParams, Map<String, String> instanceParams, AtomicBoolean updated,
+ ConcurrentNavigableMap<String, SortedSet<URL>> subscribedServiceURLs,
+ ConcurrentNavigableMap<String, SortedSet<URL>> exportedServiceURLs,
+ ExtensionLoader<MetadataParamsFilter> loader) {
+ this.app = app;
+ this.revision = revision;
+ this.services = new ConcurrentHashMap<>(services);
+ this.initiated = new AtomicBoolean(initiated.get());
+ this.extendParams = new ConcurrentHashMap<>(extendParams);
+ this.instanceParams = new ConcurrentHashMap<>(instanceParams);
+ this.updated = new AtomicBoolean(updated.get());
+ this.subscribedServiceURLs = subscribedServiceURLs == null ? null : new ConcurrentSkipListMap<>(subscribedServiceURLs);
+ this.exportedServiceURLs = exportedServiceURLs == null ? null : new ConcurrentSkipListMap<>(exportedServiceURLs);
+ this.loader = loader;
+ }
+
/**
* Initialize is needed when MetadataInfo is created from deserialization on the consumer side before being used for RPC call.
*/
@@ -344,6 +361,11 @@ public class MetadataInfo implements Serializable {
return services.keySet().toString();
}
+ @Override
+ public synchronized MetadataInfo clone() {
+ return new MetadataInfo(app, revision, services, initiated, extendParams, instanceParams, updated, subscribedServiceURLs, exportedServiceURLs, loader);
+ }
+
public static class ServiceInfo implements Serializable {
private String name;
private String group;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
index e256e04..e75ef4e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/AbstractServiceDiscovery.java
@@ -87,9 +87,13 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
return;
}
- boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
+ // update the original metadataInfo's revision
+ this.metadataInfo.calAndGetRevision();
+ // clone metadataInfo so that it cannot change between `calOrUpdateInstanceRevision` and `reportMetadata`
+ MetadataInfo copyOfMetaInfo = this.metadataInfo.clone();
+ boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance, copyOfMetaInfo);
if (revisionUpdated) {
- reportMetadata(this.metadataInfo);
+ reportMetadata(copyOfMetaInfo);
doRegister(this.serviceInstance);
}
}
@@ -115,10 +119,15 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
return;
}
- boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance);
+
+ // update the original metadataInfo's revision
+ this.metadataInfo.calAndGetRevision();
+ // clone metadataInfo so that it cannot change between `calOrUpdateInstanceRevision` and `reportMetadata`
+ MetadataInfo copyOfMetaInfo = this.metadataInfo.clone();
+ boolean revisionUpdated = calOrUpdateInstanceRevision(this.serviceInstance, copyOfMetaInfo);
if (revisionUpdated) {
- logger.info(String.format("Metadata of instance changed, updating instance with revision %s.", this.serviceInstance.getServiceMetadata().getRevision()));
- doUpdate(this.serviceInstance);
+ logger.info(String.format("Metadata of instance changed, updating instance with revision %s.", copyOfMetaInfo.getRevision()));
+ doUpdate(this.serviceInstance, copyOfMetaInfo);
}
}
@@ -224,11 +233,11 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
throw new UnsupportedOperationException("Service discovery implementation does not support lookup of url list.");
}
- protected void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+ protected void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
this.unregister();
- reportMetadata(serviceInstance.getServiceMetadata());
+ reportMetadata(metadataInfo);
this.doRegister(serviceInstance);
}
@@ -251,9 +260,8 @@ public abstract class AbstractServiceDiscovery implements ServiceDiscovery {
return instance;
}
- protected boolean calOrUpdateInstanceRevision(ServiceInstance instance) {
+ protected boolean calOrUpdateInstanceRevision(ServiceInstance instance, MetadataInfo metadataInfo) {
String existingInstanceRevision = instance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
- MetadataInfo metadataInfo = instance.getServiceMetadata();
String newRevision = metadataInfo.calAndGetRevision();
if (!newRevision.equals(existingInstanceRevision)) {
if (EMPTY_REVISION.equals(newRevision)) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 13ecc9b..3a06e8b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -98,7 +98,14 @@ public class ServiceInstancesChangedListener {
*
* @param event {@link ServiceInstancesChangedEvent}
*/
- public synchronized void onEvent(ServiceInstancesChangedEvent event) {
+ public void onEvent(ServiceInstancesChangedEvent event) {
+ if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
+ return;
+ }
+ doOnEvent(event);
+ }
+
+ private synchronized void doOnEvent(ServiceInstancesChangedEvent event) {
if (destroyed.get() || !accept(event) || isRetryAndExpired(event)) {
return;
}
@@ -222,10 +229,6 @@ public class ServiceInstancesChangedListener {
}
logger.info("Interface listener of interface " + serviceKey + " removed.");
- if (listeners.isEmpty()) {
- logger.info("No interface listeners exist, will stop instance listener for " + this.getServiceNames());
- serviceDiscovery.removeServiceInstancesChangedListener(this);
- }
}
public boolean hasListeners() {
@@ -375,6 +378,11 @@ public class ServiceInstancesChangedListener {
if (!destroyed.get()) {
if (CollectionUtils.isEmptyMap(listeners)) {
if (destroyed.compareAndSet(false, true)) {
+ if (listeners.isEmpty()) {
+ logger.info("No interface listeners exist, will stop instance listener for " + this.getServiceNames());
+ serviceDiscovery.removeServiceInstancesChangedListener(this);
+ }
+
allInstances.clear();
serviceUrls.clear();
if (retryFuture != null && !retryFuture.isDone()) {
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java
index 892acb0..fcd0383 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/support/MockServiceDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.client.support;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -40,7 +41,7 @@ public class MockServiceDiscovery extends AbstractServiceDiscovery {
}
@Override
- public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+ public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
}
diff --git a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java
index 97f91a3..3967469 100644
--- a/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-multicast/src/main/java/org/apache/dubbo/registry/multicast/MulticastServiceDiscovery.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.multicast;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.registry.client.AbstractServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -47,7 +48,7 @@ public class MulticastServiceDiscovery extends AbstractServiceDiscovery {
}
@Override
- public void doUpdate(ServiceInstance serviceInstance) throws RuntimeException {
+ public void doUpdate(ServiceInstance serviceInstance, MetadataInfo metadataInfo) throws RuntimeException {
}
@Override