You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/04 07:09:39 UTC
[dubbo] 02/27: service instance subscription
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 20cf141f2e8588e1695c37c14b4d89a268b96452
Author: ken.lj <ke...@gmail.com>
AuthorDate: Thu Jun 4 11:52:08 2020 +0800
service instance subscription
---
.../org/apache/dubbo/metadata/MetadataInfo.java | 181 +++++++
.../org/apache/dubbo/registry/NotifyListener.java | 6 +
.../registry/client/DefaultServiceInstance.java | 18 +
.../dubbo/registry/client/InstanceAddressURL.java | 132 +++++
.../dubbo/registry/client/ServiceDiscovery.java | 3 +-
.../registry/client/ServiceDiscoveryRegistry.java | 592 ++-------------------
.../dubbo/registry/client/ServiceInstance.java | 4 +
.../client/event/ServiceInstancesChangedEvent.java | 12 +-
.../listener/ServiceInstancesChangedListener.java | 113 +++-
.../registry/integration/RegistryDirectory.java | 33 ++
10 files changed, 544 insertions(+), 550 deletions(-)
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
new file mode 100644
index 0000000..20933fb
--- /dev/null
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -0,0 +1,181 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.metadata;
+
+import org.apache.dubbo.common.URL;
+
+import java.io.Serializable;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+public class MetadataInfo implements Serializable {
+ private String app;
+ private String revision;
+ private Map<String, ServiceInfo> services;
+
+ public MetadataInfo() {
+ }
+
+ public MetadataInfo(String app, String revision, Map<String, ServiceInfo> services) {
+ this.app = app;
+ this.revision = revision;
+ this.services = services == null ? new HashMap<>() : services;
+ }
+
+ public String getApp() {
+ return app;
+ }
+
+ public void setApp(String app) {
+ this.app = app;
+ }
+
+ public String getRevision() {
+ return revision;
+ }
+
+ public void setRevision(String revision) {
+ this.revision = revision;
+ }
+
+ public Map<String, ServiceInfo> getServices() {
+ return services;
+ }
+
+ public void setServices(Map<String, ServiceInfo> services) {
+ this.services = services;
+ }
+
+ public ServiceInfo getServiceInfo(String serviceKey) {
+ return services.get(serviceKey);
+ }
+
+ public String getParameter(String key, String serviceKey) {
+ ServiceInfo serviceInfo = services.get(serviceKey);
+ if (serviceInfo == null) {
+ return null;
+ }
+ return serviceInfo.getParameter(key);
+ }
+
+ public Map<String, String> getParameters(String serviceKey) {
+ ServiceInfo serviceInfo = services.get(serviceKey);
+ if (serviceInfo == null) {
+ return Collections.emptyMap();
+ }
+ return serviceInfo.getParams();
+ }
+
+ public static class ServiceInfo implements Serializable {
+ private String name;
+ private String group;
+ private String version;
+ private String protocol;
+ private String registry;
+ private Map<String, String> params;
+
+ private transient Map<String, Map<String, String>> methodParams;
+ private transient String serviceKey;
+
+ public ServiceInfo() {
+ }
+
+ public ServiceInfo(String name, String group, String version, String protocol, String registry, Map<String, String> params) {
+ this.name = name;
+ this.group = group;
+ this.version = version;
+ this.protocol = protocol;
+ this.registry = registry;
+ this.params = params == null ? new HashMap<>() : params;
+
+ this.serviceKey = URL.buildKey(name, group, version);
+ }
+
+ public String getServiceKey() {
+ return serviceKey;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public void setGroup(String group) {
+ this.group = group;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ public void setVersion(String version) {
+ this.version = version;
+ }
+
+ public String getProtocol() {
+ return protocol;
+ }
+
+ public void setProtocol(String protocol) {
+ this.protocol = protocol;
+ }
+
+ public String getRegistry() {
+ return registry;
+ }
+
+ public void setRegistry(String registry) {
+ this.registry = registry;
+ }
+
+ public Map<String, String> getParams() {
+ if (params == null) {
+ return Collections.emptyMap();
+ }
+ return params;
+ }
+
+ public void setParams(Map<String, String> params) {
+ this.params = params;
+ }
+
+ public String getParameter(String key) {
+ return params.get(key);
+ }
+
+ public String getMethodParameter(String method, String key, String defaultValue) {
+ if (methodParams == null) {
+ methodParams = URL.toMethodParameters(params);
+ }
+
+ Map<String, String> keyMap = methodParams.get(method);
+ String value = null;
+ if (keyMap != null) {
+ value = keyMap.get(key);
+ }
+ return value == null ? defaultValue : value;
+ }
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
index 0e41cc9..5b54000 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/NotifyListener.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.registry;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import java.util.List;
@@ -41,4 +42,9 @@ public interface NotifyListener {
*/
void notify(List<URL> urls);
+ default void addServiceListener(ServiceInstancesChangedListener instanceListener) {
+ }
+
+ default void notifyServiceInstances() {
+ }
}
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index 366f607..6c88e8c 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -16,6 +16,9 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.metadata.MetadataInfo;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
@@ -43,6 +46,8 @@ public class DefaultServiceInstance implements ServiceInstance {
private Map<String, String> metadata = new HashMap<>();
+ private MetadataInfo serviceMetadata;
+
public DefaultServiceInstance() {
}
@@ -125,6 +130,19 @@ public class DefaultServiceInstance implements ServiceInstance {
this.metadata = metadata;
}
+ public MetadataInfo getServiceMetadata() {
+ return serviceMetadata;
+ }
+
+ public void setServiceMetadata(MetadataInfo serviceMetadata) {
+ this.serviceMetadata = serviceMetadata;
+ }
+
+ @Override
+ public URL toURL(String protocol, String path, String interfaceName, String group, String version, String serviceKey) {
+ return new InstanceAddressURL(protocol, host, port, path, interfaceName, group, version, serviceKey);
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) return true;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
new file mode 100644
index 0000000..a4ece68
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.MetadataInfo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+
+public class InstanceAddressURL extends URL {
+
+ private MetadataInfo metadataInfo; // points to metadata of one revision
+ private String interfaceName;
+ private String group;
+ private String version;
+ private String serviceKey;
+
+ public InstanceAddressURL(String protocol, String host, int port, String path, String interfaceName, String group, String version, String serviceKey) {
+ super(protocol, host, port, path);
+ this.interfaceName = interfaceName;
+ this.group = group;
+ this.version = version;
+ this.serviceKey = serviceKey;
+ }
+
+ @Override
+ public String getServiceInterface() {
+ return interfaceName;
+ }
+
+ public String getGroup() {
+ return group;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public String getServiceKey() {
+ return serviceKey;
+ }
+
+ @Override
+ public String getParameter(String key) {
+ if (VERSION_KEY.equals(key)) {
+ return getVersion();
+ } else if (GROUP_KEY.equals(key)) {
+ return getGroup();
+ } else if (INTERFACE_KEY.equals(key)) {
+ return getServiceInterface();
+ }
+
+ String value = super.getParameter(key);
+ if (StringUtils.isEmpty(value)) {
+ value = metadataInfo.getParameter(key, this.getServiceKey());
+ }
+ return value;
+ }
+
+ @Override
+ public String getParameter(String key, String defaultValue) {
+ if (VERSION_KEY.equals(key)) {
+ return getVersion();
+ } else if (GROUP_KEY.equals(key)) {
+ return getGroup();
+ } else if (INTERFACE_KEY.equals(key)) {
+ return getServiceInterface();
+ }
+
+ String value = getParameter(key);
+ if (StringUtils.isEmpty(value)) {
+ return defaultValue;
+ }
+ return value;
+ }
+
+ @Override
+ public String getMethodParameter(String method, String key) {
+ Map<String, Map<String, String>> instanceMethodParams = super.getMethodParameters();
+ Map<String, String> keyMap = instanceMethodParams.get(method);
+ String value = null;
+ if (keyMap != null) {
+ value = keyMap.get(key);
+ }
+
+ MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey());
+ value = serviceInfo.getMethodParameter(method, key, value);
+ return value;
+ }
+
+ @Override
+ public Map<String, String> getParameters() {
+ Map<String, String> instanceParams = super.getParameters();
+ Map<String, String> metadataParams = metadataInfo.getParameters(getServiceKey());
+ int i = instanceParams == null ? 0 : instanceParams.size();
+ int j = metadataParams == null ? 0 : metadataParams.size();
+ Map<String, String> params = new HashMap<>((int) ((i + j) / 0.75) + 1);
+ if (instanceParams != null) {
+ params.putAll(instanceParams);
+ }
+ if (metadataParams != null) {
+ params.putAll(metadataParams);
+ }
+ return params;
+ }
+
+ public void setMetadata(MetadataInfo metadataInfo) {
+ this.metadataInfo = metadataInfo;
+ }
+
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index f2d428a..bc73cd0 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -27,7 +27,6 @@ import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import java.util.Collection;
import java.util.LinkedHashMap;
import java.util.LinkedList;
import java.util.List;
@@ -247,7 +246,7 @@ public interface ServiceDiscovery extends Prioritized {
* @param serviceName the name of service whose service instances have been changed
* @param serviceInstances the service instances have been changed
*/
- default void dispatchServiceInstancesChangedEvent(String serviceName, Collection<ServiceInstance> serviceInstances) {
+ default void dispatchServiceInstancesChangedEvent(String serviceName, List<ServiceInstance> serviceInstances) {
dispatchServiceInstancesChangedEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index db03a3c..6232850 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -17,71 +17,54 @@
package org.apache.dubbo.registry.client;
import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.extension.SPI;
import org.apache.dubbo.common.logger.Logger;
import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
-import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
import org.apache.dubbo.registry.client.metadata.SubscribedURLsSynthesizer;
-import org.apache.dubbo.registry.client.metadata.proxy.MetadataServiceProxyFactory;
-import org.apache.dubbo.registry.client.selector.ServiceInstanceSelector;
import org.apache.dubbo.registry.support.FailbackRegistry;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
-import java.util.HashSet;
+import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
-import java.util.SortedSet;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-import java.util.function.Supplier;
import java.util.stream.Collectors;
import static java.lang.String.format;
-import static java.util.Collections.emptyList;
-import static org.apache.dubbo.common.URLBuilder.from;
-import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SEPARATOR_CHAR;
+import static java.util.Collections.emptySet;
+import static java.util.Collections.unmodifiableSet;
+import static java.util.stream.Collectors.toSet;
+import static java.util.stream.Stream.of;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_PROTOCOL;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PID_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROVIDER_SIDE;
import static org.apache.dubbo.common.constants.CommonConstants.SIDE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.EMPTY_PROTOCOL;
import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDED_BY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_TYPE;
import static org.apache.dubbo.common.constants.RegistryConstants.SUBSCRIBED_SERVICE_NAMES_KEY;
-import static org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader;
import static org.apache.dubbo.common.function.ThrowableAction.execute;
import static org.apache.dubbo.common.utils.CollectionUtils.isEmpty;
-import static org.apache.dubbo.common.utils.CollectionUtils.isNotEmpty;
-import static org.apache.dubbo.common.utils.StringUtils.splitToSet;
-import static org.apache.dubbo.metadata.MetadataService.toURLs;
+import static org.apache.dubbo.common.utils.StringUtils.isBlank;
import static org.apache.dubbo.registry.client.ServiceDiscoveryFactory.getExtension;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getMetadataStorageType;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getProtocolPort;
/**
* Being different to the traditional registry, {@link ServiceDiscoveryRegistry} that is a new service-oriented
@@ -104,7 +87,7 @@ import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataU
* {@link ServiceNameMapping} will help to figure out one or more services that exported correlative Dubbo services. If
* the service names can be found, the exported {@link URL URLs} will be get from the remote {@link MetadataService}
* being deployed on all {@link ServiceInstance instances} of services. The whole process runs under the
- * {@link #subscribeURLs(URL, List, String, Collection)} method. It's very expensive to invoke
+ * {@link #subscribeURLs(URL, NotifyListener, String, Collection)} method. It's very expensive to invoke
* {@link MetadataService} for each {@link ServiceInstance service instance}, thus {@link ServiceDiscoveryRegistry}
* introduces a cache to optimize the calculation with "revisions". If the revisions of N
* {@link ServiceInstance service instances} are same, {@link MetadataService} is invoked just only once, and then it
@@ -126,7 +109,7 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
private final ServiceDiscovery serviceDiscovery;
- private Set<String> subscribedServices;
+ private final Set<String> subscribedServices;
private final ServiceNameMapping serviceNameMapping;
@@ -134,9 +117,8 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
private final Set<String> registeredListeners = new LinkedHashSet<>();
- private final List<SubscribedURLsSynthesizer> subscribedURLsSynthesizers;
-
- private final ReadWriteLock lock = new ReentrantReadWriteLock();
+ /* app - listener */
+ private final Map<String, ServiceInstancesChangedListener> serviceListeners = new HashMap<>();
/**
* A cache for all URLs of services that the subscribed services exported
@@ -148,10 +130,10 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
public ServiceDiscoveryRegistry(URL registryURL) {
super(registryURL);
this.serviceDiscovery = createServiceDiscovery(registryURL);
+ this.subscribedServices = parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
this.serviceNameMapping = ServiceNameMapping.getDefaultExtension();
String metadataStorageType = getMetadataStorageType(registryURL);
this.writableMetadataService = WritableMetadataService.getExtension(metadataStorageType);
- this.subscribedURLsSynthesizers = initSubscribedURLsSynthesizers();
}
public ServiceDiscovery getServiceDiscovery() {
@@ -159,16 +141,6 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
}
/**
- * Get the subscribed services from the specified registry {@link URL url}
- *
- * @param registryURL the specified registry {@link URL url}
- * @return non-null
- */
- public static Set<String> getSubscribedServices(URL registryURL) {
- return parseServices(registryURL.getParameter(SUBSCRIBED_SERVICE_NAMES_KEY));
- }
-
- /**
* Create the {@link ServiceDiscovery} from the registry {@link URL}
*
* @param registryURL the {@link URL} to connect the registry
@@ -317,73 +289,24 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
writableMetadataService.subscribeURL(url);
Set<String> serviceNames = getServices(url);
-
- List<URL> subscribedURLs = new LinkedList<>();
-
- serviceNames.forEach(serviceName -> {
-
- subscribeURLs(url, subscribedURLs, serviceName);
-
- // register ServiceInstancesChangedListener
- registerServiceInstancesChangedListener(url, new ServiceInstancesChangedListener(serviceName) {
-
- @Override
- public void onEvent(ServiceInstancesChangedEvent event) {
- List<URL> subscribedURLs = new LinkedList<>();
- Set<String> others = new HashSet<>(serviceNames);
- others.remove(serviceName);
-
- // Collect the subscribedURLs
- subscribeURLs(url, subscribedURLs, serviceName, () -> event.getServiceInstances());
- subscribeURLs(url, subscribedURLs, others.toString(), () -> getServiceInstances(others));
-
- // Notify all
- notifyAllSubscribedURLs(url, subscribedURLs, listener);
-
- }
- });
- });
-
- // Notify all
- notifyAllSubscribedURLs(url, subscribedURLs, listener);
-
- }
-
- private void notifyAllSubscribedURLs(URL url, List<URL> subscribedURLs, NotifyListener listener) {
-
- if (subscribedURLs.isEmpty()) {
- // Add the EMPTY_PROTOCOL URL
- subscribedURLs.add(from(url).setProtocol(EMPTY_PROTOCOL).removeParameter(CATEGORY_KEY).build());
+ if (CollectionUtils.isEmpty(serviceNames)) {
+ throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + url);
}
- // Notify all
- listener.notify(subscribedURLs);
+ serviceNames.forEach(serviceName -> subscribeURLs(url, listener, serviceName));
}
- private List<ServiceInstance> getServiceInstances(Set<String> serviceNames) {
- if (isEmpty(serviceNames)) {
- return emptyList();
- }
- List<ServiceInstance> allServiceInstances = new LinkedList<>();
- for (String serviceName : serviceNames) {
- List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
- if (!isEmpty(serviceInstances)) {
- allServiceInstances.addAll(serviceInstances);
- }
- }
- return allServiceInstances;
- }
-
- protected void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs,
- String serviceName, Supplier<Collection<ServiceInstance>> serviceInstancesSupplier) {
- Collection<ServiceInstance> serviceInstances = serviceInstancesSupplier.get();
- subscribeURLs(subscribedURL, subscribedURLs, serviceName, serviceInstances);
- }
-
-
- protected void subscribeURLs(URL url, List<URL> subscribedURLs, String serviceName) {
- List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
- subscribeURLs(url, subscribedURLs, serviceName, serviceInstances);
+ protected void subscribeURLs(URL url, NotifyListener listener, String serviceName) {
+ // register ServiceInstancesChangedListener
+ ServiceInstancesChangedListener serviceListener = serviceListeners.computeIfAbsent(serviceName,
+ k -> new ServiceInstancesChangedListener(serviceName) {
+ @Override
+ protected void notifyAddresses() {
+ listener.notifyServiceInstances();
+ }
+ });
+ listener.addServiceListener(serviceListener);
+ registerServiceInstancesChangedListener(url, serviceListener);
}
/**
@@ -403,432 +326,24 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
return listener.getServiceName() + ":" + url.toString(VERSION_KEY, GROUP_KEY, PROTOCOL_KEY);
}
- /**
- * Subscribe the {@link URL URLs} that the specified service exported are
- * {@link #getExportedURLs(ServiceInstance) get} from {@link MetadataService} if present, or try to
- * be {@link #synthesizeSubscribedURLs(URL, Collection) synthesized} by
- * the instances of {@link SubscribedURLsSynthesizer}
- *
- * @param subscribedURL the subscribed {@link URL url}
- * @param subscribedURLs {@link NotifyListener}
- * @param serviceName
- * @param serviceInstances
- * @see #getExportedURLs(URL, Collection)
- * @see #synthesizeSubscribedURLs(URL, Collection)
- */
- protected void subscribeURLs(URL subscribedURL, List<URL> subscribedURLs, String serviceName,
- Collection<ServiceInstance> serviceInstances) {
-
- if (isEmpty(serviceInstances)) {
- logger.warn(format("There is no instance in service[name : %s]", serviceName));
- return;
- }
-
- /**
- * Add the exported URLs from {@link MetadataService}
- */
- subscribedURLs.addAll(getExportedURLs(subscribedURL, serviceInstances));
-
- if (subscribedURLs.isEmpty()) { // If empty, try to synthesize
- /**
- * Add the subscribed URLs that were synthesized
- */
- subscribedURLs.addAll(synthesizeSubscribedURLs(subscribedURL, serviceInstances));
- }
- }
-
- /**
- * Get the exported {@link URL URLs} from the {@link MetadataService} in the specified
- * {@link ServiceInstance service instances}
- *
- * @param subscribedURL the subscribed {@link URL url}
- * @param instances {@link ServiceInstance service instances}
- * @return the exported {@link URL URLs} if present, or <code>{@link Collections#emptyList() empty list}</code>
- */
- private List<URL> getExportedURLs(URL subscribedURL, Collection<ServiceInstance> instances) {
-
- // local service instances could be mutable
- List<ServiceInstance> serviceInstances = instances.stream()
- .filter(ServiceInstance::isEnabled)
- .filter(ServiceInstance::isHealthy)
- .filter(ServiceInstanceMetadataUtils::isDubboServiceInstance)
- .collect(Collectors.toList());
-
- int size = serviceInstances.size();
-
- if (size == 0) {
- return emptyList();
- }
-
- // Prepare revision exported URLs
- prepareServiceRevisionExportedURLs(serviceInstances);
-
- // Clone the subscribed URLs from the template URLs
- List<URL> subscribedURLs = cloneExportedURLs(subscribedURL, serviceInstances);
-
- // clear local service instances
- serviceInstances.clear();
-
- return subscribedURLs;
- }
-
- /**
- * Prepare the {@link #serviceRevisionExportedURLsCache} exclusively
- *
- * @param serviceInstances {@link ServiceInstance service instances}
- * @see #expungeStaleRevisionExportedURLs(List)
- * @see #initializeRevisionExportedURLs(List)
- */
- private void prepareServiceRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
- executeExclusively(() -> {
- // 1. expunge stale
- expungeStaleRevisionExportedURLs(serviceInstances);
- // 2. Initialize
- initializeRevisionExportedURLs(serviceInstances);
- });
- }
-
- /**
- * Initialize the {@link URL URLs} that {@link ServiceInstance service instances} exported into
- * {@link #serviceRevisionExportedURLsCache the cache}.
- * <p>
- * Typically, the {@link URL URLs} that one {@link ServiceInstance service instance} exported can be get from
- * the same instances' {@link MetadataService}, but the cost is very expensive if there are a lot of instances
- * in this service. Thus, the exported {@link URL URls} should be cached and stored into
- * {@link #serviceRevisionExportedURLsCache the cache}.
- * <p>
- * In most cases, {@link #serviceRevisionExportedURLsCache the cache} only holds a single list of exported URLs for
- * each service because there is no difference on the Dubbo services(interfaces) between the service instances.
- * However, if there are one or more upgrading or increasing Dubbo services that are deploying on the some of
- * instances, other instances still maintain the previous ones, in this way, there are two versions of the services,
- * they are called "revisions", in other words, one revision associates a list of exported URLs that can be reused
- * for other instances with same revision, and one service allows one or more revisions.
- *
- * @param serviceInstances {@link ServiceInstance service instances}
- */
- private void initializeRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
- // initialize the revision exported URLs that the selected service instance exported
- initializeSelectedRevisionExportedURLs(serviceInstances);
- // initialize the revision exported URLs that other service instances exported
- serviceInstances.forEach(this::initializeRevisionExportedURLs);
- }
-
- /**
- * Initialize the {@link URL URLs} that the {@link #selectServiceInstance(List) selected service instance} exported
- * into {@link #serviceRevisionExportedURLsCache the cache}.
- *
- * @param serviceInstances {@link ServiceInstance service instances}
- */
- private void initializeSelectedRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
- // Try to initialize revision exported URLs until success
- for (int i = 0; i < serviceInstances.size(); i++) {
- // select a instance of {@link ServiceInstance}
- ServiceInstance selectedInstance = selectServiceInstance(serviceInstances);
- List<URL> revisionExportedURLs = initializeRevisionExportedURLs(selectedInstance);
- if (isNotEmpty(revisionExportedURLs)) { // If the result is valid
- break;
- }
- }
- }
-
- /**
- * Expunge the revision exported {@link URL URLs} in {@link #serviceRevisionExportedURLsCache the cache} if
- * some revisions of {@link ServiceInstance service instance} had been out of date possibly
- *
- * @param serviceInstances {@link ServiceInstance service instances}
- */
- private void expungeStaleRevisionExportedURLs(List<ServiceInstance> serviceInstances) {
-
- String serviceName = serviceInstances.get(0).getServiceName();
- // revisionExportedURLsMap is mutable
- Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
-
- if (revisionExportedURLsMap.isEmpty()) { // if empty, return immediately
- return;
- }
-
- Set<String> existedRevisions = revisionExportedURLsMap.keySet(); // read-only
- Set<String> currentRevisions = serviceInstances.stream()
- .map(ServiceInstanceMetadataUtils::getExportedServicesRevision)
- .collect(Collectors.toSet());
- // staleRevisions = existedRevisions(copy) - currentRevisions
- Set<String> staleRevisions = new HashSet<>(existedRevisions);
- staleRevisions.removeAll(currentRevisions);
- // remove exported URLs if staled
- staleRevisions.forEach(revisionExportedURLsMap::remove);
- }
-
- /**
- * Clone the exported URLs that are based on {@link #getTemplateExportedURLs(URL, ServiceInstance) the template URLs}
- * from the some of {@link ServiceInstance service instances} with different revisions
- *
- * @param subscribedURL the subscribed {@link URL url}
- * @param serviceInstances {@link ServiceInstance service instances}
- * @return non-null
- */
- private List<URL> cloneExportedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
-
- if (isEmpty(serviceInstances)) {
- return emptyList();
- }
-
- List<URL> clonedExportedURLs = new LinkedList<>();
-
- serviceInstances.forEach(serviceInstance -> {
-
- String host = serviceInstance.getHost();
-
- getTemplateExportedURLs(subscribedURL, serviceInstance)
- .stream()
- .map(templateURL -> templateURL.removeParameter(TIMESTAMP_KEY))
- .map(templateURL -> templateURL.removeParameter(PID_KEY))
- .map(templateURL -> {
- String protocol = templateURL.getProtocol();
- int port = getProtocolPort(serviceInstance, protocol);
- if (Objects.equals(templateURL.getHost(), host)
- && Objects.equals(templateURL.getPort(), port)) { // use templateURL if equals
- return templateURL;
- }
-
- URLBuilder clonedURLBuilder = from(templateURL) // remove the parameters from the template URL
- .setHost(host) // reset the host
- .setPort(port); // reset the port
-
- return clonedURLBuilder.build();
- })
- .forEach(clonedExportedURLs::add);
- });
- return clonedExportedURLs;
- }
-
-
- /**
- * Select one {@link ServiceInstance} by {@link ServiceInstanceSelector the strategy} if there are more that one
- * instances in order to avoid the hot spot appearing the some instance
- *
- * @param serviceInstances the {@link List list} of {@link ServiceInstance}
- * @return <code>null</code> if <code>serviceInstances</code> is empty.
- * @see ServiceInstanceSelector
- */
- private ServiceInstance selectServiceInstance(List<ServiceInstance> serviceInstances) {
- int size = serviceInstances.size();
- if (size == 0) {
- return null;
- } else if (size == 1) {
- return serviceInstances.get(0);
- }
- ServiceInstanceSelector selector = getExtensionLoader(ServiceInstanceSelector.class).getAdaptiveExtension();
- return selector.select(getUrl(), serviceInstances);
- }
-
- /**
- * Get the template exported {@link URL urls} from the specified {@link ServiceInstance}.
- * <p>
- * First, put the revision {@link ServiceInstance service instance}
- * associating {@link #getExportedURLs(ServiceInstance) exported URLs} into cache.
- * <p>
- * And then compare a new {@link ServiceInstance service instances'} revision with cached one,If they are equal,
- * return the cached template {@link URL urls} immediately, or to get template {@link URL urls} that the provider
- * {@link ServiceInstance instance} exported via executing {@link ##getExportedURLs(ServiceInstance) (ServiceInstance)}
- * method.
- * <p>
- * Eventually, the retrieving result will be cached and returned.
- *
- * @param subscribedURL the subscribed {@link URL url}
- * @param selectedInstance the {@link ServiceInstance}
- * associating with the {@link URL urls}
- * @return non-null {@link List} of {@link URL urls}
- */
- private List<URL> getTemplateExportedURLs(URL subscribedURL, ServiceInstance selectedInstance) {
-
- List<URL> exportedURLs = getRevisionExportedURLs(selectedInstance);
-
- if (isEmpty(exportedURLs)) {
- return emptyList();
- }
-
- return filterSubscribedURLs(subscribedURL, exportedURLs);
- }
-
- /**
- * Initialize the URLs that the specified {@link ServiceInstance service instance} exported
- *
- * @param serviceInstance the {@link ServiceInstance} exports the Dubbo Services
- * @return the {@link URL URLs} that the {@link ServiceInstance} exported, it's calculated from
- * The invocation to remote {@link MetadataService}, or get from {@link #serviceRevisionExportedURLsCache cache} if
- * {@link ServiceInstanceMetadataUtils#getExportedServicesRevision(ServiceInstance) revision} is hit
- */
- private List<URL> initializeRevisionExportedURLs(ServiceInstance serviceInstance) {
-
- if (serviceInstance == null) {
- return emptyList();
- }
-
- String serviceName = serviceInstance.getServiceName();
- // get the revision from the specified {@link ServiceInstance}
- String revision = getExportedServicesRevision(serviceInstance);
-
- Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
-
- List<URL> revisionExportedURLs = revisionExportedURLsMap.get(revision);
-
- boolean firstGet = false;
-
- if (revisionExportedURLs == null) { // The hit is missing in cache
-
- if (!revisionExportedURLsMap.isEmpty()) { // The case is that current ServiceInstance with the different revision
- if (logger.isWarnEnabled()) {
- logger.warn(format("The ServiceInstance[id: %s, host : %s , port : %s] has different revision : %s" +
- ", please make sure the service [name : %s] is changing or not.",
- serviceInstance.getId(),
- serviceInstance.getHost(),
- serviceInstance.getPort(),
- revision,
- serviceInstance.getServiceName()
- ));
- }
- } else { // Else, it's the first time to get the exported URLs
- firstGet = true;
- }
-
- revisionExportedURLs = getExportedURLs(serviceInstance);
-
- if (revisionExportedURLs != null) { // just allow the valid result into exportedURLsMap
-
- revisionExportedURLsMap.put(revision, revisionExportedURLs);
-
- if (logger.isDebugEnabled()) {
- logger.debug(format("Get the exported URLs[size : %s, first : %s] from the target service " +
- "instance [id: %s , service : %s , host : %s , port : %s , revision : %s]",
- revisionExportedURLs.size(), firstGet,
- serviceInstance.getId(),
- serviceInstance.getServiceName(),
- serviceInstance.getHost(),
- serviceInstance.getPort(),
- revision
- ));
- }
- }
- } else { // Else, The cache is hit
- if (logger.isDebugEnabled()) {
- logger.debug(format("Get the exported URLs[size : %s] from cache, the instance" +
- "[id: %s , service : %s , host : %s , port : %s , revision : %s]",
- revisionExportedURLs.size(),
- serviceInstance.getId(),
- serviceInstance.getServiceName(),
- serviceInstance.getHost(),
- serviceInstance.getPort(),
- revision
- ));
- }
- }
-
- return revisionExportedURLs;
- }
-
private Map<String, List<URL>> getRevisionExportedURLsMap(String serviceName) {
return serviceRevisionExportedURLsCache.computeIfAbsent(serviceName, s -> new LinkedHashMap());
}
/**
- * Get all services {@link URL URLs} that the specified {@link ServiceInstance service instance} exported from cache
- *
- * @param serviceInstance the {@link ServiceInstance} exports the Dubbo Services
- * @return the same as {@link #getExportedURLs(ServiceInstance)}
- */
- private List<URL> getRevisionExportedURLs(ServiceInstance serviceInstance) {
-
- if (serviceInstance == null) {
- return emptyList();
- }
-
- String serviceName = serviceInstance.getServiceName();
- // get the revision from the specified {@link ServiceInstance}
- String revision = getExportedServicesRevision(serviceInstance);
-
- return getRevisionExportedURLs(serviceName, revision);
- }
-
- private List<URL> getRevisionExportedURLs(String serviceName, String revision) {
- return executeShared(() -> {
- Map<String, List<URL>> revisionExportedURLsMap = getRevisionExportedURLsMap(serviceName);
- List<URL> exportedURLs = revisionExportedURLsMap.get(revision);
- // Get a copy from source in order to prevent the caller trying to change the cached data
- return exportedURLs != null ? new ArrayList<>(exportedURLs) : emptyList();
- });
- }
-
- /**
- * Get all services {@link URL URLs} that the specified {@link ServiceInstance service instance} exported
- * via the proxy to invoke the {@link MetadataService}
- *
- * @param providerServiceInstance the {@link ServiceInstance} exported the Dubbo services
- * @return The possible result :
- * <ol>
- * <li>The normal result</li>
- * <li>The empty result if the {@link ServiceInstance service instance} did not export yet</li>
- * <li><code>null</code> if there is an invocation error on {@link MetadataService} proxy</li>
- * </ol>
- * @see MetadataServiceProxyFactory
- * @see MetadataService
- */
- private List<URL> getExportedURLs(ServiceInstance providerServiceInstance) {
-
- List<URL> exportedURLs = null;
-
- String metadataStorageType = getMetadataStorageType(providerServiceInstance);
-
- try {
- MetadataService metadataService = MetadataServiceProxyFactory.getExtension(metadataStorageType)
- .getProxy(providerServiceInstance);
- if (metadataService != null) {
- SortedSet<String> urls = metadataService.getExportedURLs();
- exportedURLs = toURLs(urls);
- }
- } catch (Throwable e) {
- if (logger.isErrorEnabled()) {
- logger.error(format("Failed to get the exported URLs from the target service instance[%s]",
- providerServiceInstance), e);
- }
- exportedURLs = null; // set the result to be null if failed to get
- }
- return exportedURLs;
- }
-
- private void executeExclusively(Runnable runnable) {
- Lock writeLock = lock.writeLock();
- writeLock.lock();
- try {
- runnable.run();
- } finally {
- writeLock.unlock();
- }
- }
-
- private <T> T executeShared(Supplier<T> supplier) {
- Lock readLock = lock.readLock();
- readLock.lock();
- try {
- return supplier.get();
- } finally {
- readLock.unlock();
- }
- }
-
- /**
* Synthesize new subscribed {@link URL URLs} from old one
*
* @param subscribedURL
* @param serviceInstances
* @return non-null
*/
- private Collection<? extends URL> synthesizeSubscribedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
- return subscribedURLsSynthesizers.stream()
- .filter(synthesizer -> synthesizer.supports(subscribedURL))
- .map(synthesizer -> synthesizer.synthesize(subscribedURL, serviceInstances))
- .flatMap(Collection::stream)
- .collect(Collectors.toList());
- }
+// private Collection<? extends URL> synthesizeSubscribedURLs(URL subscribedURL, Collection<ServiceInstance> serviceInstances) {
+// return subscribedURLsSynthesizers.stream()
+// .filter(synthesizer -> synthesizer.supports(subscribedURL))
+// .map(synthesizer -> synthesizer.synthesize(subscribedURL, serviceInstances))
+// .flatMap(Collection::stream)
+// .collect(Collectors.toList());
+// }
/**
* 1.developer explicitly specifies the application name this interface belongs to
@@ -837,11 +352,9 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
*
* @param subscribedURL
* @return
- * @throws IllegalStateException If no service name is not found
*/
- protected Set<String> getServices(URL subscribedURL) throws IllegalStateException {
-
- Set<String> subscribedServices = null;
+ protected Set<String> getServices(URL subscribedURL) {
+ Set<String> subscribedServices = new LinkedHashSet<>();
String serviceNames = subscribedURL.getParameter(PROVIDED_BY);
if (StringUtils.isNotEmpty(serviceNames)) {
@@ -850,21 +363,19 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
if (isEmpty(subscribedServices)) {
subscribedServices = findMappedServices(subscribedURL);
+ if (isEmpty(subscribedServices)) {
+ subscribedServices = getSubscribedServices();
+ }
}
-
- if (isEmpty(subscribedServices)) {
- subscribedServices = getSubscribedServices();
- }
-
- if (isEmpty(subscribedServices)) {
- throw new IllegalStateException("Should has at least one way to know which services this interface belongs to, subscription url: " + subscribedURL);
- }
-
return subscribedServices;
}
public static Set<String> parseServices(String literalServices) {
- return splitToSet(literalServices, COMMA_SEPARATOR_CHAR, true);
+ return isBlank(literalServices) ? emptySet() :
+ unmodifiableSet(of(literalServices.split(","))
+ .map(String::trim)
+ .filter(StringUtils::isNotEmpty)
+ .collect(toSet()));
}
/**
@@ -873,20 +384,21 @@ public class ServiceDiscoveryRegistry extends FailbackRegistry {
* @return non-null
*/
public Set<String> getSubscribedServices() {
- if (subscribedServices == null) {
- subscribedServices = findMappedServices(getUrl());
- }
return subscribedServices;
}
/**
* Get the mapped services name by the specified {@link URL}
*
- * @param url the specified {@link URL}
- * @return empty {@link Set} if not found
+ * @param subscribedURL
+ * @return
*/
- protected Set<String> findMappedServices(URL url) {
- return serviceNameMapping.get(url);
+ protected Set<String> findMappedServices(URL subscribedURL) {
+ String serviceInterface = subscribedURL.getServiceInterface();
+ String group = subscribedURL.getParameter(GROUP_KEY);
+ String version = subscribedURL.getParameter(VERSION_KEY);
+ String protocol = subscribedURL.getParameter(PROTOCOL_KEY, DUBBO_PROTOCOL);
+ return serviceNameMapping.get(serviceInterface, group, version, protocol);
}
/**
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
index 896af9c..164d146 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceInstance.java
@@ -16,6 +16,8 @@
*/
package org.apache.dubbo.registry.client;
+import org.apache.dubbo.common.URL;
+
import java.io.Serializable;
import java.util.Map;
@@ -115,4 +117,6 @@ public interface ServiceInstance extends Serializable {
*/
boolean equals(Object another);
+ URL toURL(String protocol, String path, String interfaceName, String group, String version, String serviceKey);
+
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java
index 4b91265..b206eb7 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/ServiceInstancesChangedEvent.java
@@ -20,9 +20,9 @@ import org.apache.dubbo.event.Event;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
-import java.util.Collection;
+import java.util.List;
-import static java.util.Collections.unmodifiableCollection;
+import static java.util.Collections.unmodifiableList;
/**
* An event raised after the {@link ServiceInstance instances} of one service has been changed.
@@ -34,17 +34,17 @@ public class ServiceInstancesChangedEvent extends Event {
private final String serviceName;
- private final Collection<ServiceInstance> serviceInstances;
+ private final List<ServiceInstance> serviceInstances;
/**
* @param serviceName The name of service that was changed
* @param serviceInstances all {@link ServiceInstance service instances}
* @throws IllegalArgumentException if source is null.
*/
- public ServiceInstancesChangedEvent(String serviceName, Collection<ServiceInstance> serviceInstances) {
+ public ServiceInstancesChangedEvent(String serviceName, List<ServiceInstance> serviceInstances) {
super(serviceName);
this.serviceName = serviceName;
- this.serviceInstances = unmodifiableCollection(serviceInstances);
+ this.serviceInstances = unmodifiableList(serviceInstances);
}
/**
@@ -58,7 +58,7 @@ public class ServiceInstancesChangedEvent extends Event {
/**
* @return all {@link ServiceInstance service instances}
*/
- public Collection<ServiceInstance> getServiceInstances() {
+ public List<ServiceInstance> getServiceInstances() {
return serviceInstances;
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 1028301..33f0bcc 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -16,11 +16,30 @@
*/
package org.apache.dubbo.registry.client.event.listener;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.event.ConditionalEventListener;
import org.apache.dubbo.event.EventListener;
+import org.apache.dubbo.metadata.MetadataInfo;
+import org.apache.dubbo.metadata.MetadataInfo.ServiceInfo;
+import org.apache.dubbo.metadata.MetadataService;
+import org.apache.dubbo.metadata.MetadataUtils;
+import org.apache.dubbo.metadata.store.RemoteMetadataServiceImpl;
+import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.event.ServiceInstancesChangedEvent;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import java.util.Set;
+
+import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
/**
* The Service Discovery Changed {@link EventListener Event Listener}
@@ -32,6 +51,16 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
private final String serviceName;
+ private List<ServiceInstance> instances;
+
+ private Map<String, List<ServiceInstance>> revisionToInstances;
+
+ private Map<String, MetadataInfo> revisionToMetadata;
+
+ private Map<String, Set<String>> serviceToRevisions;
+
+ private Map<String, List<ServiceInstance>> serviceToInstances;
+
protected ServiceInstancesChangedListener(String serviceName) {
this.serviceName = serviceName;
}
@@ -41,7 +70,87 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
*
* @param event {@link ServiceInstancesChangedEvent}
*/
- public abstract void onEvent(ServiceInstancesChangedEvent event);
+ public void onEvent(ServiceInstancesChangedEvent event) {
+ instances = event.getServiceInstances();
+
+ Map<String, List<ServiceInstance>> localRevisionToInstances = new HashMap<>();
+ Map<String, MetadataInfo> localRevisionToMetadata = new HashMap<>();
+ Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
+ for (ServiceInstance instance : instances) {
+ String revision = getExportedServicesRevision(instance);
+ Collection<ServiceInstance> rInstances = localRevisionToInstances.computeIfAbsent(revision, r -> new ArrayList<>());
+ rInstances.add(instance);
+
+ MetadataInfo metadata = revisionToMetadata.get(revision);
+ if (metadata != null) {
+ localRevisionToMetadata.put(revision, metadata);
+ } else {
+ metadata = getMetadataInfo(instance);
+ localRevisionToMetadata.put(revision, getMetadataInfo(instance));
+ }
+ parse(revision, metadata, localServiceToRevisions);
+ }
+
+ this.revisionToInstances = localRevisionToInstances;
+ this.revisionToMetadata = localRevisionToMetadata;
+ this.serviceToRevisions = localServiceToRevisions;
+
+ Map<String, List<ServiceInstance>> localServiceToInstances = new HashMap<>();
+ for (String serviceKey : localServiceToRevisions.keySet()) {
+ if (CollectionUtils.equals(localRevisionToInstances.keySet(), localServiceToRevisions.get(serviceKey))) {
+ localServiceToInstances.put(serviceKey, instances);
+ }
+ }
+
+ this.serviceToInstances = localServiceToInstances;
+ }
+
+ public List<ServiceInstance> getInstances(String serviceKey) {
+ if (serviceToInstances.containsKey(serviceKey)) {
+ return serviceToInstances.get(serviceKey);
+ }
+
+ Set<String> revisions = serviceToRevisions.get(serviceKey);
+ List<ServiceInstance> allInstances = new LinkedList<>();
+ for (String r : revisions) {
+ allInstances.addAll(revisionToInstances.get(r));
+ }
+ return allInstances;
+ }
+
+ private Map<String, Set<String>> parse(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
+ Map<String, ServiceInfo> serviceInfos = metadata.getServices();
+ for (Map.Entry<String, ServiceInfo> serviceInfo : serviceInfos.entrySet()) {
+ String serviceKey = serviceInfo.getValue().getServiceKey();
+ Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new HashSet<>());
+ set.add(revision);
+ }
+
+ return localServiceToRevisions;
+ }
+
+ private MetadataInfo getMetadataInfo(ServiceInstance instance) {
+ String metadataType = ServiceInstanceMetadataUtils.getMetadataStorageType(instance);
+ String cluster = ServiceInstanceMetadataUtils.getRemoteCluster(instance);
+
+ MetadataInfo metadataInfo;
+ try {
+ if (REMOTE_METADATA_STORAGE_TYPE.equals(metadataType)) {
+ RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getConsumerRemoteMetadataService(consumerUrl.getServiceKey());
+ metadataInfo = remoteMetadataService.getMetadata(serviceName, revision);
+ } else {
+ MetadataService localMetadataService = MetadataUtils.getLocalMetadataService();
+ metadataInfo = localMetadataService.getMetadataInfo();
+ }
+ } catch (Exception e) {
+ // TODO, load metadata backup
+ metadataInfo = null;
+ }
+ return metadataInfo;
+ }
+
+
+ protected abstract void notifyAddresses();
/**
* Get the correlative service name
@@ -57,7 +166,7 @@ public abstract class ServiceInstancesChangedListener implements ConditionalEven
* @return If service name matches, return <code>true</code>, or <code>false</code>
*/
public final boolean accept(ServiceInstancesChangedEvent event) {
- return Objects.equals(getServiceName(), event.getServiceName());
+ return Objects.equals(serviceName, event.getServiceName());
}
@Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 01bde1d..16f5446 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -31,6 +31,8 @@ import org.apache.dubbo.common.utils.UrlUtils;
import org.apache.dubbo.registry.AddressListener;
import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.event.listener.ServiceInstancesChangedListener;
import org.apache.dubbo.remoting.Constants;
import org.apache.dubbo.rpc.Invocation;
import org.apache.dubbo.rpc.Invoker;
@@ -53,6 +55,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -68,6 +71,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.APP_DYNAMIC_CONFIGURATORS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.COMPATIBLE_CONFIG_KEY;
@@ -133,6 +137,7 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
private static final ConsumerConfigurationListener CONSUMER_CONFIGURATION_LISTENER = new ConsumerConfigurationListener();
private ReferenceConfigurationListener serviceConfigurationListener;
+ private Set<ServiceInstancesChangedListener> serviceListeners;
public RegistryDirectory(Class<T> serviceType, URL url) {
super(url);
@@ -151,6 +156,34 @@ public class RegistryDirectory<T> extends AbstractDirectory<T> implements Notify
this.overrideDirectoryUrl = this.directoryUrl = turnRegistryUrlToConsumerUrl(url);
String group = directoryUrl.getParameter(GROUP_KEY, "");
this.multiGroup = group != null && (ANY_VALUE.equals(group) || group.contains(","));
+
+ this.serviceListeners = new HashSet<>();
+ }
+
+ @Override
+ public void addServiceListener(ServiceInstancesChangedListener instanceListener) {
+ this.serviceListeners.add(instanceListener);
+ }
+
+ @Override
+ public void notifyServiceInstances() {
+ List<URL> urls = new LinkedList<>();
+ for (ServiceInstancesChangedListener listener : serviceListeners) {
+ List<ServiceInstance> instances = listener.getInstances(serviceKey);
+ for (ServiceInstance instance : instances) {
+ // FIXME, the right protocol? the right path?
+ urls.add(
+ instance.toURL(
+ "dubbo",
+ serviceType.getName(),
+ serviceType.getName(),
+ queryMap.get(GROUP_KEY),
+ queryMap.get(VERSION_KEY),
+ serviceKey)
+ );
+ }
+ }
+ notify(urls);
}
private URL turnRegistryUrlToConsumerUrl(URL url) {