You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/04 07:09:45 UTC
[dubbo] 08/27: Service Discovery Enhancement
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit c572c2cd2e5dd2d11a81825f50d22a1363eee1fb
Author: ken.lj <ke...@gmail.com>
AuthorDate: Fri Jul 3 12:24:00 2020 +0800
Service Discovery Enhancement
---
.../rpc/cluster/directory/AbstractDirectory.java | 15 +-
.../org.apache.dubbo.rpc.cluster.RouterFactory | 1 +
.../src/main/java/org/apache/dubbo/common/URL.java | 49 ++--
.../dubbo/common/config/ConfigurationUtils.java | 2 +-
.../manager/DefaultExecutorRepository.java | 8 +-
.../apache/dubbo/config/MetadataReportConfig.java | 13 ++
.../org/apache/dubbo/config/RegistryConfig.java | 5 +
.../org/apache/dubbo/config/ReferenceConfig.java | 6 +
.../dubbo/config/bootstrap/DubboBootstrap.java | 32 ++-
.../src/main/resources/META-INF/compat/dubbo.xsd | 5 +
.../src/main/resources/META-INF/dubbo.xsd | 5 +
.../src/main/resources/spring/dubbo-consumer.xml | 4 +-
.../src/main/resources/spring/dubbo-provider.xml | 8 +-
.../DynamicConfigurationServiceNameMapping.java | 2 +-
.../apache/dubbo/metadata/MappingChangedEvent.java | 27 ++-
.../org/apache/dubbo/metadata/MappingListener.java | 8 +-
.../org/apache/dubbo/metadata/MetadataInfo.java | 37 +--
.../org/apache/dubbo/metadata/MetadataService.java | 5 +-
.../dubbo/metadata/MetadataServiceNameMapping.java | 17 +-
.../apache/dubbo/metadata/ServiceNameMapping.java | 2 +-
.../dubbo/metadata/WritableMetadataService.java | 1 +
.../dubbo/metadata/report/MetadataReport.java | 4 +-
.../metadata/report/MetadataReportInstance.java | 18 +-
.../org/apache/dubbo/registry/NotifyListener.java | 2 -
.../client/DefaultRegistryClusterIdentifier.java | 23 +-
.../registry/client/DefaultServiceInstance.java | 26 ++-
.../client/EventPublishingServiceDiscovery.java | 5 +
.../client/FileSystemServiceDiscovery.java | 5 +
.../dubbo/registry/client/InstanceAddressURL.java | 100 +++++---
.../registry/client/RegistryClusterIdentifier.java | 22 +-
.../dubbo/registry/client/ServiceDiscovery.java | 6 +
.../registry/client/ServiceDiscoveryRegistry.java | 155 +++++++------
.../client/ServiceDiscoveryRegistryDirectory.java | 253 +++++++++++++++++++++
.../client/ServiceDiscoveryRegistryProtocol.java | 6 +-
... ServiceDiscoveryRegistryProtocolListener.java} | 28 +--
.../dubbo/registry/client/ServiceInstance.java | 8 +-
.../listener/ServiceInstancesChangedListener.java | 165 ++++++++------
.../store/InMemoryWritableMetadataService.java | 45 +++-
.../metadata/store/RemoteMetadataServiceImpl.java | 37 +--
.../registry/integration/DynamicDirectory.java | 243 ++++++++++++++++++++
.../registry/integration/RegistryDirectory.java | 178 +++------------
.../integration/RegistryInvokerWrapper.java | 22 +-
.../registry/integration/RegistryProtocol.java | 11 +-
...dubbo.registry.client.RegistryClusterIdentifier | 1 +
...o.registry.integration.RegistryProtocolListener | 1 +
.../registry/consul/ConsulServiceDiscovery.java | 7 +-
.../consul/ConsulServiceDiscoveryFactory.java | 17 +-
...g.apache.dubbo.registry.client.ServiceDiscovery | 1 -
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
.../registry/dubbo/RegistryDirectoryTest.java | 2 +-
.../dubbo/registry/etcd/EtcdServiceDiscovery.java | 2 +-
.../registry/nacos/NacosServiceDiscovery.java | 10 +-
.../nacos/NacosServiceDiscoveryFactory.java | 14 +-
...e.dubbo.registry.client.ServiceDiscoveryFactory | 1 +
.../zookeeper/ZookeeperServiceDiscovery.java | 27 +--
.../ZookeeperServiceDiscoveryChangeWatcher.java | 8 +-
.../ZookeeperServiceDiscoveryFactory.java | 16 +-
dubbo-registry/pom.xml | 18 +-
.../main/java/org/apache/dubbo/rpc/RpcContext.java | 79 +++++++
.../dubbo/rpc/proxy/InvokerInvocationHandler.java | 5 +-
60 files changed, 1302 insertions(+), 522 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 7984d7d..6b4773f 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -29,8 +29,13 @@ import org.apache.dubbo.rpc.cluster.RouterChain;
import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
/**
@@ -48,6 +53,9 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
private volatile URL consumerUrl;
+ protected final Map<String, String> queryMap; // Initialized in the constructor from the decoded REFER_KEY parameter; expected to be non-null
+ protected final String consumedProtocol;
+
protected RouterChain<T> routerChain;
public AbstractDirectory(URL url) {
@@ -59,8 +67,13 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
throw new IllegalArgumentException("url == null");
}
+ queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
+ String path = queryMap.get(PATH_KEY);
+ this.consumedProtocol = this.queryMap.get(PROTOCOL_KEY) == null ? DUBBO : this.queryMap.get(PROTOCOL_KEY);
this.url = url.removeParameter(REFER_KEY).removeParameter(MONITOR_KEY);
- this.consumerUrl = this.url.addParameters(StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY)));
+
+ this.consumerUrl = this.url.setProtocol(consumedProtocol).setPath(path == null ? queryMap.get(INTERFACE_KEY) : path).addParameters(queryMap)
+ .removeParameter(MONITOR_KEY);
setRouterChain(routerChain);
}
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
index 2a807f0..13e307b 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
@@ -5,3 +5,4 @@ service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactor
app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory
tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
+instance=org.apache.dubbo.rpc.cluster.router.service.InstanceRouterFactory
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index a396cc9..cc2b66f 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -134,6 +134,7 @@ class URL implements Serializable {
private volatile transient String string;
private transient String serviceKey;
+ private transient String protocolServiceKey;
private transient String address;
@@ -448,7 +449,7 @@ class URL implements Serializable {
}
public URL setUsername(String username) {
- return new URL(protocol, username, password, host, port, path, getParameters());
+ return new URL(getProtocol(), username, password, host, port, path, getParameters());
}
public String getPassword() {
@@ -456,7 +457,7 @@ class URL implements Serializable {
}
public URL setPassword(String password) {
- return new URL(protocol, username, password, host, port, path, getParameters());
+ return new URL(getProtocol(), username, password, host, port, path, getParameters());
}
public String getAuthority() {
@@ -473,7 +474,7 @@ class URL implements Serializable {
}
public URL setHost(String host) {
- return new URL(protocol, username, password, host, port, path, getParameters());
+ return new URL(getProtocol(), username, password, host, port, path, getParameters());
}
/**
@@ -496,7 +497,7 @@ class URL implements Serializable {
}
public URL setPort(int port) {
- return new URL(protocol, username, password, host, port, path, getParameters());
+ return new URL(getProtocol(), username, password, host, port, path, getParameters());
}
public int getPort(int defaultPort) {
@@ -520,7 +521,7 @@ class URL implements Serializable {
} else {
host = address;
}
- return new URL(protocol, username, password, host, port, path, getParameters());
+ return new URL(getProtocol(), username, password, host, port, path, getParameters());
}
public String getBackupAddress() {
@@ -556,7 +557,7 @@ class URL implements Serializable {
}
public URL setPath(String path) {
- return new URL(protocol, username, password, host, port, path, getParameters());
+ return new URL(getProtocol(), username, password, host, port, path, getParameters());
}
public String getAbsolutePath() {
@@ -1142,7 +1143,7 @@ class URL implements Serializable {
Map<String, String> map = new HashMap<>(getParameters());
map.put(key, value);
- return new URL(protocol, username, password, host, port, path, map);
+ return new URL(getProtocol(), username, password, host, port, path, map);
}
public URL addParameterIfAbsent(String key, String value) {
@@ -1156,7 +1157,7 @@ class URL implements Serializable {
Map<String, String> map = new HashMap<>(getParameters());
map.put(key, value);
- return new URL(protocol, username, password, host, port, path, map);
+ return new URL(getProtocol(), username, password, host, port, path, map);
}
public URL addMethodParameter(String method, String key, String value) {
@@ -1171,7 +1172,7 @@ class URL implements Serializable {
Map<String, Map<String, String>> methodMap = toMethodParameters(map);
URL.putMethodParameter(method, key, value, methodMap);
- return new URL(protocol, username, password, host, port, path, map, methodMap);
+ return new URL(getProtocol(), username, password, host, port, path, map, methodMap);
}
public URL addMethodParameterIfAbsent(String method, String key, String value) {
@@ -1189,7 +1190,7 @@ class URL implements Serializable {
Map<String, Map<String, String>> methodMap = toMethodParameters(map);
URL.putMethodParameter(method, key, value, methodMap);
- return new URL(protocol, username, password, host, port, path, map, methodMap);
+ return new URL(getProtocol(), username, password, host, port, path, map, methodMap);
}
/**
@@ -1225,7 +1226,7 @@ class URL implements Serializable {
Map<String, String> map = new HashMap<>(getParameters());
map.putAll(parameters);
- return new URL(protocol, username, password, host, port, path, map);
+ return new URL(getProtocol(), username, password, host, port, path, map);
}
public URL addParametersIfAbsent(Map<String, String> parameters) {
@@ -1234,7 +1235,7 @@ class URL implements Serializable {
}
Map<String, String> map = new HashMap<>(parameters);
map.putAll(getParameters());
- return new URL(protocol, username, password, host, port, path, map);
+ return new URL(getProtocol(), username, password, host, port, path, map);
}
public URL addParameters(String... pairs) {
@@ -1284,11 +1285,11 @@ class URL implements Serializable {
if (map.size() == getParameters().size()) {
return this;
}
- return new URL(protocol, username, password, host, port, path, map);
+ return new URL(getProtocol(), username, password, host, port, path, map);
}
public URL clearParameters() {
- return new URL(protocol, username, password, host, port, path, new HashMap<>());
+ return new URL(getProtocol(), username, password, host, port, path, new HashMap<>());
}
public String getRawParameter(String key) {
@@ -1525,6 +1526,14 @@ class URL implements Serializable {
return BaseServiceMetadata.buildServiceKey(path, group, version);
}
+ public String getProtocolServiceKey() {
+ if (protocolServiceKey != null) {
+ return protocolServiceKey;
+ }
+ this.protocolServiceKey = getServiceKey() + ":" + getProtocol();
+ return protocolServiceKey;
+ }
+
public String toServiceStringWithoutResolving() {
return buildString(true, false, false, true);
}
@@ -1701,4 +1710,16 @@ class URL implements Serializable {
subParameter.put(key, value);
}
+// public String getServiceParameter(String service, String key) {
+// return getParameter(key);
+// }
+//
+// public String getServiceMethodParameter(String service, String key) {
+// return getParameter(key);
+// }
+//
+// public String getServiceParameter(String service, String key) {
+// return getParameter(key);
+// }
+
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
index b0447c8..70fd9f1 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
@@ -97,7 +97,7 @@ public class ConfigurationUtils {
}
public static String getProperty(String property, String defaultValue) {
- return StringUtils.trim(ApplicationModel.getEnvironment().getConfiguration().getString(property, defaultValue));
+ return StringUtils.trim(getGlobalConfiguration().getString(property, defaultValue));
}
public static Map<String, String> parseProperties(String content) throws IOException {
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
index dd37bff..1669089 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/manager/DefaultExecutorRepository.java
@@ -55,10 +55,10 @@ public class DefaultExecutorRepository implements ExecutorRepository {
private ConcurrentMap<String, ConcurrentMap<Integer, ExecutorService>> data = new ConcurrentHashMap<>();
public DefaultExecutorRepository() {
-// for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
-// ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler"));
-// scheduledExecutors.addItem(scheduler);
-// }
+ for (int i = 0; i < DEFAULT_SCHEDULER_SIZE; i++) {
+ ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-framework-scheduler"));
+ scheduledExecutors.addItem(scheduler);
+ }
//
// reconnectScheduledExecutor = Executors.newSingleThreadScheduledExecutor(new NamedThreadFactory("Dubbo-reconnect-scheduler"));
serviceExporterExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("Dubbo-exporter-scheduler"));
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/MetadataReportConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/MetadataReportConfig.java
index dcd43e2..c3f8c93 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/MetadataReportConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/MetadataReportConfig.java
@@ -79,6 +79,11 @@ public class MetadataReportConfig extends AbstractConfig {
*/
private Boolean cluster;
+ /**
+ * registry id
+ */
+ private String registry;
+
public MetadataReportConfig() {
}
@@ -211,4 +216,12 @@ public class MetadataReportConfig extends AbstractConfig {
public void setCluster(Boolean cluster) {
this.cluster = cluster;
}
+
+ public String getRegistry() {
+ return registry;
+ }
+
+ public void setRegistry(String registry) {
+ this.registry = registry;
+ }
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
index 3b65b40..817bbc7 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/RegistryConfig.java
@@ -194,6 +194,11 @@ public class RegistryConfig extends AbstractConfig {
setProtocol(protocol);
}
+ @Override
+ public String getId() {
+ return super.getId();
+ }
+
public String getProtocol() {
return protocol;
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
index 23f7bf6..f38b80b 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/ReferenceConfig.java
@@ -430,6 +430,12 @@ public class ReferenceConfig<T> extends ReferenceConfigBase<T> {
}
// get consumer's global configuration
checkDefault();
+
+ // Let the activated ConfigInitializer extensions initialize configuration that has not been set yet.
+ List<ConfigInitializer> configInitializers = ExtensionLoader.getExtensionLoader(ConfigInitializer.class)
+ .getActivateExtension(URL.valueOf("configInitializer://"), (String[]) null);
+ configInitializers.forEach(e -> e.initReferConfig(this));
+
this.refresh();
if (getGeneric() == null && getConsumer() != null) {
setGeneric(getConsumer().getGeneric());
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index dd28b81..455535c6 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -58,6 +58,7 @@ import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.event.EventListener;
import org.apache.dubbo.event.GenericEventListener;
+import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.MetadataServiceExporter;
import org.apache.dubbo.metadata.WritableMetadataService;
@@ -82,6 +83,7 @@ import java.util.SortedSet;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
@@ -98,8 +100,10 @@ import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_
import static org.apache.dubbo.common.function.ThrowableAction.execute;
import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;
import static org.apache.dubbo.metadata.WritableMetadataService.getDefaultExtension;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.setMetadataStorageType;
import static org.apache.dubbo.remoting.Constants.CLIENT_KEY;
+import static org.apache.dubbo.rpc.Constants.ID_KEY;
/**
* See {@link ApplicationModel} and {@link ExtensionLoader} for why this class is designed to be singleton.
@@ -1020,10 +1024,26 @@ public class DubboBootstrap extends GenericEventListener {
ServiceInstance serviceInstance = createServiceInstance(serviceName, host, port);
- // register metadata
publishMetadataToRemote(serviceInstance);
- getServiceDiscoveries().forEach(serviceDiscovery -> serviceDiscovery.register(serviceInstance));
+ getServiceDiscoveries().forEach(serviceDiscovery ->
+ {
+ calInstanceRevision(serviceDiscovery, serviceInstance);
+ // register the service instance (its metadata may now carry the exported-services revision)
+ serviceDiscovery.register(serviceInstance);
+ });
+
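+ // Scheduled task that republishes the metadata and re-registers the ServiceInstance. NOTE(review): the period below is 5000 MICROSECONDS (5 ms); MILLISECONDS was probably intended - confirm.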
+ executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
+ publishMetadataToRemote(serviceInstance);
+
+ getServiceDiscoveries().forEach(serviceDiscovery ->
+ {
+ calInstanceRevision(serviceDiscovery, serviceInstance);
+ // register the service instance (its metadata may now carry the exported-services revision)
+ serviceDiscovery.register(serviceInstance);
+ });
+ }, 0, 5000, TimeUnit.MICROSECONDS);
}
private void publishMetadataToRemote(ServiceInstance serviceInstance) {
@@ -1031,6 +1051,14 @@ public class DubboBootstrap extends GenericEventListener {
remoteMetadataService.publishMetadata(serviceInstance);
}
+ private void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) {
+ String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
+ MetadataInfo metadataInfo = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(registryCluster);
+ if (metadataInfo != null) {
+ instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision());
+ }
+ }
+
private URL selectMetadataServiceExportedURL() {
URL selectedURL = null;
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
index 1805b2a..ab229eb 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/compat/dubbo.xsd
@@ -690,6 +690,11 @@
<xsd:documentation><![CDATA[ Need cluster support, default false. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="registry" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ registry config id. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:complexType>
<xsd:complexType name="configCenterType">
diff --git a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
index b12d815..a79ebb4 100644
--- a/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
+++ b/dubbo-config/dubbo-config-spring/src/main/resources/META-INF/dubbo.xsd
@@ -685,6 +685,11 @@
<xsd:documentation><![CDATA[ Need cluster support, default false. ]]></xsd:documentation>
</xsd:annotation>
</xsd:attribute>
+ <xsd:attribute name="registry" type="xsd:string" use="optional">
+ <xsd:annotation>
+ <xsd:documentation><![CDATA[ registry config id. ]]></xsd:documentation>
+ </xsd:annotation>
+ </xsd:attribute>
</xsd:complexType>
<xsd:complexType name="configCenterType">
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml
index 17aa9d6..bec81fe 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml
@@ -21,11 +21,11 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
- <dubbo:application metadata-type="remote" name="demo-consumer">
+ <dubbo:application name="demo-consumer">
<dubbo:parameter key="mapping-type" value="metadata"/>
</dubbo:application>
- <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
+ <!-- <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>-->
<dubbo:registry address="zookeeper://127.0.0.1:2181?registry-type=service"/>
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/resources/spring/dubbo-provider.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/resources/spring/dubbo-provider.xml
index d7ab19a..90a6ae3 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/resources/spring/dubbo-provider.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-provider/src/main/resources/spring/dubbo-provider.xml
@@ -21,18 +21,18 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-4.3.xsd
http://dubbo.apache.org/schema/dubbo http://dubbo.apache.org/schema/dubbo/dubbo.xsd">
- <dubbo:application metadata-type="remote" name="demo-provider">
+ <dubbo:application name="demo-provider">
<dubbo:parameter key="mapping-type" value="metadata"/>
</dubbo:application>
+ <dubbo:config-center address="zookeeper://127.0.0.1:2181"/>
<dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>
-
- <dubbo:registry address="zookeeper://127.0.0.1:2181?registry-type=service"/>
+ <dubbo:registry id="registry1" address="zookeeper://127.0.0.1:2181?registry-type=service"/>
<dubbo:protocol name="dubbo"/>
<bean id="demoService" class="org.apache.dubbo.demo.provider.DemoServiceImpl"/>
- <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService"/>
+ <dubbo:service interface="org.apache.dubbo.demo.DemoService" ref="demoService" registry="registry1"/>
</beans>
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java
index a25d07a..21df199 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java
@@ -72,7 +72,7 @@ public class DynamicConfigurationServiceNameMapping implements ServiceNameMappin
}
@Override
- public Set<String> get(URL url) {
+ public Set<String> get(URL url, MappingListener mappingListener) {
String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY);
String version = url.getParameter(VERSION_KEY);
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java
similarity index 63%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
copy to dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java
index ba4b5b8..27a7a3f 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java
@@ -14,10 +14,27 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.metadata;
-/**
- *
- */
-public class AbstractPrefixConfigurationTest {
+import java.util.Set;
+
+public class MappingChangedEvent {
+ private String serviceKey;
+ private Set<String> apps;
+
+ public String getServiceKey() {
+ return serviceKey;
+ }
+
+ public void setServiceKey(String serviceKey) {
+ this.serviceKey = serviceKey;
+ }
+
+ public Set<String> getApps() {
+ return apps;
+ }
+
+ public void setApps(Set<String> apps) {
+ this.apps = apps;
+ }
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingListener.java
similarity index 87%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
copy to dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingListener.java
index ba4b5b8..f709d75 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingListener.java
@@ -14,10 +14,8 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.metadata;
-/**
- *
- */
-public class AbstractPrefixConfigurationTest {
+public interface MappingListener {
+ void onEvent(MappingChangedEvent event);
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index f235655..932517d 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -42,15 +42,17 @@ public class MetadataInfo implements Serializable {
private String revision;
private Map<String, ServiceInfo> services;
+ private transient Map<String, String> extendParams;
+
public MetadataInfo(String app) {
- this.app = app;
- this.services = new HashMap<>();
+ this(app, null, null);
}
public MetadataInfo(String app, String revision, Map<String, ServiceInfo> services) {
this.app = app;
this.revision = revision;
this.services = services == null ? new HashMap<>() : services;
+ this.extendParams = new HashMap<>();
}
public void addService(ServiceInfo serviceInfo) {
@@ -111,6 +113,10 @@ public class MetadataInfo implements Serializable {
return services.get(serviceKey);
}
+ public Map<String, String> getExtendParams() {
+ return extendParams;
+ }
+
public String getParameter(String key, String serviceKey) {
ServiceInfo serviceInfo = services.get(serviceKey);
if (serviceInfo == null) {
@@ -133,7 +139,6 @@ public class MetadataInfo implements Serializable {
private String group;
private String version;
private String protocol;
- private String registry;
private Map<String, String> params;
private transient Map<String, Map<String, String>> methodParams;
@@ -150,7 +155,6 @@ public class MetadataInfo implements Serializable {
url.getParameter(GROUP_KEY),
url.getParameter(VERSION_KEY),
url.getProtocol(),
- "",
null
);
@@ -179,12 +183,11 @@ public class MetadataInfo implements Serializable {
this.params = params;
}
- public ServiceInfo(String name, String group, String version, String protocol, String registry, Map<String, String> params) {
+ public ServiceInfo(String name, String group, String version, String protocol, Map<String, String> params) {
this.name = name;
this.group = group;
this.version = version;
this.protocol = protocol;
- this.registry = registry;
this.params = params == null ? new HashMap<>() : params;
this.serviceKey = URL.buildKey(name, group, version);
@@ -204,9 +207,6 @@ public class MetadataInfo implements Serializable {
if (StringUtils.isNotEmpty(protocol)) {
matchKey = getServiceKey() + GROUP_CHAR_SEPERATOR + protocol;
}
- if (StringUtils.isNotEmpty(registry)) {
- matchKey = getServiceKey() + GROUP_CHAR_SEPERATOR + registry;
- }
return matchKey;
}
@@ -242,22 +242,6 @@ public class MetadataInfo implements Serializable {
this.version = version;
}
- public String getProtocol() {
- return protocol;
- }
-
- public void setProtocol(String protocol) {
- this.protocol = protocol;
- }
-
- public String getRegistry() {
- return registry;
- }
-
- public void setRegistry(String registry) {
- this.registry = registry;
- }
-
public Map<String, String> getParams() {
if (params == null) {
return Collections.emptyMap();
@@ -283,6 +267,9 @@ public class MetadataInfo implements Serializable {
if (keyMap != null) {
value = keyMap.get(key);
}
+ if (StringUtils.isEmpty(value)) {
+ value = getParameter(key);
+ }
return value == null ? defaultValue : value;
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataService.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataService.java
index ba9e50b..761cd88 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataService.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataService.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.metadata;
import org.apache.dubbo.common.URL;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import java.util.SortedSet;
import java.util.TreeSet;
@@ -177,7 +178,9 @@ public interface MetadataService {
*/
String getServiceDefinition(String serviceKey);
- MetadataInfo getMetadataInfo();
+ MetadataInfo getMetadataInfo(String revision);
+
+ Map<String, MetadataInfo> getMetadataInfos();
/**
* Is the {@link URL} for the {@link MetadataService} or not?
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java
index 9c0c600..5389e63 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java
@@ -23,6 +23,7 @@ import org.apache.dubbo.metadata.report.MetadataReportInstance;
import java.util.LinkedHashSet;
import java.util.List;
+import java.util.Map;
import java.util.Set;
import static java.util.Arrays.asList;
@@ -44,23 +45,27 @@ public class MetadataServiceNameMapping implements ServiceNameMapping {
return;
}
- List<MetadataReport> metadataReports = MetadataReportInstance.getMetadataReports(true);
- metadataReports.forEach(reporter -> {
+ Map<String, MetadataReport> metadataReports = MetadataReportInstance.getMetadataReports(true);
+ metadataReports.forEach((key, reporter) -> {
reporter.registerServiceAppMapping(ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol), getName(), url);
});
}
@Override
- public Set<String> get(URL url) {
+ public Set<String> get(URL url, MappingListener mappingListener) {
String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY);
String version = url.getParameter(VERSION_KEY);
String protocol = url.getProtocol();
- List<MetadataReport> metadataReports = MetadataReportInstance.getMetadataReports(true);
+ Map<String, MetadataReport> metadataReports = MetadataReportInstance.getMetadataReports(true);
Set<String> serviceNames = new LinkedHashSet<>();
- for (MetadataReport reporter : metadataReports) {
- Set<String> apps = reporter.getServiceAppMapping(ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol), url);
+ for (Map.Entry<String, MetadataReport> entry : metadataReports.entrySet()) {
+ MetadataReport reporter = entry.getValue();
+ Set<String> apps = reporter.getServiceAppMapping(
+ ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol),
+ mappingListener,
+ url);
if (CollectionUtils.isNotEmpty(apps)) {
serviceNames.addAll(apps);
break;
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java
index 44d55c5..b4ad31e 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java
@@ -44,7 +44,7 @@ public interface ServiceNameMapping {
*
* @return
*/
- Set<String> get(URL url);
+ Set<String> get(URL url, MappingListener mappingListener);
/**
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/WritableMetadataService.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/WritableMetadataService.java
index 0de9683..b870fff 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/WritableMetadataService.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/WritableMetadataService.java
@@ -75,6 +75,7 @@ public interface WritableMetadataService extends MetadataService {
void publishServiceDefinition(URL providerUrl);
+
/**
* Get {@link ExtensionLoader#getDefaultExtension() the defautl extension} of {@link WritableMetadataService}
*
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReport.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReport.java
index 256ae6e..012c162 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReport.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReport.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.metadata.report;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.definition.model.ServiceDefinition;
import org.apache.dubbo.metadata.report.identifier.MetadataIdentifier;
@@ -30,7 +31,6 @@ import java.util.Map;
import java.util.Set;
public interface MetadataReport {
-
/**
* Service Definition -- START
**/
@@ -51,7 +51,7 @@ public interface MetadataReport {
/**
* Service<-->Application Mapping -- START
**/
- default Set<String> getServiceAppMapping(String serviceKey, URL url) {
+ default Set<String> getServiceAppMapping(String serviceKey, MappingListener listener, URL url) {
return Collections.emptySet();
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java
index 4b66fe4..c3afdb0 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java
@@ -21,11 +21,12 @@ import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.config.MetadataReportConfig;
-import java.util.ArrayList;
-import java.util.List;
+import java.util.HashMap;
+import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_DIRECTORY;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
import static org.apache.dubbo.metadata.report.support.Constants.METADATA_REPORT_KEY;
/**
@@ -35,7 +36,7 @@ public class MetadataReportInstance {
private static AtomicBoolean init = new AtomicBoolean(false);
- private static final List<MetadataReport> metadataReports = new ArrayList<>();
+ private static final Map<String, MetadataReport> metadataReports = new HashMap<>();
public static void init(MetadataReportConfig config) {
if (init.get()) {
@@ -50,15 +51,14 @@ public class MetadataReportInstance {
.removeParameter(METADATA_REPORT_KEY)
.build();
}
- metadataReports.add(metadataReportFactory.getMetadataReport(url));
+ String relatedRegistryId = config.getRegistry() == null ? DEFAULT_KEY : config.getRegistry();
+// RegistryConfig registryConfig = ApplicationModel.getConfigManager().getRegistry(relatedRegistryId)
+// .orElseThrow(() -> new IllegalStateException("Registry id " + relatedRegistryId + " does not exist."));
+ metadataReports.put(relatedRegistryId, metadataReportFactory.getMetadataReport(url));
init.set(true);
}
- public static List<MetadataReport> getMetadataReports() {
- return getMetadataReports(false);
- }
-
- public static List<MetadataReport> getMetadataReports(boolean checked) {
+ public static Map<String, MetadataReport> getMetadataReports(boolean checked) {
if (checked) {
checkInit();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
index 5b54000..89e3e75 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
@@ -45,6 +45,4 @@ public interface NotifyListener {
default void addServiceListener(ServiceInstancesChangedListener instanceListener) {
}
- default void notifyServiceInstances() {
- }
}
\ No newline at end of file
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultRegistryClusterIdentifier.java
similarity index 58%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
copy to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultRegistryClusterIdentifier.java
index ba4b5b8..b225c06 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultRegistryClusterIdentifier.java
@@ -14,10 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.registry.client;
-/**
- *
- */
-public class AbstractPrefixConfigurationTest {
+import org.apache.dubbo.common.URL;
+
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
+
+public class DefaultRegistryClusterIdentifier implements RegistryClusterIdentifier {
+ @Override
+ public String providerKey(URL url) {
+ // url.getParameter("registry_cluster");
+ // ServiceMetadata.get("registry_cluster");
+ // return;
+ return url.getParameter(REGISTRY_KEY);
+ }
+
+ @Override
+ public String consumerKey(URL url) {
+ return url.getParameter(REGISTRY_KEY);
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index b394024..e44bd4f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.registry.client;
-import org.apache.dubbo.common.URL;
import org.apache.dubbo.metadata.MetadataInfo;
import java.util.HashMap;
@@ -46,7 +45,9 @@ public class DefaultServiceInstance implements ServiceInstance {
private Map<String, String> metadata = new HashMap<>();
+ private transient String address;
private transient MetadataInfo serviceMetadata;
+ private transient Map<String, String> extendParams = new HashMap<>();
public DefaultServiceInstance() {
}
@@ -104,6 +105,18 @@ public class DefaultServiceInstance implements ServiceInstance {
}
@Override
+ public String getAddress() {
+ if (address == null) {
+ address = getAddress(host, port);
+ }
+ return address;
+ }
+
+ private static String getAddress(String host, int port) {
+ return port <= 0 ? host : host + ':' + port;
+ }
+
+ @Override
public boolean isEnabled() {
return enabled;
}
@@ -126,6 +139,11 @@ public class DefaultServiceInstance implements ServiceInstance {
return metadata;
}
+ @Override
+ public Map<String, String> getExtendParams() {
+ return extendParams;
+ }
+
public void setMetadata(Map<String, String> metadata) {
this.metadata = metadata;
}
@@ -139,10 +157,8 @@ public class DefaultServiceInstance implements ServiceInstance {
}
@Override
- public URL toURL(String protocol, String path, String interfaceName, String group, String version, String serviceKey) {
- InstanceAddressURL url = new InstanceAddressURL(protocol, host, port, path, interfaceName, group, version, serviceKey);
- url.setMetadata(this.getServiceMetadata());
- return url;
+ public InstanceAddressURL toURL() {
+ return new InstanceAddressURL(this, serviceMetadata);
}
@Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
index b3a8b92..b5517b7 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
@@ -224,6 +224,11 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery {
}
@Override
+ public URL getUrl() {
+ return serviceDiscovery.getUrl();
+ }
+
+ @Override
public void initialize(URL registryURL) {
assertInitialized(INITIALIZE_ACTION);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
index 82bc860..ba8d7d3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
@@ -129,6 +129,11 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
}
@Override
+ public URL getUrl() {
+ return null;
+ }
+
+ @Override
public void register(ServiceInstance serviceInstance) throws RuntimeException {
String serviceInstanceId = getServiceInstanceId(serviceInstance);
String serviceName = getServiceName(serviceInstance);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 6ec4df2..18632bf 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -19,6 +19,7 @@ package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.MetadataInfo;
+import org.apache.dubbo.rpc.RpcContext;
import java.util.HashMap;
import java.util.Map;
@@ -28,37 +29,45 @@ import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
public class InstanceAddressURL extends URL {
-
- private MetadataInfo metadataInfo; // points to metadata of one revision
- private String interfaceName;
- private String group;
- private String version;
- private String serviceKey;
-
- public InstanceAddressURL(String protocol, String host, int port, String path, String interfaceName, String group, String version, String serviceKey) {
- super(protocol, host, port, path);
- this.interfaceName = interfaceName;
- this.group = group;
- this.version = version;
- this.serviceKey = serviceKey;
+ private ServiceInstance instance;
+ private MetadataInfo metadataInfo;
+
+ public InstanceAddressURL(
+ ServiceInstance instance,
+ MetadataInfo metadataInfo
+ ) {
+ this.instance = instance;
+ this.metadataInfo = metadataInfo;
+ this.setHost(instance.getHost());
+ this.setPort(instance.getPort());
}
@Override
public String getServiceInterface() {
- return interfaceName;
+ return RpcContext.getContext().getInterfaceName();
}
public String getGroup() {
- return group;
+ return RpcContext.getContext().getGroup();
}
public String getVersion() {
- return version;
+ return RpcContext.getContext().getVersion();
+ }
+
+ @Override
+ public String getProtocol() {
+ return RpcContext.getContext().getProtocol();
}
@Override
public String getServiceKey() {
- return serviceKey;
+ return RpcContext.getContext().getServiceKey();
+ }
+
+ @Override
+ public String getAddress() {
+ return instance.getAddress();
}
@Override
@@ -71,9 +80,12 @@ public class InstanceAddressURL extends URL {
return getServiceInterface();
}
- String value = super.getParameter(key);
+ String value = getConsumerParameters().get(key);
+ if (StringUtils.isEmpty(value)) {
+ value = getInstanceMetadata().get(key);
+ }
if (StringUtils.isEmpty(value) && metadataInfo != null) {
- value = metadataInfo.getParameter(key, this.getServiceKey());
+ value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey());
}
return value;
}
@@ -97,22 +109,18 @@ public class InstanceAddressURL extends URL {
@Override
public String getMethodParameter(String method, String key) {
- Map<String, Map<String, String>> instanceMethodParams = super.getMethodParameters();
- Map<String, String> keyMap = instanceMethodParams.get(method);
- String value = null;
- if (keyMap != null) {
- value = keyMap.get(key);
+ String value = getConsumerMethodParameter(method, key); // consumer URL wins; calling getMethodParameter(method, key) here would recurse without end
+ if (StringUtils.isNotEmpty(value)) {
+ return value;
}
-
MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey());
- value = serviceInfo.getMethodParameter(method, key, value);
- return value;
+ return serviceInfo.getMethodParameter(method, key, null);
}
@Override
public Map<String, String> getParameters() {
- Map<String, String> instanceParams = super.getParameters();
- Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(getServiceKey()));
+ Map<String, String> instanceParams = getInstanceMetadata();
+ Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey()));
int i = instanceParams == null ? 0 : instanceParams.size();
int j = metadataParams == null ? 0 : metadataParams.size();
Map<String, String> params = new HashMap<>((int) ((i + j) / 0.75) + 1);
@@ -122,11 +130,41 @@ public class InstanceAddressURL extends URL {
if (metadataParams != null) {
params.putAll(metadataParams);
}
+
+ params.putAll(getConsumerParameters());
return params;
}
- public void setMetadata(MetadataInfo metadataInfo) {
- this.metadataInfo = metadataInfo;
+ private Map<String, String> getInstanceMetadata() {
+ return this.instance.getMetadata();
}
+ private Map<String, String> getConsumerParameters() {
+ return RpcContext.getContext().getConsumerUrl().getParameters();
+ }
+
+ private String getConsumerParameter(String key) {
+ return RpcContext.getContext().getConsumerUrl().getParameter(key);
+ }
+
+ private String getConsumerMethodParameter(String method, String key) {
+ return RpcContext.getContext().getConsumerUrl().getMethodParameter(method, key);
+ }
+
+ @Override
+ public URL addParameter(String key, String value) {
+ throw new UnsupportedOperationException("");
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ // instance metadata equals
+ // service metadata equals
+ return super.equals(obj);
+ }
+
+ @Override
+ public int hashCode() {
+ return super.hashCode();
+ }
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/RegistryClusterIdentifier.java
similarity index 54%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
copy to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/RegistryClusterIdentifier.java
index ba4b5b8..b48b233 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/RegistryClusterIdentifier.java
@@ -14,10 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.registry.client;
-/**
- *
- */
-public class AbstractPrefixConfigurationTest {
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.extension.SPI;
+
+@SPI
+public interface RegistryClusterIdentifier {
+ String providerKey(URL url);
+
+ String consumerKey(URL url);
+
+ static RegistryClusterIdentifier getExtension() {
+ ExtensionLoader<RegistryClusterIdentifier> loader
+ = ExtensionLoader.getExtensionLoader(RegistryClusterIdentifier.class);
+ return loader.getExtension(ConfigurationUtils.getProperty("dubbo.application.sd.type", "default"));
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index bc73cd0..f9e667b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -261,6 +261,12 @@ public interface ServiceDiscovery extends Prioritized {
// ==================================================================================== //
+// String getKey(URL exportedURL);
+
+ default URL getUrl() {
+ return null;
+ }
+
/**
* A human-readable description of the implementation
*
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 878b9e6..9b129f9 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -23,6 +23,8 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.MappingChangedEvent;
+import org.apache.dubbo.metadata.MappingListener;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.NotifyListener;
@@ -30,6 +32,7 @@ import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.registry.client.metadata.SubscribedURLsSynthesizer;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
import org.apache.dubbo.registry.support.FailbackRegistry;
import java.util.ArrayList;
@@ -41,6 +44,7 @@ import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeSet;
import java.util.stream.Collectors;
import static java.lang.String.format;
@@ -56,6 +60,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
@@ -63,6 +68,7 @@ import static org.apache.dubbo.common.function.ThrowableAction.execute;
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;
import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension;
+import static org.apache.dubbo.rpc.Constants.ID_KEY;
/**
* Being different to the traditional registry, {@link ServiceDiscoveryRegistry} that is a new service-oriented
@@ -87,7 +93,7 @@ import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtens
* @see WritableMetadataService
* @since 2.7.5
*/
-public class ServiceDiscoveryRegistry extends FailbackRegistry {
+public class ServiceDiscoveryRegistry implements Registry {
protected final Logger logger = LoggerFactory.getLogger(getClass());
@@ -101,9 +107,11 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
private final Set<String> registeredListeners = new LinkedHashSet<>();
- /* app - listener */
+ /* apps - listener */
private final Map<String, ServiceInstancesChangedListener> serviceListeners = new HashMap<>();
+ private URL registryURL;
+
/**
* A cache for all URLs of services that the subscribed services exported
* The key is the service name
@@ -112,7 +120,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
private final Map<String, Map<String, List<URL>>> serviceRevisionExportedURLsCache = new LinkedHashMap<>();
public ServiceDiscoveryRegistry(URL registryURL) {
- super(registryURL);
+ this.registryURL = registryURL;
this.serviceDiscovery = createServiceDiscovery(registryURL);
this.subscribedServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
this.serviceNameMapping = ServiceNameMapping.getExtension(registryURL.getParameter(MAPPING_KEY));
@@ -190,11 +198,14 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (!shouldRegister(url)) { // Should Not Register
return;
}
- super.register(url);
+ doRegister(url);
}
- @Override
public void doRegister(URL url) {
+ String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
+ if (registryCluster != null) {
+ url = url.addParameter(REGISTRY_KEY, registryCluster);
+ }
if (writableMetadataService.exportURL(url)) {
if (logger.isInfoEnabled()) {
logger.info(format("The URL[%s] registered successfully.", url.toString()));
@@ -211,11 +222,14 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (!shouldRegister(url)) {
return;
}
- super.unregister(url);
+ doUnregister(url);
}
- @Override
public void doUnregister(URL url) {
+ String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
+ if (registryCluster != null) {
+ url = url.addParameter(REGISTRY_KEY, registryCluster);
+ }
if (writableMetadataService.unexportURL(url)) {
if (logger.isInfoEnabled()) {
logger.info(format("The URL[%s] deregistered successfully.", url.toString()));
@@ -232,12 +246,22 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (!shouldSubscribe(url)) { // Should Not Subscribe
return;
}
- super.subscribe(url, listener);
+ String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
+ if (registryCluster != null) {
+ url = url.addParameter(REGISTRY_KEY, registryCluster);
+ }
+ doSubscribe(url, listener);
}
- @Override
public void doSubscribe(URL url, NotifyListener listener) {
- subscribeURLs(url, listener);
+ writableMetadataService.subscribeURL(url);
+
+ Set<String> serviceNames = getServices(url, listener);
+ if (CollectionUtils.isEmpty(serviceNames)) {
+ throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
+ }
+
+ subscribeURLs(url, listener, serviceNames);
}
@Override
@@ -245,57 +269,56 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (!shouldSubscribe(url)) { // Should Not Subscribe
return;
}
- super.unsubscribe(url, listener);
+ String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
+ if (registryCluster != null) {
+ url = url.addParameter(REGISTRY_KEY, registryCluster);
+ }
+ doUnsubscribe(url, listener);
}
- @Override
public void doUnsubscribe(URL url, NotifyListener listener) {
writableMetadataService.unsubscribeURL(url);
}
@Override
+ public List<URL> lookup(URL url) {
+ throw new UnsupportedOperationException("");
+ }
+
+ @Override
+ public URL getUrl() {
+ return registryURL;
+ }
+
+ @Override
public boolean isAvailable() {
return !serviceDiscovery.getServices().isEmpty();
}
@Override
public void destroy() {
- super.destroy();
+ AbstractRegistryFactory.removeDestroyedRegistry(this);
execute(() -> {
// stop ServiceDiscovery
serviceDiscovery.destroy();
});
}
- protected void subscribeURLs(URL url, NotifyListener listener) {
-
- writableMetadataService.subscribeURL(url);
-
- Set<String> serviceNames = getServices(url);
- if (CollectionUtils.isEmpty(serviceNames)) {
- throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
- }
-
- serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
- }
-
- protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
+ protected void subscribeURLs(URL url, NotifyListener listener, Set<String> serviceNames) {
+ String serviceNamesKey = serviceNames.toString();
// register ServiceInstancesChangedListener
- ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceName,
- k -> new ServiceInstancesChangedListener(serviceName, serviceDiscovery, url) {
- @Override
- protected void notifyAddresses() {
- listener.notifyServiceInstances();
- }
- });
+ ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceNamesKey,
+ k -> new ServiceInstancesChangedListener(serviceNames, serviceDiscovery));
serviceListener.setUrl(url);
listener.addServiceListener(serviceListener);
- // FIXME, sync notification
- List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
- serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
- listener.notifyServiceInstances();
+ serviceNames.forEach(serviceName -> {
+ List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
+ serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
+ });
+ listener.notify(serviceListener.getUrls(url.getProtocolServiceKey()));
+ serviceListener.addListener(url.getProtocolServiceKey(), listener);
registerServiceInstancesChangedListener(url, serviceListener);
}
@@ -313,29 +336,10 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
}
private String createListenerId(URL url, ServiceInstancesChangedListener listener) {
- return listener.getServiceName() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
- }
-
- private Map<String, List<URL>> getRevisionExportedURLsMap(String serviceName) {
- return serviceRevisionExportedURLsCache.computeIfAbsent(serviceName, s -> new LinkedHashMap());
+ return listener.getServiceNames() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
}
/**
- * Synthesize new subscribed {@link URL URLs} from old one
- *
- * @param subscribedURL
- * @param serviceInstances
- * @return non-null
- */
-// private Collection<? extends URL> synthesizeSubscribedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
-// return subscribedURLsSynthesizers.stream()
-// .filter(synthesizer -> synthesizer.supports(subscribedURL))
-// .map(synthesizer -> synthesizer.synthesize(subscribedURL, serviceInstances))
-// .flatMap(Collection::stream)
-// .collect(Collectors.toList());
-// }
-
- /**
* 1.developer explicitly specifies the application name this interface belongs to
* 2.check Interface-App mapping
* 3.use the services specified in registry url.
@@ -343,18 +347,18 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
* @param subscribedURL
* @return
*/
- protected Set<String> getServices(URL subscribedURL) {
- Set<String> subscribedServices = new LinkedHashSet<>();
+ protected Set<String> getServices(URL subscribedURL, final NotifyListener listener) {
+ Set<String> subscribedServices = new TreeSet<>();
String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
if (StringUtils.isNotEmpty(serviceNames)) {
- subscribedServices = parseServices(serviceNames);
+ subscribedServices.addAll(parseServices(serviceNames));
}
if (isEmpty(subscribedServices)) {
- subscribedServices = findMappedServices(subscribedURL);
+ subscribedServices.addAll(findMappedServices(subscribedURL, new DefaultMappingListener(subscribedURL, subscribedServices, listener)));
if (isEmpty(subscribedServices)) {
- subscribedServices = getSubscribedServices();
+ subscribedServices.addAll(getSubscribedServices());
}
}
return subscribedServices;
@@ -383,8 +387,8 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
* @param subscribedURL
* @return
*/
- protected Set<String> findMappedServices(URL subscribedURL) {
- return serviceNameMapping.get(subscribedURL);
+ protected Set<String> findMappedServices(URL subscribedURL, MappingListener listener) {
+ return serviceNameMapping.get(subscribedURL, listener);
}
/**
@@ -433,4 +437,27 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
return protocol == null || Objects.equals(protocol, targetURL.getParameter(PROTOCOL_KEY))
|| Objects.equals(protocol, targetURL.getProtocol());
}
+
+ private class DefaultMappingListener implements MappingListener {
+ private URL url;
+ private Set<String> oldApps;
+ private NotifyListener listener;
+
+ public DefaultMappingListener(URL subscribedURL, Set<String> serviceNames, NotifyListener listener) {
+ this.url = subscribedURL;
+ this.oldApps = serviceNames;
+ this.listener = listener;
+ }
+
+ @Override
+ public void onEvent(MappingChangedEvent event) {
+ Set<String> newApps = event.getApps();
+ if (CollectionUtils.isEmpty(newApps)) {
+ return;
+ }
+ if (!CollectionUtils.equals(oldApps, newApps) && newApps.size() >= oldApps.size()) {
+ subscribeURLs(url, listener, newApps);
+ }
+ }
+ }
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
new file mode 100644
index 0000000..9890988
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.Assert;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.registry.integration.DynamicDirectory;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.RpcContext;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
+
+public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> implements NotifyListener {
+ private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class);
+
+ // Map<url, Invoker> cache service url to invoker mapping.
+ private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+
+ private ServiceInstancesChangedListener listener;
+
+ public ServiceDiscoveryRegistryDirectory(Class<T> serviceType, URL url) {
+ super(serviceType, url);
+ }
+
+ @Override
+ public synchronized void notify(List<URL> instanceUrls) {
+ // Set the context of the address notification thread.
+ RpcContext.setRpcContext(getConsumerUrl());
+ if (CollectionUtils.isEmpty(instanceUrls)) {
+ // FIXME, empty protocol
+ }
+ refreshInvoker(instanceUrls);
+ }
+
+ private void refreshInvoker(List<URL> invokerUrls) {
+ Assert.notNull(invokerUrls, "invokerUrls should not be null");
+ // A single URL carrying the empty protocol forbids access and destroys every cached invoker.
+ if (invokerUrls.size() == 1
+ && invokerUrls.get(0) != null
+ && EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) {
+ this.forbidden = true; // Forbid to access
+ this.invokers = Collections.emptyList();
+ routerChain.setInvokers(this.invokers);
+ destroyAllInvokers(); // Close all invokers
+ } else {
+ this.forbidden = false; // Allow to access
+ Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+ if (CollectionUtils.isEmpty(invokerUrls)) {
+ return; // nothing was notified: the current invokers are left untouched
+ }
+ Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+
+ if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
+ logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
+ return;
+ }
+
+ List<Invoker<T>> newInvokers = Collections.unmodifiableList(new ArrayList<>(newUrlInvokerMap.values()));
+ // Pre-route and build the route cache; the router chain is given the original (unmerged) invoker list.
+ // NOTE(review): merged multi-group invokers should not be routed; toMergeInvokerList() currently returns the list unchanged.
+ routerChain.setInvokers(newInvokers);
+ this.invokers = multiGroup ? toMergeInvokerList(newInvokers) : newInvokers;
+ this.urlInvokerMap = newUrlInvokerMap;
+
+ if (oldUrlInvokerMap != null) {
+ try {
+ destroyUnusedInvokers(oldUrlInvokerMap, newUrlInvokerMap); // Close the invokers that are no longer used
+ } catch (Exception e) {
+ logger.warn("destroyUnusedInvokers error. ", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Turn urls into invokers; a url that was already referred reuses its cached invoker instead of being re-referred.
+ *
+ * @param urls notified provider urls, may be null or empty
+ * @return url-to-invoker map for this notification, never null
+ */
+ private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
+ Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+ if (urls == null || urls.isEmpty()) {
+ return newUrlInvokerMap;
+ }
+
+ Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference, the field may be reassigned midway
+ for (URL url : urls) {
+ if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
+ continue;
+ }
+ if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(url.getProtocol())) {
+ logger.error(new IllegalStateException("Unsupported protocol " + url.getProtocol() +
+ " in notified url: " + url + " from registry " + getUrl().getAddress() +
+ " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
+ ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
+ continue;
+ }
+
+ // Only skip urls repeated within THIS notification. Skipping urls found in the old cache would
+ // leave them out of the new map, and destroyUnusedInvokers() would then destroy live invokers.
+ if (newUrlInvokerMap.containsKey(url)) {
+ continue;
+ }
+ Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url);
+ if (invoker == null) { // Not in the cache, refer again
+ try {
+ // "disabled" takes precedence over "enabled" when both are present
+ boolean enabled = url.hasParameter(DISABLED_KEY)
+ ? !url.getParameter(DISABLED_KEY, false)
+ : url.getParameter(ENABLED_KEY, true);
+ if (enabled) {
+ invoker = protocol.refer(serviceType, url);
+ }
+ } catch (Throwable t) {
+ logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
+ }
+ }
+ if (invoker != null) { // Newly referred, or carried over from the cache
+ newUrlInvokerMap.put(url, invoker);
+ }
+ }
+ return newUrlInvokerMap;
+ }
+
+ private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
+ return invokers;
+ }
+
+ /**
+ * Close all invokers
+ */
+ private void destroyAllInvokers() {
+ Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ if (localUrlInvokerMap != null) {
+ for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
+ try {
+ invoker.destroy();
+ } catch (Throwable t) {
+ logger.warn("Failed to destroy service " + serviceKey + " to provider " + invoker.getUrl(), t);
+ }
+ }
+ localUrlInvokerMap.clear();
+ }
+ invokers = null;
+ }
+
+ /**
+ * Check whether the invoker in the cache needs to be destroyed
+ * If set attribute of url: refer.autodestroy=false, the invokers will only increase without decreasing,there may be a refer leak
+ *
+ * @param oldUrlInvokerMap
+ * @param newUrlInvokerMap
+ */
+ private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
+ if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
+ destroyAllInvokers();
+ return;
+ }
+ // check deleted invoker
+ List<URL> deleted = null;
+ if (oldUrlInvokerMap != null) {
+ Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
+ for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+ if (!newInvokers.contains(entry.getValue())) {
+ if (deleted == null) {
+ deleted = new ArrayList<>();
+ }
+ deleted.add(entry.getKey());
+ }
+ }
+ }
+
+ if (deleted != null) {
+ for (URL url : deleted) {
+ if (url != null) {
+ Invoker<T> invoker = oldUrlInvokerMap.remove(url);
+ if (invoker != null) {
+ try {
+ invoker.destroy();
+ if (logger.isDebugEnabled()) {
+ logger.debug("destroy invoker[" + invoker.getUrl() + "] success. ");
+ }
+ } catch (Exception e) {
+ logger.warn("destroy invoker[" + invoker.getUrl() + "] failed. " + e.getMessage(), e);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void destroy() {
+ if (isDestroyed()) {
+ return;
+ }
+ // A failure while unregistering or unsubscribing is only logged, so the rest of the teardown still runs.
+ // unregister.
+ try {
+ if (getRegisteredConsumerUrl() != null && registry != null && registry.isAvailable()) {
+ registry.unregister(getRegisteredConsumerUrl());
+ }
+ } catch (Throwable t) {
+ logger.warn("unexpected error when unregister service " + serviceKey + " from registry " + registry.getUrl(), t);
+ }
+ // unsubscribe.
+ try {
+ if (getConsumerUrl() != null && registry != null && registry.isAvailable()) {
+ registry.unsubscribe(getConsumerUrl(), this);
+ }
+ } catch (Throwable t) {
+ logger.warn("unexpected error when unsubscribe service " + serviceKey + " from registry " + registry.getUrl(), t);
+ }
+ super.destroy(); // must be executed after unsubscribing
+ try {
+ destroyAllInvokers();
+ } catch (Throwable t) {
+ logger.warn("Failed to destroy service " + serviceKey, t);
+ }
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
index c89c6c9..f4f872f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.integration.DynamicDirectory;
import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.rpc.Invoker;
@@ -26,7 +27,6 @@ import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGIST
* TODO, replace RegistryProtocol completely in the future.
*/
public class ServiceDiscoveryRegistryProtocol extends RegistryProtocol {
-
@Override
protected URL getRegistryUrl(Invoker<?> originInvoker) {
URL registryUrl = originInvoker.getUrl();
@@ -44,4 +44,8 @@ public class ServiceDiscoveryRegistryProtocol extends RegistryProtocol {
return super.getRegistryUrl(url);
}
+ @Override
+ protected <T> DynamicDirectory<T> createDirectory(Class<T> type, URL url) {
+ return new ServiceDiscoveryRegistryDirectory<>(type, url);
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java
similarity index 57%
copy from dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
copy to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java
index c89c6c9..d213171 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java
@@ -16,32 +16,24 @@
*/
package org.apache.dubbo.registry.client;
-import org.apache.dubbo.common.URL;
import org.apache.dubbo.registry.integration.RegistryProtocol;
+import org.apache.dubbo.registry.integration.RegistryProtocolListener;
+import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
-import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL;
+public class ServiceDiscoveryRegistryProtocolListener implements RegistryProtocolListener {
+ @Override
+ public void onExport(RegistryProtocol registryProtocol, Exporter<?> exporter) {
-/**
- * TODO, replace RegistryProtocol completely in the future.
- */
-public class ServiceDiscoveryRegistryProtocol extends RegistryProtocol {
+ }
@Override
- protected URL getRegistryUrl(Invoker<?> originInvoker) {
- URL registryUrl = originInvoker.getUrl();
- if (SERVICE_REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
- return registryUrl;
- }
- return super.getRegistryUrl(originInvoker);
+ public void onRefer(RegistryProtocol registryProtocol, Invoker<?> invoker) {
+
}
@Override
- protected URL getRegistryUrl(URL url) {
- if (SERVICE_REGISTRY_PROTOCOL.equals(url.getProtocol())) {
- return url;
- }
- return super.getRegistryUrl(url);
- }
+ public void onDestroy() {
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
index 164d146..ad950c1 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
@@ -16,8 +16,6 @@
*/
package org.apache.dubbo.registry.client;
-import org.apache.dubbo.common.URL;
-
import java.io.Serializable;
import java.util.Map;
@@ -57,6 +55,8 @@ public interface ServiceInstance extends Serializable {
*/
Integer getPort();
+ String getAddress();
+
/**
* The enable status of the registered service instance.
*
@@ -84,6 +84,8 @@ public interface ServiceInstance extends Serializable {
*/
Map<String, String> getMetadata();
+ Map<String, String> getExtendParams();
+
/**
* Get the value of metadata by the specified name
*
@@ -117,6 +119,6 @@ public interface ServiceInstance extends Serializable {
*/
boolean equals(Object another);
- URL toURL(String protocol, String path, String interfaceName, String group, String version, String serviceKey);
+ InstanceAddressURL toURL();
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 36365c1..1901e34 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -19,13 +19,14 @@ package org.apache.dubbo.registry.client.event.listener;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.event.ConditionalEventListener;
import org.apache.dubbo.event.EventListener;
import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
import org.apache.dubbo.metadata.MetadataService;
+import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
+import org.apache.dubbo.registry.client.RegistryClusterIdentifier;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
@@ -34,15 +35,15 @@ import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
import java.util.ArrayList;
-import java.util.Collection;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.TreeSet;
+import static org.apache.dubbo.common.constants.CommonConstants.REGISTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
@@ -52,28 +53,28 @@ import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataU
* @see ServiceInstancesChangedEvent
* @since 2.7.5
*/
-public abstract class ServiceInstancesChangedListener implements ConditionalEventListener<ServiceInstancesChangedEvent> {
+public class ServiceInstancesChangedListener implements ConditionalEventListener<ServiceInstancesChangedEvent> {
private static final Logger logger = LoggerFactory.getLogger(ServiceInstancesChangedListener.class);
- private final String serviceName;
+ private final Set<String> serviceNames;
private final ServiceDiscovery serviceDiscovery;
private URL url;
+ private Map<String, NotifyListener> listeners;
- private List<ServiceInstance> instances;
+ private Map<String, List<ServiceInstance>> allInstances;
- private Map<String, List<ServiceInstance>> revisionToInstances;
+ private Map<String, List<URL>> serviceUrls;
private Map<String, MetadataInfo> revisionToMetadata;
- private Map<String, Set<String>> serviceToRevisions;
-
- private Map<String, List<ServiceInstance>> serviceToInstances;
-
- protected ServiceInstancesChangedListener(String serviceName, ServiceDiscovery serviceDiscovery, URL url) {
- this.serviceName = serviceName;
+ public ServiceInstancesChangedListener(Set<String> serviceNames, ServiceDiscovery serviceDiscovery) {
+ this.serviceNames = serviceNames;
this.serviceDiscovery = serviceDiscovery;
- this.url = url;
+ this.listeners = new HashMap<>();
+ this.allInstances = new HashMap<>();
+ this.serviceUrls = new HashMap<>();
+ this.revisionToMetadata = new HashMap<>();
}
/**
@@ -81,69 +82,66 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
*
* @param event {@link ServiceInstancesChangedEvent}
*/
- public void onEvent(ServiceInstancesChangedEvent event) {
- instances = event.getServiceInstances();
-
- Map<String, List<ServiceInstance>> localRevisionToInstances = new HashMap<>();
- Map<String, MetadataInfo> localRevisionToMetadata = new HashMap<>();
- Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
- for (ServiceInstance instance : instances) {
- String revision = getExportedServicesRevision(instance);
- Collection<ServiceInstance> rInstances = localRevisionToInstances.computeIfAbsent(revision, r -> new ArrayList<>());
- rInstances.add(instance);
-
- MetadataInfo metadata = null;
- if (revisionToMetadata != null && ((metadata = revisionToMetadata.get(revision)) != null)) {
- localRevisionToMetadata.put(revision, metadata);
- } else {
- metadata = getMetadataInfo(instance);
- if (metadata != null) {
- localRevisionToMetadata.put(revision, getMetadataInfo(instance));
+ public synchronized void onEvent(ServiceInstancesChangedEvent event) {
+ String appName = event.getServiceName();
+ allInstances.put(appName, event.getServiceInstances());
+
+ // Both maps aggregate over ALL instances of ALL apps, so they are created once, outside the loops.
+ // Re-creating them per instance would leave every service with the URL of its last instance only.
+ Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
+ Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
+ for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
+ List<ServiceInstance> instances = entry.getValue();
+ for (ServiceInstance instance : instances) {
+ String revision = getExportedServicesRevision(instance);
+ List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
+ subInstances.add(instance);
+
+ MetadataInfo metadata = revisionToMetadata.get(revision);
+ if (metadata == null) {
+ metadata = getMetadataInfo(instance);
+ if (metadata != null) {
+ // cache the metadata just loaded instead of loading it a second time
+ revisionToMetadata.put(revision, metadata);
+ }
}
- }
- if (metadata != null) {
- parse(revision, metadata, localServiceToRevisions);
- ((DefaultServiceInstance) instance).setServiceMetadata(metadata);
- } else {
- logger.error("Failed to load service metadata for instance " + instance);
- Set<String> set = localServiceToRevisions.computeIfAbsent(url.getServiceKey(), k -> new HashSet<>());
- set.add(revision);
- }
- }
-
- this.revisionToInstances = localRevisionToInstances;
- this.revisionToMetadata = localRevisionToMetadata;
- this.serviceToRevisions = localServiceToRevisions;
-
- Map<String, List<ServiceInstance>> localServiceToInstances = new HashMap<>();
- for (String serviceKey : localServiceToRevisions.keySet()) {
- if (CollectionUtils.equals(localRevisionToInstances.keySet(), localServiceToRevisions.get(serviceKey))) {
- localServiceToInstances.put(serviceKey, instances);
+ if (metadata != null) {
+ parseMetadata(revision, metadata, localServiceToRevisions);
+ ((DefaultServiceInstance) instance).setServiceMetadata(metadata);
+ } else {
+ // without metadata the instance cannot be attributed to any service key, so it is left out
+ logger.error("Failed to load service metadata for instance " + instance);
+ }
}
}
- this.serviceToInstances = localServiceToInstances;
+
+ // Services exported by exactly the same set of revisions share a single URL list.
+ Map<Set<String>, List<URL>> revisionsToUrls = new HashMap<>();
+ localServiceToRevisions.forEach((serviceKey, revisions) -> {
+ List<URL> urls = revisionsToUrls.get(revisions);
+ if (urls == null) {
+ urls = new ArrayList<>();
+ for (String r : revisions) {
+ for (ServiceInstance i : revisionToInstances.get(r)) {
+ urls.add(i.toURL());
+ }
+ }
+ revisionsToUrls.put(revisions, urls);
+ }
+ serviceUrls.put(serviceKey, urls);
+ });
+
+ // push the refreshed address lists to the registered NotifyListeners
+ this.notifyAddressChanged();
}
- public List<ServiceInstance> getInstances(String serviceKey) {
- if (serviceToInstances.containsKey(serviceKey)) {
- return serviceToInstances.get(serviceKey);
- }
-
- Set<String> revisions = serviceToRevisions.get(serviceKey);
- List<ServiceInstance> allInstances = new LinkedList<>();
- for (String r : revisions) {
- allInstances.addAll(revisionToInstances.get(r));
- }
- return allInstances;
- }
-
- private Map<String, Set<String>> parse(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
+ private Map<String, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
String serviceKey = entry.getValue().getServiceKey();
- Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new HashSet<>());
+ Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new TreeSet<>());
set.add(revision);
}
@@ -152,7 +150,7 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
private MetadataInfo getMetadataInfo(ServiceInstance instance) {
String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
-
+ instance.getExtendParams().putIfAbsent(REGISTER_KEY, RegistryClusterIdentifier.getExtension().consumerKey(url));
MetadataInfo metadataInfo;
try {
if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
@@ -160,7 +158,7 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
metadataInfo = remoteMetadataService.getMetadata(instance);
} else {
MetadataService metadataServiceProxy = MetadataUtils.getMetadataServiceProxy(instance);
- metadataInfo = metadataServiceProxy.getMetadataInfo();
+ metadataInfo = metadataServiceProxy.getMetadataInfo(ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
}
} catch (Exception e) {
// TODO, load metadata backup
@@ -169,15 +167,32 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
return metadataInfo;
}
- protected abstract void notifyAddresses();
+ private void notifyAddressChanged() {
+ listeners.forEach((key, notifyListener) -> {
+ //FIXME, group wildcard match
+ notifyListener.notify(serviceUrls.get(key));
+ });
+ }
+
+ public void addListener(String serviceKey, NotifyListener listener) {
+ this.listeners.put(serviceKey, listener);
+ }
+
+ public void removeListener(String serviceKey) {
+ this.listeners.remove(serviceKey);
+ }
+
+ public List<URL> getUrls(String serviceKey) {
+ return serviceUrls.get(serviceKey);
+ }
/**
* Get the correlative service name
*
* @return the correlative service name
*/
- public final String getServiceName() {
- return serviceName;
+ public final Set<String> getServiceNames() {
+ return serviceNames;
}
public void setUrl(URL url) {
@@ -193,7 +208,7 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
* @return If service name matches, return <code>true</code>, or <code>false</code>
*/
public final boolean accept(ServiceInstancesChangedEvent event) {
- return Objects.equals(serviceName, event.getServiceName());
+ return serviceNames.contains(event.getServiceName());
}
@Override
@@ -201,11 +216,11 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
if (this == o) return true;
if (!(o instanceof ServiceInstancesChangedListener)) return false;
ServiceInstancesChangedListener that = (ServiceInstancesChangedListener) o;
- return Objects.equals(getServiceName(), that.getServiceName());
+ return Objects.equals(getServiceNames(), that.getServiceNames());
}
@Override
public int hashCode() {
- return Objects.hash(getClass(), getServiceName());
+ return Objects.hash(getClass(), getServiceNames());
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
index 588c7ae..8605eaa 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
@@ -26,6 +26,7 @@ import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.metadata.definition.ServiceDefinitionBuilder;
import org.apache.dubbo.metadata.definition.model.ServiceDefinition;
+import org.apache.dubbo.registry.client.RegistryClusterIdentifier;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.support.ProtocolUtils;
@@ -36,6 +37,8 @@ import java.util.Map;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.Lock;
@@ -70,7 +73,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
* and value is the {@link SortedSet sorted set} of the {@link URL URLs}
*/
ConcurrentNavigableMap<String, SortedSet<URL>> exportedServiceURLs = new ConcurrentSkipListMap<>();
- MetadataInfo metadataInfo;
+ ConcurrentMap<String, MetadataInfo> metadataInfos;
// ==================================================================================== //
@@ -86,7 +89,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
ConcurrentNavigableMap<String, String> serviceDefinitions = new ConcurrentSkipListMap<>();
public InMemoryWritableMetadataService() {
- this.metadataInfo = new MetadataInfo(ApplicationModel.getName());
+ this.metadataInfos = new ConcurrentHashMap<>();
}
@Override
@@ -120,15 +123,28 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
@Override
public boolean exportURL(URL url) {
- ServiceInfo serviceInfo = new ServiceInfo(url);
- metadataInfo.addService(serviceInfo);
+ String registryKey = RegistryClusterIdentifier.getExtension().providerKey(url);
+ String[] keys = registryKey.split(",");
+ for (String key : keys) {
+ MetadataInfo metadataInfo = metadataInfos.computeIfAbsent(key, k -> {
+ return new MetadataInfo(ApplicationModel.getName());
+ });
+ metadataInfo.addService(new ServiceInfo(url));
+ }
return addURL(exportedServiceURLs, url);
}
@Override
public boolean unexportURL(URL url) {
- ServiceInfo serviceInfo = new ServiceInfo(url);
- metadataInfo.removeService(serviceInfo);
+ String registryKey = RegistryClusterIdentifier.getExtension().providerKey(url);
+ String[] keys = registryKey.split(",");
+ for (String key : keys) {
+ // computeIfPresent: a key that was never exported is skipped instead of throwing a NullPointerException
+ metadataInfos.computeIfPresent(key, (k, metadataInfo) -> {
+ metadataInfo.removeService(url.getProtocolServiceKey());
+ return metadataInfo.getServices().isEmpty() ? null : metadataInfo; // returning null drops the emptied entry
+ });
+ }
return removeURL(exportedServiceURLs, url);
}
@@ -173,8 +189,21 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
}
@Override
- public MetadataInfo getMetadataInfo() {
- return metadataInfo;
+ public MetadataInfo getMetadataInfo(String revision) {
+ if (StringUtils.isEmpty(revision)) {
+ return null;
+ }
+ for (Map.Entry<String, MetadataInfo> entry : metadataInfos.entrySet()) {
+ MetadataInfo metadataInfo = entry.getValue();
+ if (revision.equals(metadataInfo.getRevision())) {
+ return metadataInfo;
+ }
+ }
+ return null;
+ }
+
+ public Map<String, MetadataInfo> getMetadataInfos() {
+ return metadataInfos;
}
boolean addURL(Map<String, SortedSet<URL>> serviceURLs, URL url) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index 799d2aa..85d6855 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -33,7 +33,7 @@ import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.RpcException;
-import java.util.List;
+import java.util.Map;
import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
@@ -44,7 +44,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
public class RemoteMetadataServiceImpl {
@@ -55,15 +55,20 @@ public class RemoteMetadataServiceImpl {
this.localMetadataService = writableMetadataService;
}
- public List<MetadataReport> getMetadataReports() {
+ public Map<String, MetadataReport> getMetadataReports() {
return MetadataReportInstance.getMetadataReports(true);
}
public void publishMetadata(ServiceInstance instance) {
- MetadataInfo metadataInfo = localMetadataService.getMetadataInfo();
- SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision());
- getMetadataReports().forEach(metadataReport -> {
- instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision());
+ Map<String, MetadataInfo> metadataInfos = localMetadataService.getMetadataInfos();
+ metadataInfos.forEach((registryKey, metadataInfo) -> { // one publication per registry key
+ SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision());
+ metadataInfo.getExtendParams().put(REGISTRY_KEY, registryKey);
+ Map<String, MetadataReport> metadataReports = getMetadataReports(); // resolve once: lookup and fallback share one map
+ MetadataReport metadataReport = metadataReports.get(registryKey);
+ if (metadataReport == null) {
+ metadataReport = metadataReports.values().iterator().next(); // no report under this key: use the first one the map yields
+ }
metadataReport.publishAppMetadata(identifier, metadataInfo);
});
}
@@ -71,13 +76,14 @@ public class RemoteMetadataServiceImpl {
public MetadataInfo getMetadata(ServiceInstance instance) {
SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(),
ServiceInstanceMetadataUtils.getExportedServicesRevision(instance));
- for (MetadataReport reporter : getMetadataReports()) {
- MetadataInfo metadataInfo = reporter.getAppMetadata(identifier, instance.getMetadata());
- if (metadataInfo != null) {
- return metadataInfo;
- }
+
+ String registryCluster = instance.getExtendParams().get(REGISTRY_KEY);
+ Map<String, MetadataReport> metadataReports = getMetadataReports(); // resolve once: lookup and fallback share one map
+ MetadataReport metadataReport = metadataReports.get(registryCluster);
+ if (metadataReport == null) {
+ metadataReport = metadataReports.values().iterator().next(); // no report under this key: use the first one the map yields
}
- return null;
+ return metadataReport.getAppMetadata(identifier, instance.getMetadata());
}
public void publishServiceDefinition(URL url) {
@@ -103,7 +109,8 @@ public class RemoteMetadataServiceImpl {
Class interfaceClass = Class.forName(interfaceName);
FullServiceDefinition fullServiceDefinition = ServiceDefinitionBuilder.buildFullDefinition(interfaceClass,
providerUrl.getParameters());
- for (MetadataReport metadataReport : getMetadataReports()) {
+ for (Map.Entry<String, MetadataReport> entry : getMetadataReports().entrySet()) {
+ MetadataReport metadataReport = entry.getValue();
metadataReport.storeProviderMetadata(new MetadataIdentifier(providerUrl.getServiceInterface(),
providerUrl.getParameter(VERSION_KEY), providerUrl.getParameter(GROUP_KEY),
PROVIDER_SIDE, providerUrl.getParameter(APPLICATION_KEY)), fullServiceDefinition);
@@ -120,7 +127,7 @@ public class RemoteMetadataServiceImpl {
private void publishConsumer(URL consumerURL) throws RpcException {
final URL url = consumerURL.removeParameters(PID_KEY, TIMESTAMP_KEY, Constants.BIND_IP_KEY,
Constants.BIND_PORT_KEY, TIMESTAMP_KEY);
- getMetadataReports().forEach(config -> {
+ getMetadataReports().forEach((registryKey, config) -> {
config.storeConsumerMetadata(new MetadataIdentifier(url.getServiceInterface(),
url.getParameter(VERSION_KEY), url.getParameter(GROUP_KEY), CONSUMER_SIDE,
url.getParameter(APPLICATION_KEY)), url.getParameters());
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
new file mode 100644
index 0000000..8268361
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.integration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.common.Version;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.NetUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
+import org.apache.dubbo.rpc.cluster.Configurator;
+import org.apache.dubbo.rpc.cluster.RouterChain;
+import org.apache.dubbo.rpc.cluster.RouterFactory;
+import org.apache.dubbo.rpc.cluster.directory.AbstractDirectory;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
+import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
+import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
+import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
+
+
+/**
+ * DynamicDirectory: abstract base for directories that subscribe themselves to a {@link Registry} as a {@link NotifyListener}.
+ */
+public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
+
+ private static final Logger logger = LoggerFactory.getLogger(DynamicDirectory.class);
+
+ protected static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
+
+ protected static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class)
+ .getAdaptiveExtension();
+
+ protected final String serviceKey; // Initialized at construction time from the consumer URL's service key; expected to be non-null
+ protected final Class<T> serviceType; // Initialized at construction time; never null because the constructor rejects a null type
+ protected final URL directoryUrl; // Initialized at construction time; expected to be non-null and is never reassigned
+ protected final boolean multiGroup;
+ protected Protocol protocol; // Assigned through setProtocol after construction; expected to be non-null once injected
+ protected Registry registry; // Assigned through setRegistry after construction; expected to be non-null once injected
+ protected volatile boolean forbidden = false;
+ protected boolean shouldRegister;
+ protected boolean shouldSimplified;
+
+ protected volatile URL overrideDirectoryUrl; // Initialized at construction time to directoryUrl; expected to always hold a non-null value
+
+ protected volatile URL registeredConsumerUrl;
+
+ /**
+ * Override rules.
+ * Priority: override>-D>consumer>provider
+ * Rule one: for a certain provider <ip:port,timeout=100>
+ * Rule two: for all providers <* ,timeout=5000>
+ */
+ protected volatile List<Configurator> configurators; // Initially null and may be reset to null later; read it through a local variable reference
+
+ // Map<url, Invoker>: caches the mapping from service url to invoker.
+ protected volatile Map<URL, Invoker<T>> urlInvokerMap; // Initially null and may be reset to null later; read it through a local variable reference
+ protected volatile List<Invoker<T>> invokers;
+
+ // Set<invokerUrls>: caches the invoker urls the current invokers were built from.
+ protected volatile Set<URL> cachedInvokerUrls; // Initially null and may be reset to null later; read it through a local variable reference
+
+ protected ServiceInstancesChangedListener serviceListener;
+
+ public DynamicDirectory(Class<T> serviceType, URL url) {
+ super(url);
+ if (serviceType == null) {
+ throw new IllegalArgumentException("service type is null.");
+ }
+
+ shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
+ shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);
+ if (url.getServiceKey() == null || url.getServiceKey().length() == 0) {
+ throw new IllegalArgumentException("registry serviceKey is null.");
+ }
+ this.serviceType = serviceType;
+ this.serviceKey = super.getConsumerUrl().getServiceKey();
+
+ this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
+ String group = directoryUrl.getParameter(GROUP_KEY, "");
+ this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
+ }
+
+ @Override
+ public void addServiceListener(ServiceInstancesChangedListener instanceListener) {
+ this.serviceListener = instanceListener;
+ }
+
+ private URL turnRegistryUrlToConsumerUrl(URL url) {
+ return URLBuilder.from(url)
+ .setPath(url.getServiceInterface())
+ .clearParameters()
+ .addParameters(queryMap)
+ .removeParameter(MONITOR_KEY)
+ .build();
+ }
+
+ public void setProtocol(Protocol protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setRegistry(Registry registry) {
+ this.registry = registry;
+ }
+
+ public Registry getRegistry() {
+ return registry;
+ }
+
+ public boolean isShouldRegister() {
+ return shouldRegister;
+ }
+
+ public void subscribe(URL url) {
+ setConsumerUrl(url);
+ registry.subscribe(url, this);
+ }
+
+ public void unSubscribe(URL url) {
+ setConsumerUrl(null);
+ registry.unsubscribe(url, this);
+ }
+
+ @Override
+ public List<Invoker<T>> doList(Invocation invocation) {
+ if (forbidden) {
+ // Either 1. there is no service provider, or 2. the service providers are disabled
+ throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " +
+ getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " +
+ NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() +
+ ", please check status of providers(disabled, not registered or in blacklist).");
+ }
+
+ if (multiGroup) {
+ return this.invokers == null ? Collections.emptyList() : this.invokers;
+ }
+
+ List<Invoker<T>> invokers = null;
+ try {
+ // Get invokers from cache, only runtime routers will be executed.
+ invokers = routerChain.route(getConsumerUrl(), invocation);
+ } catch (Throwable t) {
+ logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
+ }
+
+ return invokers == null ? Collections.emptyList() : invokers;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return serviceType;
+ }
+
+ @Override
+ public List<Invoker<T>> getAllInvokers() {
+ return invokers;
+ }
+
+ @Override
+ public URL getConsumerUrl() {
+ return this.overrideDirectoryUrl;
+ }
+
+ public URL getRegisteredConsumerUrl() {
+ return registeredConsumerUrl;
+ }
+
+ public void setRegisteredConsumerUrl(URL url) {
+ if (!shouldSimplified) {
+ this.registeredConsumerUrl = url.addParameters(CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY,
+ String.valueOf(false));
+ } else {
+ this.registeredConsumerUrl = URL.valueOf(url, DEFAULT_REGISTER_CONSUMER_KEYS, null).addParameters(
+ CATEGORY_KEY, CONSUMERS_CATEGORY, CHECK_KEY, String.valueOf(false));
+ }
+ }
+
+ @Override
+ public boolean isAvailable() {
+ if (isDestroyed()) {
+ return false;
+ }
+ Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
+ if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
+ for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
+ if (invoker.isAvailable()) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+
+ public void buildRouterChain(URL url) {
+ this.setRouterChain(RouterChain.buildChain(url));
+ }
+
+ /**
+ * Returns the url-to-invoker cache as-is (it may be null); added for test purposes (Haomin).
+ */
+ public Map<URL, Invoker<T>> getUrlInvokerMap() {
+ return urlInvokerMap;
+ }
+
+ public List<Invoker<T>> getInvokers() {
+ return invokers;
+ }
+
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 54104e9..c8cd6dd 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -17,7 +17,6 @@
package org.apache.dubbo.registry.integration;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.Version;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.extension.ExtensionLoader;
@@ -30,20 +29,14 @@ import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.Registry;
-import org.apache.dubbo.registry.client.ServiceInstance;
-import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Protocol;
import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.cluster.Cluster;
import org.apache.dubbo.rpc.cluster.Configurator;
import org.apache.dubbo.rpc.cluster.Router;
import org.apache.dubbo.rpc.cluster.RouterChain;
-import org.apache.dubbo.rpc.cluster.RouterFactory;
-import org.apache.dubbo.rpc.cluster.directory.AbstractDirectory;
import org.apache.dubbo.rpc.cluster.directory.StaticDirectory;
import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository;
import org.apache.dubbo.rpc.cluster.support.ClusterUtils;
@@ -55,7 +48,6 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -63,15 +55,12 @@ import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
-import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
import static org.apache.dubbo.common.constants.CommonConstants.DISABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
import static org.apache.dubbo.common.constants.CommonConstants.ENABLED_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.APP_DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.COMPATIBLE_CONFIG_KEY;
@@ -84,149 +73,40 @@ import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATE
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
-import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
-import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.ROUTER_KEY;
/**
* RegistryDirectory
*/
-public class RegistryDirectory<T> extends AbstractDirectory<T> implements NotifyListener {
-
+public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyListener {
private static final Logger logger = LoggerFactory.getLogger(RegistryDirectory.class);
- private static final Cluster CLUSTER = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension();
-
- private static final RouterFactory ROUTER_FACTORY = ExtensionLoader.getExtensionLoader(RouterFactory.class)
- .getAdaptiveExtension();
-
- private final String serviceKey; // Initialization at construction time, assertion not null
- private final Class<T> serviceType; // Initialization at construction time, assertion not null
- private final Map<String, String> queryMap; // Initialization at construction time, assertion not null
- private final URL directoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
- private final boolean multiGroup;
- private Protocol protocol; // Initialization at the time of injection, the assertion is not null
- private Registry registry; // Initialization at the time of injection, the assertion is not null
- private volatile boolean forbidden = false;
- private boolean shouldRegister;
- private boolean shouldSimplified;
-
- private volatile URL overrideDirectoryUrl; // Initialization at construction time, assertion not null, and always assign non null value
-
- private volatile URL registeredConsumerUrl;
-
- /**
- * override rules
- * Priority: override>-D>consumer>provider
- * Rule one: for a certain provider <ip:port,timeout=100>
- * Rule two: for all providers <* ,timeout=5000>
- */
- private volatile List<Configurator> configurators; // The initial value is null and the midway may be assigned to null, please use the local variable reference
-
- // Map<url, Invoker> cache service url to invoker mapping.
- private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
- private volatile List<Invoker<T>> invokers;
-
- // Set<invokerUrls> cache invokeUrls to invokers mapping.
- private volatile Set<URL> cachedInvokerUrls; // The initial value is null and the midway may be assigned to null, please use the local variable reference
-
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
- private ReferenceConfigurationListener serviceConfigurationListener;
-
- private Set<ServiceInstancesChangedListener> serviceListeners;
+ private ReferenceConfigurationListener referenceConfigurationListener;
public RegistryDirectory(Class<T> serviceType, URL url) {
- super(url);
- if (serviceType == null) {
- throw new IllegalArgumentException("service type is null.");
- }
-
- shouldRegister = !ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(REGISTER_KEY, true);
- shouldSimplified = url.getParameter(SIMPLIFIED_KEY, false);
- if (url.getServiceKey() == null || url.getServiceKey().length() == 0) {
- throw new IllegalArgumentException("registry serviceKey is null.");
- }
- this.serviceType = serviceType;
- this.serviceKey = super.getConsumerUrl().getServiceKey();
- this.queryMap = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
- this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
- String group = directoryUrl.getParameter(GROUP_KEY, "");
- this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
-
- this.serviceListeners = new HashSet<>();
+ super(serviceType, url);
}
@Override
- public void addServiceListener(ServiceInstancesChangedListener instanceListener) {
- this.serviceListeners.add(instanceListener);
- }
-
- @Override
- public void notifyServiceInstances() {
- List<URL> urls = new LinkedList<>();
- for (ServiceInstancesChangedListener listener : serviceListeners) {
- List<ServiceInstance> instances = listener.getInstances(serviceKey);
- for (ServiceInstance instance : instances) {
- // FIXME, the right protocol? the right path?
- urls.add(
- instance.toURL(
- "dubbo",
- serviceType.getName(),
- serviceType.getName(),
- queryMap.get(GROUP_KEY),
- queryMap.get(VERSION_KEY),
- serviceKey)
- );
- }
- }
- // FIXME, filter out unmatched urls before notify: version, group, protocol, registry
- notify(urls);
- }
-
- private URL turnRegistryUrlToConsumerUrl(URL url) {
- return URLBuilder.from(url)
- .setPath(url.getServiceInterface())
- .clearParameters()
- .addParameters(queryMap)
- .removeParameter(MONITOR_KEY)
- .build();
- }
-
- public void setProtocol(Protocol protocol) {
- this.protocol = protocol;
- }
-
- public void setRegistry(Registry registry) {
- this.registry = registry;
- }
-
- public Registry getRegistry() {
- return registry;
- }
-
- public boolean isShouldRegister() {
- return shouldRegister;
- }
-
public void subscribe(URL url) {
setConsumerUrl(url);
CONSUMER_CONFIGURATION_LISTENER.addNotifyListener(this);
- serviceConfigurationListener = new ReferenceConfigurationListener(this, url);
+ referenceConfigurationListener = new ReferenceConfigurationListener(this, url);
registry.subscribe(url, this);
}
+ @Override
public void unSubscribe(URL url) {
setConsumerUrl(null);
CONSUMER_CONFIGURATION_LISTENER.removeNotifyListener(this);
- serviceConfigurationListener.stop();
+ referenceConfigurationListener.stop();
registry.unsubscribe(url, this);
}
-
@Override
public void destroy() {
if (isDestroyed()) {
@@ -317,7 +197,6 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
*
* @param invokerUrls this parameter can't be null
*/
- // TODO: 2017/8/31 FIXME The thread pool should be used to refresh the address, otherwise the task may be accumulated.
private void refreshInvoker(List<URL> invokerUrls) {
Assert.notNull(invokerUrls, "invokerUrls should not be null");
@@ -330,7 +209,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
- Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (invokerUrls == Collections.<URL>emptyList()) {
invokerUrls = new ArrayList<>();
}
@@ -343,7 +222,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
if (invokerUrls.isEmpty()) {
return;
}
- Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+ Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
/**
* If the calculation is wrong, it is not processed.
@@ -435,12 +314,12 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* @param urls
* @return invokers
*/
- private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
- Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+ private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
+ Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
- Set<String> keys = new HashSet<>();
+ Set<URL> keys = new HashSet<>();
String queryProtocols = this.queryMap.get(PROTOCOL_KEY);
for (URL providerUrl : urls) {
// If protocol is configured at the reference side, only the matching protocol is selected
@@ -469,14 +348,13 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
}
URL url = mergeUrl(providerUrl);
- String key = url.toFullString(); // The parameter urls are sorted
- if (keys.contains(key)) { // Repeated url
+ if (keys.contains(url)) { // Repeated url
continue;
}
- keys.add(key);
+ keys.add(url);
// Cache key is url that does not merge with consumer side parameters, regardless of how the consumer combines parameters, if the server url changes, then refer again
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
- Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key);
+ Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(url);
if (invoker == null) { // Not in the cache, refer again
try {
boolean enabled = true;
@@ -492,10 +370,10 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(key, invoker);
+ newUrlInvokerMap.put(url, invoker);
}
} else {
- newUrlInvokerMap.put(key, invoker);
+ newUrlInvokerMap.put(url, invoker);
}
}
keys.clear();
@@ -545,8 +423,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
providerUrl = overrideWithConfigurators(CONSUMER_CONFIGURATION_LISTENER.getConfigurators(), providerUrl);
// override url with configurator from configurators from "service-name.configurators"
- if (serviceConfigurationListener != null) {
- providerUrl = overrideWithConfigurators(serviceConfigurationListener.getConfigurators(), providerUrl);
+ if (referenceConfigurationListener != null) {
+ providerUrl = overrideWithConfigurators(referenceConfigurationListener.getConfigurators(), providerUrl);
}
return providerUrl;
@@ -565,7 +443,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* Close all invokers
*/
private void destroyAllInvokers() {
- Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
try {
@@ -586,16 +464,16 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
- private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
+ private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
- List<String> deleted = null;
+ List<URL> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
- for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+ for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
@@ -606,7 +484,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
}
if (deleted != null) {
- for (String url : deleted) {
+ for (URL url : deleted) {
if (url != null) {
Invoker<T> invoker = oldUrlInvokerMap.remove(url);
if (invoker != null) {
@@ -683,7 +561,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
if (isDestroyed()) {
return false;
}
- Map<String, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
+ Map<URL, Invoker<T>> localUrlInvokerMap = urlInvokerMap;
if (localUrlInvokerMap != null && localUrlInvokerMap.size() > 0) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
if (invoker.isAvailable()) {
@@ -701,7 +579,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
/**
* Haomin: added for test purpose
*/
- public Map<String, Invoker<T>> getUrlInvokerMap() {
+ public Map<URL, Invoker<T>> getUrlInvokerMap() {
return urlInvokerMap;
}
@@ -733,8 +611,8 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
doOverrideUrl(localConfigurators);
List<Configurator> localAppDynamicConfigurators = CONSUMER_CONFIGURATION_LISTENER.getConfigurators(); // local reference
doOverrideUrl(localAppDynamicConfigurators);
- if (serviceConfigurationListener != null) {
- List<Configurator> localDynamicConfigurators = serviceConfigurationListener.getConfigurators(); // local reference
+ if (referenceConfigurationListener != null) {
+ List<Configurator> localDynamicConfigurators = referenceConfigurationListener.getConfigurators(); // local reference
doOverrideUrl(localDynamicConfigurators);
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java
index b7e03c4..c6ce46f 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java
@@ -22,17 +22,18 @@ import org.apache.dubbo.rpc.Invoker;
import org.apache.dubbo.rpc.Result;
import org.apache.dubbo.rpc.RpcException;
import org.apache.dubbo.rpc.cluster.Cluster;
-import org.apache.dubbo.rpc.cluster.ClusterInvoker;
-class RegistryInvokerWrapper<T> implements ClusterInvoker<T> {
- private RegistryDirectory<T> directory;
+class RegistryInvokerWrapper<T> implements Invoker<T> {
+ private DynamicDirectory<T> directory;
private Cluster cluster;
private Invoker<T> invoker;
+ private URL url;
- public RegistryInvokerWrapper(RegistryDirectory<T> directory, Cluster cluster, Invoker<T> invoker) {
+ public RegistryInvokerWrapper(DynamicDirectory<T> directory, Cluster cluster, Invoker<T> invoker, URL url) {
this.directory = directory;
this.cluster = cluster;
this.invoker = invoker;
+ this.url = url;
}
@Override
@@ -47,14 +48,18 @@ class RegistryInvokerWrapper<T> implements ClusterInvoker<T> {
@Override
public URL getUrl() {
- return invoker.getUrl();
+ return url;
+ }
+
+ public void setUrl(URL url) {
+ this.url = url;
}
public void setInvoker(Invoker<T> invoker) {
this.invoker = invoker;
}
- public RegistryDirectory<T> getDirectory() {
+ public DynamicDirectory<T> getDirectory() {
return directory;
}
@@ -71,9 +76,4 @@ class RegistryInvokerWrapper<T> implements ClusterInvoker<T> {
public void destroy() {
invoker.destroy();
}
-
- @Override
- public URL getRegistryUrl() {
- return directory.getUrl();
- }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
index 9a4acc8..976d0da 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
@@ -454,8 +454,9 @@ public class RegistryProtocol implements Protocol {
return doRefer(cluster, registry, type, url);
}
- private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
- RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url);
+ protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
+ // FIXME, SPI extension, support prototype instance
+ DynamicDirectory<T> directory = createDirectory(type, url);
directory.setRegistry(registry);
directory.setProtocol(protocol);
// all attributes of REFER_KEY
@@ -481,6 +482,10 @@ public class RegistryProtocol implements Protocol {
return registryInvokerWrapper;
}
+ protected <T> DynamicDirectory<T> createDirectory(Class<T> type, URL url) {
+ return new RegistryDirectory<T>(type, url);
+ }
+
public <T> void reRefer(Invoker<T> invoker, URL newSubscribeUrl) {
if (!(invoker instanceof RegistryInvokerWrapper)) {
return;
@@ -488,7 +493,7 @@ public class RegistryProtocol implements Protocol {
RegistryInvokerWrapper<T> invokerWrapper = (RegistryInvokerWrapper<T>) invoker;
URL oldSubscribeUrl = invokerWrapper.getUrl();
- RegistryDirectory<T> directory = invokerWrapper.getDirectory();
+ DynamicDirectory<T> directory = invokerWrapper.getDirectory();
Registry registry = directory.getRegistry();
registry.unregister(directory.getRegisteredConsumerUrl());
directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl));
diff --git a/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.RegistryClusterIdentifier b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.RegistryClusterIdentifier
new file mode 100644
index 0000000..2b435cc
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.RegistryClusterIdentifier
@@ -0,0 +1 @@
+default=org.apache.dubbo.registry.client.DefaultRegistryClusterIdentifier
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.integration.RegistryProtocolListener b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.integration.RegistryProtocolListener
new file mode 100644
index 0000000..d60633c
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.integration.RegistryProtocolListener
@@ -0,0 +1 @@
+service-discovery=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocolListener
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
index b05f1d8..0c330aa 100644
--- a/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscovery.java
@@ -161,6 +161,11 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
return StringUtils.splitToList(value, COMMA_SEPARATOR_CHAR);
}
+ @Override
+ public URL getUrl() {
+ return url;
+ }
+
private List<String> getRegisteringTags(URL url) {
List<String> tags = new ArrayList<>();
String rawTag = url.getParameter(REGISTER_TAG);
@@ -190,7 +195,7 @@ public class ConsulServiceDiscovery implements ServiceDiscovery, EventListener<S
@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
if (notifier == null) {
- String serviceName = listener.getServiceName();
+ String serviceName = listener.getServiceNames();
Response<List<HealthService>> response = getHealthServices(serviceName, -1, buildWatchTimeout());
Long consulIndex = response.getConsulIndex();
notifier = new ConsulNotifier(serviceName, consulIndex);
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
similarity index 65%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
copy to dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
index ba4b5b8..bd77db8 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-registry/dubbo-registry-consul/src/main/java/org/apache/dubbo/registry/consul/ConsulServiceDiscoveryFactory.java
@@ -14,10 +14,17 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.registry.consul;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class ConsulServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new ConsulServiceDiscovery();
+ }
-/**
- *
- */
-public class AbstractPrefixConfigurationTest {
}
diff --git a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
deleted file mode 100644
index e46973b..0000000
--- a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscovery
+++ /dev/null
@@ -1 +0,0 @@
-consul=org.apache.dubbo.registry.consul.ConsulServiceDiscovery
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..a0f1252
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-consul/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+consul=org.apache.dubbo.registry.consul.ConsulServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
index c38a7f2..9cc954e 100644
--- a/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
+++ b/dubbo-registry/dubbo-registry-default/src/test/java/org/apache/dubbo/registry/dubbo/RegistryDirectoryTest.java
@@ -440,7 +440,7 @@ public class RegistryDirectoryTest {
registryDirectory.destroy();
List<Invoker<RegistryDirectoryTest>> cachedInvokers = registryDirectory.getInvokers();
- Map<String, Invoker<RegistryDirectoryTest>> urlInvokerMap = registryDirectory.getUrlInvokerMap();
+ Map<URL, Invoker<RegistryDirectoryTest>> urlInvokerMap = registryDirectory.getUrlInvokerMap();
Assertions.assertNull(cachedInvokers);
Assertions.assertEquals(0, urlInvokerMap.size());
diff --git a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
index a26ebc6..7916de2 100644
--- a/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-etcd3/src/main/java/org/apache/dubbo/registry/etcd/EtcdServiceDiscovery.java
@@ -158,7 +158,7 @@ public class EtcdServiceDiscovery implements ServiceDiscovery, EventListener<Ser
@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener) throws NullPointerException, IllegalArgumentException {
- registerServiceWatcher(listener.getServiceName());
+ registerServiceWatcher(listener.getServiceNames());
}
@Override
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
index b84bfa9..91de49d 100644
--- a/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscovery.java
@@ -54,10 +54,13 @@ public class NacosServiceDiscovery implements ServiceDiscovery {
private NamingService namingService;
+ private URL registryURL;
+
@Override
public void initialize(URL registryURL) throws Exception {
this.namingService = createNamingService(registryURL);
this.group = getGroup(registryURL);
+ this.registryURL = registryURL;
}
@Override
@@ -109,7 +112,7 @@ public class NacosServiceDiscovery implements ServiceDiscovery {
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
execute(namingService, service -> {
- service.subscribe(listener.getServiceName(), e -> { // Register Nacos EventListener
+ service.subscribe(listener.getServiceNames(), e -> { // Register Nacos EventListener
if (e instanceof NamingEvent) {
NamingEvent event = (NamingEvent) e;
handleEvent(event, listener);
@@ -118,6 +121,11 @@ public class NacosServiceDiscovery implements ServiceDiscovery {
});
}
+ @Override
+ public URL getUrl() {
+ return registryURL;
+ }
+
private void handleEvent(NamingEvent event, ServiceInstancesChangedListener listener) {
String serviceName = event.getServiceName();
List<ServiceInstance> serviceInstances = event.getInstances()
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscoveryFactory.java
similarity index 65%
copy from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
copy to dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscoveryFactory.java
index ba4b5b8..c9c7fca 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/java/org/apache/dubbo/registry/nacos/NacosServiceDiscoveryFactory.java
@@ -14,10 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.registry.nacos;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
/**
*
*/
-public class AbstractPrefixConfigurationTest {
+public class NacosServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new NacosServiceDiscovery();
+ }
+
}
diff --git a/dubbo-registry/dubbo-registry-nacos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory b/dubbo-registry/dubbo-registry-nacos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
new file mode 100644
index 0000000..4fc3f4a
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-nacos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.registry.client.ServiceDiscoveryFactory
@@ -0,0 +1 @@
+nacos=org.apache.dubbo.registry.nacos.NacosServiceDiscoveryFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 037b5af..29e6790 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -24,10 +24,8 @@ import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.common.utils.DefaultPage;
import org.apache.dubbo.common.utils.Page;
import org.apache.dubbo.event.EventDispatcher;
-import org.apache.dubbo.event.EventListener;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
-import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.curator.framework.CuratorFramework;
@@ -52,10 +50,12 @@ import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.bui
* Zookeeper {@link ServiceDiscovery} implementation based on
* <a href="https://curator.apache.org/curator-x-discovery/index.html">Apache Curator X Discovery</a>
*/
-public class ZookeeperServiceDiscovery implements ServiceDiscovery, EventListener<ServiceInstancesChangedEvent> {
+public class ZookeeperServiceDiscovery implements ServiceDiscovery {
private final Logger logger = LoggerFactory.getLogger(getClass());
+ private URL registryURL;
+
private EventDispatcher dispatcher;
private CuratorFramework curatorFramework;
@@ -71,14 +71,18 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery, EventListene
@Override
public void initialize(URL registryURL) throws Exception {
- this.dispatcher = EventDispatcher.getDefaultExtension();
- this.dispatcher.addEventListener(this);
+ this.registryURL = registryURL;
this.curatorFramework = buildCuratorFramework(registryURL);
this.rootPath = ROOT_PATH.getParameterValue(registryURL);
this.serviceDiscovery = buildServiceDiscovery(curatorFramework, rootPath);
this.serviceDiscovery.start();
}
+ @Override
+ public URL getUrl() {
+ return registryURL;
+ }
+
public void destroy() throws Exception {
serviceDiscovery.close();
}
@@ -147,7 +151,7 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery, EventListene
@Override
public void addServiceInstancesChangedListener(ServiceInstancesChangedListener listener)
throws NullPointerException, IllegalArgumentException {
- registerServiceWatcher(listener.getServiceName());
+ listener.getServiceNames().forEach(serviceName -> registerServiceWatcher(serviceName, listener));
}
private void doInServiceRegistry(ThrowableConsumer<org.apache.curator.x.discovery.ServiceDiscovery> consumer) {
@@ -160,10 +164,10 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery, EventListene
return execute(serviceDiscovery, function);
}
- protected void registerServiceWatcher(String serviceName) {
+ protected void registerServiceWatcher(String serviceName, ServiceInstancesChangedListener listener) {
String path = buildServicePath(serviceName);
CuratorWatcher watcher = watcherCaches.computeIfAbsent(path, key ->
- new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName));
+ new ZookeeperServiceDiscoveryChangeWatcher(this, serviceName, listener));
try {
curatorFramework.getChildren().usingWatcher(watcher).forPath(path);
} catch (KeeperException.NoNodeException e) {
@@ -179,11 +183,4 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery, EventListene
private String buildServicePath(String serviceName) {
return rootPath + "/" + serviceName;
}
-
- @Override
- public void onEvent(ServiceInstancesChangedEvent event) {
- String serviceName = event.getServiceName();
- // re-register again
- registerServiceWatcher(serviceName);
- }
}
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
index 325c80c..5ee429a 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryChangeWatcher.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.registry.zookeeper;
import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.curator.framework.api.CuratorWatcher;
import org.apache.zookeeper.WatchedEvent;
@@ -34,15 +35,18 @@ import static org.apache.zookeeper.Watcher.Event.EventType.NodeDataChanged;
* @since 2.7.5
*/
public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
+ private ServiceInstancesChangedListener listener;
private final ZookeeperServiceDiscovery zookeeperServiceDiscovery;
private final String serviceName;
public ZookeeperServiceDiscoveryChangeWatcher(ZookeeperServiceDiscovery zookeeperServiceDiscovery,
- String serviceName) {
+ String serviceName,
+ ServiceInstancesChangedListener listener) {
this.zookeeperServiceDiscovery = zookeeperServiceDiscovery;
this.serviceName = serviceName;
+ this.listener = listener;
}
@Override
@@ -51,6 +55,8 @@ public class ZookeeperServiceDiscoveryChangeWatcher implements CuratorWatcher {
Watcher.Event.EventType eventType = event.getType();
if (NodeChildrenChanged.equals(eventType) || NodeDataChanged.equals(eventType)) {
+ listener.onEvent(new ServiceInstancesChangedEvent(serviceName, zookeeperServiceDiscovery.getInstances(serviceName)));
+ zookeeperServiceDiscovery.registerServiceWatcher(serviceName, listener);
zookeeperServiceDiscovery.dispatchServiceInstancesChangedEvent(serviceName);
}
}
diff --git a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryFactory.java
similarity index 64%
rename from dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
rename to dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryFactory.java
index ba4b5b8..9db8235 100644
--- a/dubbo-common/src/test/java/org/apache/dubbo/common/config/AbstractPrefixConfigurationTest.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscoveryFactory.java
@@ -14,10 +14,16 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.common.config;
+package org.apache.dubbo.registry.zookeeper;
-/**
- *
- */
-public class AbstractPrefixConfigurationTest {
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.AbstractServiceDiscoveryFactory;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+
+public class ZookeeperServiceDiscoveryFactory extends AbstractServiceDiscoveryFactory {
+
+ @Override
+ protected ServiceDiscovery createDiscovery(URL registryURL) {
+ return new ZookeeperServiceDiscovery();
+ }
}
diff --git a/dubbo-registry/pom.xml b/dubbo-registry/pom.xml
index d141b29..5ab688a 100644
--- a/dubbo-registry/pom.xml
+++ b/dubbo-registry/pom.xml
@@ -31,15 +31,15 @@
</properties>
<modules>
<module>dubbo-registry-api</module>
- <module>dubbo-registry-default</module>
- <module>dubbo-registry-multicast</module>
+ <!-- <module>dubbo-registry-default</module>-->
+ <!-- <module>dubbo-registry-multicast</module>-->
<module>dubbo-registry-zookeeper</module>
- <module>dubbo-registry-redis</module>
- <module>dubbo-registry-consul</module>
- <module>dubbo-registry-etcd3</module>
- <module>dubbo-registry-nacos</module>
- <module>dubbo-registry-multiple</module>
- <module>dubbo-registry-sofa</module>
- <module>dubbo-registry-eureka</module>
+ <!-- <module>dubbo-registry-redis</module>-->
+ <!-- <module>dubbo-registry-consul</module>-->
+ <!-- <module>dubbo-registry-etcd3</module>-->
+ <!-- <module>dubbo-registry-nacos</module>-->
+ <!-- <module>dubbo-registry-multiple</module>-->
+ <!-- <module>dubbo-registry-sofa</module>-->
+ <!-- <module>dubbo-registry-eureka</module>-->
</modules>
</project>
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
index 66ac06f..2c3d2b8 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcContext.java
@@ -34,8 +34,12 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Future;
import static org.apache.dubbo.common.constants.CommonConstants.CONSUMER_SIDE;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.Constants.ASYNC_KEY;
import static org.apache.dubbo.rpc.Constants.RETURN_KEY;
@@ -795,4 +799,79 @@ public class RpcContext {
return asyncContext;
}
+ // RPC service context updated before each service call.
+ private String group;
+ private String version;
+ private String interfaceName;
+ private String protocol;
+ private String serviceKey;
+ private String protocolServiceKey;
+ private URL consumerUrl;
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getInterfaceName() {
+ return interfaceName;
+ }
+
+ public void setInterfaceName(String interfaceName) {
+ this.interfaceName = interfaceName;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public String getServiceKey() {
+ return serviceKey;
+ }
+
+ public void setServiceKey(String serviceKey) {
+ this.serviceKey = serviceKey;
+ }
+
+ public String getProtocolServiceKey() {
+ return protocolServiceKey;
+ }
+
+ public void setProtocolServiceKey(String protocolServiceKey) {
+ this.protocolServiceKey = protocolServiceKey;
+ }
+
+ public URL getConsumerUrl() {
+ return consumerUrl;
+ }
+
+ public void setConsumerUrl(URL consumerUrl) {
+ this.consumerUrl = consumerUrl;
+ }
+
+ public static void setRpcContext(URL url) {
+ RpcContext rpcContext = RpcContext.getContext();
+ rpcContext.setConsumerUrl(url);
+ rpcContext.setInterfaceName(url.getServiceInterface());
+ rpcContext.setVersion(url.getParameter(VERSION_KEY));
+ rpcContext.setGroup(url.getParameter(GROUP_KEY));
+ rpcContext.setProtocol(url.getParameter(PROTOCOL_KEY, DUBBO));
+ rpcContext.setServiceKey(url.getServiceKey());
+ rpcContext.setProtocolServiceKey(url.getProtocolServiceKey());
+ }
}
\ No newline at end of file
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
index 9041975..b5c9e67 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
import org.apache.dubbo.rpc.Constants;
import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.RpcContext;
import org.apache.dubbo.rpc.RpcInvocation;
import org.apache.dubbo.rpc.model.ApplicationModel;
import org.apache.dubbo.rpc.model.ConsumerModel;
@@ -65,7 +66,9 @@ public class InvokerInvocationHandler implements InvocationHandler {
RpcInvocation rpcInvocation = new RpcInvocation(method, invoker.getInterface().getName(), args);
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
-
+
+ RpcContext.setRpcContext(invoker.getUrl());
+
if (consumerModel != null) {
rpcInvocation.put(Constants.CONSUMER_MODEL, consumerModel);
rpcInvocation.put(Constants.METHOD_MODEL, consumerModel.getMethodModel(method));