You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/04 07:09:47 UTC
[dubbo] 10/27: service discovery
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit ca62d8352d7398a73f1a72097c665a9c3fceb1af
Author: ken.lj <ke...@gmail.com>
AuthorDate: Thu Jul 9 18:01:19 2020 +0800
service discovery
---
.../org.apache.dubbo.rpc.cluster.RouterFactory | 1 -
.../dubbo/common/config/ConfigurationUtils.java | 4 ++
.../dubbo/config/bootstrap/DubboBootstrap.java | 58 ++++++++------------
.../apache/dubbo/metadata/MetadataConstants.java | 27 ++++------
.../org/apache/dubbo/metadata/MetadataInfo.java | 7 +++
.../dubbo/qos/command/impl/PublishMetadata.java | 63 ++++++++++++++++++++++
.../org.apache.dubbo.qos.command.BaseCommand | 1 +
.../client/EventPublishingServiceDiscovery.java | 5 ++
.../client/FileSystemServiceDiscovery.java | 8 +++
.../dubbo/registry/client/ServiceDiscovery.java | 2 +
.../client/ServiceDiscoveryRegistryDirectory.java | 2 +-
.../listener/ServiceInstancesChangedListener.java | 14 +++--
.../metadata/ServiceInstanceMetadataUtils.java | 37 +++++++++++++
.../store/InMemoryWritableMetadataService.java | 12 +++++
.../metadata/store/RemoteMetadataServiceImpl.java | 6 +--
.../registry/support/AbstractRegistryFactory.java | 12 +++++
.../registry/client/InMemoryServiceDiscovery.java | 8 +++
.../zookeeper/ZookeeperServiceDiscovery.java | 18 +++++--
18 files changed, 218 insertions(+), 67 deletions(-)
diff --git a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
index 13e307b..2a807f0 100644
--- a/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
+++ b/dubbo-cluster/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.cluster.RouterFactory
@@ -5,4 +5,3 @@ service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactor
app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory
tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory
mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory
-instance=org.apache.dubbo.rpc.cluster.router.service.InstanceRouterFactory
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
index 70fd9f1..a7c0693 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/config/ConfigurationUtils.java
@@ -100,6 +100,10 @@ public class ConfigurationUtils {
return StringUtils.trim(getGlobalConfiguration().getString(property, defaultValue));
}
+    /**
+     * Looks up an integer property in the global configuration.
+     *
+     * @param property     the property key to look up
+     * @param defaultValue the fallback passed through to the underlying {@code getInt} lookup
+     * @return whatever the global configuration's {@code getInt(property, defaultValue)} yields
+     */
+    public static int get(String property, int defaultValue) {
+        final int resolved = getGlobalConfiguration().getInt(property, defaultValue);
+        return resolved;
+    }
+
public static Map<String, String> parseProperties(String content) throws IOException {
Map<String, String> map = new HashMap<>();
if (StringUtils.isEmpty(content)) {
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
index 455535c6..a9f2bcf 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/bootstrap/DubboBootstrap.java
@@ -17,6 +17,7 @@
package org.apache.dubbo.config.bootstrap;
import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
import org.apache.dubbo.common.config.Environment;
import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
import org.apache.dubbo.common.config.configcenter.wrapper.CompositeDynamicConfiguration;
@@ -58,17 +59,16 @@ import org.apache.dubbo.config.utils.ReferenceConfigCache;
import org.apache.dubbo.event.EventDispatcher;
import org.apache.dubbo.event.EventListener;
import org.apache.dubbo.event.GenericEventListener;
-import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.MetadataServiceExporter;
import org.apache.dubbo.metadata.WritableMetadataService;
import org.apache.dubbo.metadata.report.MetadataReportInstance;
import org.apache.dubbo.registry.client.DefaultServiceInstance;
-import org.apache.dubbo.registry.client.ServiceDiscovery;
-import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry;
import org.apache.dubbo.registry.client.ServiceInstance;
import org.apache.dubbo.registry.client.ServiceInstanceCustomizer;
import org.apache.dubbo.registry.client.metadata.MetadataUtils;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
+import org.apache.dubbo.registry.client.metadata.store.InMemoryWritableMetadataService;
import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
import org.apache.dubbo.registry.support.AbstractRegistryFactory;
import org.apache.dubbo.rpc.model.ApplicationModel;
@@ -89,7 +89,6 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
-import java.util.stream.Collectors;
import static java.util.Arrays.asList;
import static java.util.concurrent.Executors.newSingleThreadExecutor;
@@ -99,11 +98,13 @@ import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_METADATA
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
import static org.apache.dubbo.common.function.ThrowableAction.execute;
import static org.apache.dubbo.common.utils.StringUtils.isNotEmpty;
+import static org.apache.dubbo.metadata.MetadataConstants.DEFAULT_METADATA_PUBLISH_DELAY;
+import static org.apache.dubbo.metadata.MetadataConstants.METADATA_PUBLISH_DELAY_KEY;
import static org.apache.dubbo.metadata.WritableMetadataService.getDefaultExtension;
-import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.EXPORTED_SERVICES_REVISION_PROPERTY_NAME;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.calInstanceRevision;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.setMetadataStorageType;
+import static org.apache.dubbo.registry.support.AbstractRegistryFactory.getServiceDiscoveries;
import static org.apache.dubbo.remoting.Constants.CLIENT_KEY;
-import static org.apache.dubbo.rpc.Constants.ID_KEY;
/**
* See {@link ApplicationModel} and {@link ExtensionLoader} for why this class is designed to be singleton.
@@ -734,15 +735,6 @@ public class DubboBootstrap extends GenericEventListener {
addEventListener(this);
}
- private List<ServiceDiscovery> getServiceDiscoveries() {
- return AbstractRegistryFactory.getRegistries()
- .stream()
- .filter(registry -> registry instanceof ServiceDiscoveryRegistry)
- .map(registry -> (ServiceDiscoveryRegistry) registry)
- .map(ServiceDiscoveryRegistry::getServiceDiscovery)
- .collect(Collectors.toList());
- }
-
/**
* Start the bootstrap
*/
@@ -1024,6 +1016,18 @@ public class DubboBootstrap extends GenericEventListener {
ServiceInstance serviceInstance = createServiceInstance(serviceName, host, port);
+ doRegisterServiceInstance(serviceInstance);
+
+        // Scheduled task for updating Metadata and ServiceInstance.
+        // Each run first waits until the local metadata service signals a change, then republishes
+        // the metadata and refreshes the registered instance.
+        // The period is expressed in milliseconds: the default (DEFAULT_METADATA_PUBLISH_DELAY = 5000)
+        // would otherwise mean a 5 ms period if interpreted as microseconds.
+        // NOTE(review): scheduleAtFixedRate suppresses all further runs once the task throws;
+        // consider guarding the body if refresh failures should not stop the schedule.
+        executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
+            InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService) WritableMetadataService.getDefaultExtension();
+            localMetadataService.blockUntilUpdated();
+            ServiceInstanceMetadataUtils.refreshMetadataAndInstance();
+        }, 0, ConfigurationUtils.get(METADATA_PUBLISH_DELAY_KEY, DEFAULT_METADATA_PUBLISH_DELAY), TimeUnit.MILLISECONDS);
+ }
+
+ private void doRegisterServiceInstance(ServiceInstance serviceInstance) {
+ //FIXME
publishMetadataToRemote(serviceInstance);
getServiceDiscoveries().forEach(serviceDiscovery ->
@@ -1032,31 +1036,13 @@ public class DubboBootstrap extends GenericEventListener {
// register metadata
serviceDiscovery.register(serviceInstance);
});
-
- // scheduled task for updating Metadata and ServiceInstance
- executorRepository.nextScheduledExecutor().scheduleAtFixedRate(() -> {
- publishMetadataToRemote(serviceInstance);
-
- getServiceDiscoveries().forEach(serviceDiscovery ->
- {
- calInstanceRevision(serviceDiscovery, serviceInstance);
- // register metadata
- serviceDiscovery.register(serviceInstance);
- });
- }, 0, 5000, TimeUnit.MICROSECONDS);
}
private void publishMetadataToRemote(ServiceInstance serviceInstance) {
+// InMemoryWritableMetadataService localMetadataService = (InMemoryWritableMetadataService)WritableMetadataService.getDefaultExtension();
+// localMetadataService.blockUntilUpdated();
RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService();
- remoteMetadataService.publishMetadata(serviceInstance);
- }
-
- private void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) {
- String registryCluster = serviceDiscovery.getUrl().getParameter(ID_KEY);
- MetadataInfo metadataInfo = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(registryCluster);
- if (metadataInfo != null) {
- instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, metadataInfo.getRevision());
- }
+ remoteMetadataService.publishMetadata(serviceInstance.getServiceName());
}
private URL selectMetadataServiceExportedURL() {
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
index d670089..7ba0a43 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataConstants.java
@@ -16,22 +16,13 @@
*/
package org.apache.dubbo.metadata;
-public interface MetadataConstants {
- String KEY_SEPARATOR = ":";
- String DEFAULT_PATH_TAG = "metadata";
- String KEY_REVISON_PREFIX = "revision";
- String META_DATA_STORE_TAG = ".metaData";
- String SERVICE_META_DATA_STORE_TAG = ".smd";
- String CONSUMER_META_DATA_STORE_TAG = ".cmd";
-
- /**
- * @since 2.7.8
- */
- String EXPORTED_URLS_TAG = "exported-urls";
-
- /**
- * @since 2.7.8
- */
- String SUBSCRIBED_URLS_TAG = "subscribed-urls";
-
+public class MetadataConstants {
+ public static final String KEY_SEPARATOR = ":";
+ public static final String DEFAULT_PATH_TAG = "metadata";
+ public static final String KEY_REVISON_PREFIX = "revision";
+ public static final String META_DATA_STORE_TAG = ".metaData";
+ public static final String SERVICE_META_DATA_STORE_TAG = ".smd";
+ public static final String CONSUMER_META_DATA_STORE_TAG = ".cmd";
+ public static final String METADATA_PUBLISH_DELAY_KEY = "dubbo.application.metadata.delay";
+ public static final int DEFAULT_METADATA_PUBLISH_DELAY = 5000;
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 73e736f..4db0298 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -20,6 +20,7 @@ import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.compiler.support.ClassUtils;
import org.apache.dubbo.common.extension.ExtensionLoader;
import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.common.utils.CollectionUtils;
import org.apache.dubbo.common.utils.StringUtils;
import java.io.Serializable;
@@ -39,6 +40,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
public class MetadataInfo implements Serializable {
+    /**
+     * Revision value reported while the services map of this metadata is empty.
+     * Declared final so the shared sentinel cannot be reassigned at runtime.
+     */
+    public static final String DEFAULT_REVISION = "0";
private String app;
private String revision;
private Map<String, ServiceInfo> services;
@@ -85,6 +87,11 @@ public class MetadataInfo implements Serializable {
if (revision != null && hasReported()) {
return revision;
}
+
+ if (CollectionUtils.isEmptyMap(services)) {
+ return DEFAULT_REVISION;
+ }
+
StringBuilder sb = new StringBuilder();
sb.append(app);
for (Map.Entry<String, ServiceInfo> entry : services.entrySet()) {
diff --git a/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java
new file mode 100644
index 0000000..990854c
--- /dev/null
+++ b/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/impl/PublishMetadata.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.qos.command.impl;
+
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.threadpool.manager.ExecutorRepository;
+import org.apache.dubbo.common.utils.ArrayUtils;
+import org.apache.dubbo.qos.command.BaseCommand;
+import org.apache.dubbo.qos.command.CommandContext;
+import org.apache.dubbo.qos.command.annotation.Cmd;
+import org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils;
+
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+@Cmd(name = "publishMetadata", summary = "update service metadata and service instance", example = {
+        "publishMetadata",
+        "publishMetadata 5"
+})
+public class PublishMetadata implements BaseCommand {
+    private static final Logger logger = LoggerFactory.getLogger(PublishMetadata.class);
+    private final ExecutorRepository executorRepository = ExtensionLoader.getExtensionLoader(ExecutorRepository.class).getDefaultExtension();
+    // Handle of the periodic publish task started by this command, if any.
+    private ScheduledFuture<?> future;
+
+    /**
+     * Publishes service metadata and refreshes the service instance.
+     * <p>
+     * Without arguments the refresh runs once, synchronously. With a numeric first argument a
+     * periodic refresh is scheduled: the first run starts immediately and later runs follow with
+     * the given delay in seconds. If a previously scheduled task is still active, no new task is
+     * scheduled.
+     *
+     * @param commandContext the QoS command context (not used by this command)
+     * @param args           optional; {@code args[0]} is the delay in seconds, a positive integer
+     * @return a human-readable result message
+     */
+    @Override
+    public String execute(CommandContext commandContext, String[] args) {
+        logger.info("received publishMetadata command.");
+
+        if (ArrayUtils.isEmpty(args)) {
+            ServiceInstanceMetadataUtils.refreshMetadataAndInstance();
+            return "publish metadata succeeded.";
+        }
+
+        final int delay;
+        try {
+            delay = Integer.parseInt(args[0]);
+        } catch (NumberFormatException e) {
+            logger.error("Wrong delay param", e);
+            return "publishMetadata failed! Wrong delay param!";
+        }
+        if (delay <= 0) {
+            // scheduleWithFixedDelay rejects a non-positive delay with IllegalArgumentException
+            logger.error("Wrong delay param: " + args[0]);
+            return "publishMetadata failed! Wrong delay param!";
+        }
+
+        if (future == null || future.isDone() || future.isCancelled()) {
+            // The delay is in seconds, matching the command example and the reply message below.
+            future = executorRepository.nextScheduledExecutor()
+                    .scheduleWithFixedDelay(ServiceInstanceMetadataUtils::refreshMetadataAndInstance, 0, delay, TimeUnit.SECONDS);
+        }
+        return "publish task submitted, will publish in " + args[0] + " seconds.";
+    }
+
+}
diff --git a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand
index cb6e9a7..b92b6b2 100644
--- a/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand
+++ b/dubbo-plugin/dubbo-qos/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.qos.command.BaseCommand
@@ -5,3 +5,4 @@ ls=org.apache.dubbo.qos.command.impl.Ls
offline=org.apache.dubbo.qos.command.impl.Offline
ready=org.apache.dubbo.qos.command.impl.Ready
version=org.apache.dubbo.qos.command.impl.Version
+publish-metadata=org.apache.dubbo.qos.command.impl.PublishMetadata
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
index b5517b7..ee99000 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/EventPublishingServiceDiscovery.java
@@ -229,6 +229,11 @@ final class EventPublishingServiceDiscovery implements ServiceDiscovery {
}
@Override
+    public ServiceInstance getLocalInstance() {
+        // Plain delegation to the wrapped discovery; no event is published for this lookup.
+        final ServiceInstance localInstance = serviceDiscovery.getLocalInstance();
+        return localInstance;
+    }
+
+ @Override
public void initialize(URL registryURL) {
assertInitialized(INITIALIZE_ACTION);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
index ba8d7d3..2a51168 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/FileSystemServiceDiscovery.java
@@ -62,6 +62,8 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
private FileSystemDynamicConfiguration dynamicConfiguration;
+ private ServiceInstance serviceInstance;
+
@Override
public void onEvent(ServiceInstancesChangedEvent event) {
@@ -134,7 +136,13 @@ public class FileSystemServiceDiscovery implements ServiceDiscovery, EventListen
}
@Override
+    public ServiceInstance getLocalInstance() {
+        // Set in register(ServiceInstance); still null if nothing has been registered yet.
+        return this.serviceInstance;
+    }
+
+ @Override
public void register(ServiceInstance serviceInstance) throws RuntimeException {
+ this.serviceInstance = serviceInstance;
String serviceInstanceId = getServiceInstanceId(serviceInstance);
String serviceName = getServiceName(serviceInstance);
String content = toJSONString(serviceInstance);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
index f9e667b..9800c35 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscovery.java
@@ -267,6 +267,8 @@ public interface ServiceDiscovery extends Prioritized {
return null;
}
+ ServiceInstance getLocalInstance();
+
/**
* A human-readable description of the implementation
*
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 9890988..3fd7713 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -64,7 +64,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
}
private void refreshInvoker(List<URL> invokerUrls) {
- Assert.notNull(invokerUrls, "invokerUrls should not be null");
+ Assert.notNull(invokerUrls, "invokerUrls should not be null, use empty:// to clear address.");
if (invokerUrls.size() == 1
&& invokerUrls.get(0) != null
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index 1901e34..8af48a3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -45,6 +45,7 @@ import java.util.TreeSet;
import static org.apache.dubbo.common.constants.CommonConstants.REGISTER_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.REMOTE_METADATA_STORAGE_TYPE;
+import static org.apache.dubbo.metadata.MetadataInfo.DEFAULT_REVISION;
import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.getExportedServicesRevision;
/**
@@ -86,11 +87,17 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
String appName = event.getServiceName();
allInstances.put(appName, event.getServiceInstances());
+ Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
+ Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
+ Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
for (Map.Entry<String, List<ServiceInstance>> entry : allInstances.entrySet()) {
List<ServiceInstance> instances = entry.getValue();
for (ServiceInstance instance : instances) {
String revision = getExportedServicesRevision(instance);
- Map<String, List<ServiceInstance>> revisionToInstances = new HashMap<>();
+ if (DEFAULT_REVISION.equals(revision)) {
+ logger.info("Find instance without valid service metadata: " + instance.getAddress());
+ continue;
+ }
List<ServiceInstance> subInstances = revisionToInstances.computeIfAbsent(revision, r -> new LinkedList<>());
subInstances.add(instance);
@@ -104,7 +111,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
}
}
- Map<String, Set<String>> localServiceToRevisions = new HashMap<>();
if (metadata != null) {
parseMetadata(revision, metadata, localServiceToRevisions);
((DefaultServiceInstance) instance).setServiceMetadata(metadata);
@@ -115,7 +121,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
// set.add(revision);
// }
- Map<Set<String>, List<URL>> revisionsToUrls = new HashMap();
localServiceToRevisions.forEach((serviceKey, revisions) -> {
List<URL> urls = revisionsToUrls.get(revisions);
if (urls != null) {
@@ -140,8 +145,7 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
private Map<String, Set<String>> parseMetadata(String revision, MetadataInfo metadata, Map<String, Set<String>> localServiceToRevisions) {
Map<String, ServiceInfo> serviceInfos = metadata.getServices();
for (Map.Entry<String, ServiceInfo> entry : serviceInfos.entrySet()) {
- String serviceKey = entry.getValue().getServiceKey();
- Set<String> set = localServiceToRevisions.computeIfAbsent(serviceKey, k -> new TreeSet<>());
+ Set<String> set = localServiceToRevisions.computeIfAbsent(entry.getKey(), k -> new TreeSet<>());
set.add(revision);
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
index e55f6f2..686d4cb 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
@@ -18,9 +18,14 @@ package org.apache.dubbo.registry.client.metadata;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.metadata.MetadataInfo;
import org.apache.dubbo.metadata.MetadataService;
import org.apache.dubbo.metadata.WritableMetadataService;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
import org.apache.dubbo.registry.client.ServiceInstance;
+import org.apache.dubbo.registry.client.metadata.store.RemoteMetadataServiceImpl;
+import org.apache.dubbo.registry.support.AbstractRegistryFactory;
+import org.apache.dubbo.rpc.model.ApplicationModel;
import com.alibaba.fastjson.JSON;
@@ -39,6 +44,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;
import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_PROVIDER_KEYS;
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
+import static org.apache.dubbo.rpc.Constants.ID_KEY;
/**
* The Utilities class for the {@link ServiceInstance#getMetadata() metadata of the service instance}
@@ -87,6 +93,8 @@ public class ServiceInstanceMetadataUtils {
public static String METADATA_CLUSTER_PROPERTY_NAME = "dubbo.metadata.cluster";
+ public static String INSTANCE_REVISION_UPDATED_KEY = "dubbo.instance.revision.updated";
+
/**
* Get the multiple {@link URL urls'} parameters of {@link MetadataService MetadataService's} Metadata
*
@@ -247,6 +255,35 @@ public class ServiceInstanceMetadataUtils {
return null;
}
+    /**
+     * Aligns the exported-services revision stored in the instance metadata with the revision of the
+     * local {@link MetadataInfo} that belongs to the registry cluster of the given discovery.
+     * <p>
+     * Nothing happens when no metadata exists for that cluster or when the revision is unchanged.
+     * When an existing revision is replaced, the instance is additionally flagged via
+     * {@code INSTANCE_REVISION_UPDATED_KEY}; the very first assignment is not flagged.
+     *
+     * @param serviceDiscovery the discovery whose URL {@code ID_KEY} parameter selects the metadata
+     * @param instance         the service instance whose metadata is updated in place
+     */
+    public static void calInstanceRevision(ServiceDiscovery serviceDiscovery, ServiceInstance instance) {
+        String clusterId = serviceDiscovery.getUrl().getParameter(ID_KEY);
+        MetadataInfo localMetadata = WritableMetadataService.getDefaultExtension().getMetadataInfos().get(clusterId);
+        if (localMetadata == null) {
+            return;
+        }
+
+        String previousRevision = instance.getMetadata().get(EXPORTED_SERVICES_REVISION_PROPERTY_NAME);
+        if (localMetadata.getRevision().equals(previousRevision)) {
+            return;
+        }
+
+        instance.getMetadata().put(EXPORTED_SERVICES_REVISION_PROPERTY_NAME, localMetadata.getRevision());
+        if (previousRevision != null) {
+            // A revision was already present, so this is a change rather than the first registration.
+            instance.getExtendParams().put(INSTANCE_REVISION_UPDATED_KEY, "true");
+        }
+    }
+
+ public static boolean isInstanceUpdated(ServiceInstance instance) {
+ return "true".equals(instance.getExtendParams().get(INSTANCE_REVISION_UPDATED_KEY));
+ }
+
+    /**
+     * Publishes the application's metadata through the remote metadata service, then recalculates
+     * the revision of every service discovery's local instance and pushes the instance update.
+     * <p>
+     * A discovery that has no local instance yet (nothing was registered through it) is skipped,
+     * because there is nothing to recalculate or update for it.
+     */
+    public static void refreshMetadataAndInstance() {
+        RemoteMetadataServiceImpl remoteMetadataService = MetadataUtils.getRemoteMetadataService();
+        remoteMetadataService.publishMetadata(ApplicationModel.getName());
+
+        AbstractRegistryFactory.getServiceDiscoveries().forEach(serviceDiscovery -> {
+            ServiceInstance localInstance = serviceDiscovery.getLocalInstance();
+            if (localInstance == null) {
+                // This can be triggered (e.g. on demand) before the instance has been registered.
+                return;
+            }
+            calInstanceRevision(serviceDiscovery, localInstance);
+            // update service instance revision
+            serviceDiscovery.update(localInstance);
+        });
+    }
+
/**
* Set the default parameters via the specified {@link URL providerURL}
*
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
index 8605eaa..2b747de 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/InMemoryWritableMetadataService.java
@@ -41,6 +41,7 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
+import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -74,6 +75,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
*/
ConcurrentNavigableMap<String, SortedSet<URL>> exportedServiceURLs = new ConcurrentSkipListMap<>();
ConcurrentMap<String, MetadataInfo> metadataInfos;
+ final Semaphore metadataSemaphore = new Semaphore(1);
// ==================================================================================== //
@@ -131,6 +133,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
});
metadataInfo.addService(new ServiceInfo(url));
}
+ metadataSemaphore.release();
return addURL(exportedServiceURLs, url);
}
@@ -145,6 +148,7 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
metadataInfos.remove(key);
}
}
+ metadataSemaphore.release();
return removeURL(exportedServiceURLs, url);
}
@@ -202,6 +206,14 @@ public class InMemoryWritableMetadataService implements WritableMetadataService
return null;
}
+    /**
+     * Blocks the calling thread until a permit of {@code metadataSemaphore} can be acquired.
+     * <p>
+     * The semaphore starts with one permit, and a permit is released whenever an exported service
+     * URL is added or removed. If the wait is interrupted, the method logs a warning, restores the
+     * thread's interrupt status and returns without having acquired a permit.
+     */
+    public void blockUntilUpdated() {
+        try {
+            metadataSemaphore.acquire();
+        } catch (InterruptedException e) {
+            logger.warn("metadata refresh thread has been interrupted unexpectedly while wating for update.", e);
+            // Keep the interruption visible to the caller and the owning executor.
+            Thread.currentThread().interrupt();
+        }
+    }
+
public Map<String, MetadataInfo> getMetadataInfos() {
return metadataInfos;
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index ee41050..00aa92a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -47,7 +47,6 @@ import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
public class RemoteMetadataServiceImpl {
-
protected final Logger logger = LoggerFactory.getLogger(getClass());
private WritableMetadataService localMetadataService;
@@ -59,11 +58,11 @@ public class RemoteMetadataServiceImpl {
return MetadataReportInstance.getMetadataReports(true);
}
- public void publishMetadata(ServiceInstance instance) {
+ public void publishMetadata(String serviceName) {
Map<String, MetadataInfo> metadataInfos = localMetadataService.getMetadataInfos();
metadataInfos.forEach((registryKey, metadataInfo) -> {
if (!metadataInfo.hasReported()) {
- SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(instance.getServiceName(), metadataInfo.getRevision());
+ SubscriberMetadataIdentifier identifier = new SubscriberMetadataIdentifier(serviceName, metadataInfo.getRevision());
metadataInfo.getRevision();
metadataInfo.getExtendParams().put(REGISTRY_KEY, registryKey);
MetadataReport metadataReport = getMetadataReports().get(registryKey);
@@ -71,6 +70,7 @@ public class RemoteMetadataServiceImpl {
metadataReport = getMetadataReports().entrySet().iterator().next().getValue();
}
metadataReport.publishAppMetadata(identifier, metadataInfo);
+ metadataInfo.markReported();
}
});
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java
index 7ea5559..541c22a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/support/AbstractRegistryFactory.java
@@ -24,6 +24,8 @@ import org.apache.dubbo.registry.NotifyListener;
import org.apache.dubbo.registry.Registry;
import org.apache.dubbo.registry.RegistryFactory;
import org.apache.dubbo.registry.RegistryService;
+import org.apache.dubbo.registry.client.ServiceDiscovery;
+import org.apache.dubbo.registry.client.ServiceDiscoveryRegistry;
import java.util.Collection;
import java.util.Collections;
@@ -33,6 +35,7 @@ import java.util.List;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
@@ -69,6 +72,15 @@ public abstract class AbstractRegistryFactory implements RegistryFactory {
return REGISTRIES.get(key);
}
+ public static List<ServiceDiscovery> getServiceDiscoveries() {
+ return AbstractRegistryFactory.getRegistries()
+ .stream()
+ .filter(registry -> registry instanceof ServiceDiscoveryRegistry)
+ .map(registry -> (ServiceDiscoveryRegistry) registry)
+ .map(ServiceDiscoveryRegistry::getServiceDiscovery)
+ .collect(Collectors.toList());
+ }
+
/**
* Close all created registries
*/
diff --git a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java
index fbc410b..93043ba 100644
--- a/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-api/src/test/java/org/apache/dubbo/registry/client/InMemoryServiceDiscovery.java
@@ -42,6 +42,8 @@ public class InMemoryServiceDiscovery implements ServiceDiscovery {
private Map<String, List<ServiceInstance>> repository = new HashMap<>();
+ private ServiceInstance serviceInstance;
+
@Override
public Set<String> getServices() {
return repository.keySet();
@@ -68,12 +70,18 @@ public class InMemoryServiceDiscovery implements ServiceDiscovery {
return new DefaultPage<>(offset, pageSize, data, totalSize);
}
+ @Override
+ public ServiceInstance getLocalInstance() {
+ return serviceInstance;
+ }
+
public String toString() {
return "InMemoryServiceDiscovery";
}
@Override
public void register(ServiceInstance serviceInstance) throws RuntimeException {
+ this.serviceInstance = serviceInstance;
String serviceName = serviceInstance.getServiceName();
List<ServiceInstance> serviceInstances = repository.computeIfAbsent(serviceName, s -> new LinkedList<>());
if (!serviceInstances.contains(serviceInstance)) {
diff --git a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
index 29e6790..a4cdd6c 100644
--- a/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
+++ b/dubbo-registry/dubbo-registry-zookeeper/src/main/java/org/apache/dubbo/registry/zookeeper/ZookeeperServiceDiscovery.java
@@ -41,6 +41,7 @@ import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import static org.apache.dubbo.common.function.ThrowableFunction.execute;
+import static org.apache.dubbo.registry.client.metadata.ServiceInstanceMetadataUtils.isInstanceUpdated;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkParams.ROOT_PATH;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.build;
import static org.apache.dubbo.registry.zookeeper.util.CuratorFrameworkUtils.buildCuratorFramework;
@@ -64,6 +65,8 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery {
private org.apache.curator.x.discovery.ServiceDiscovery<ZookeeperInstance> serviceDiscovery;
+ private ServiceInstance serviceInstance;
+
/**
* The Key is watched Zookeeper path, the value is an instance of {@link CuratorWatcher}
*/
@@ -87,16 +90,25 @@ public class ZookeeperServiceDiscovery implements ServiceDiscovery {
serviceDiscovery.close();
}
+ @Override
+ public ServiceInstance getLocalInstance() {
+ return serviceInstance;
+ }
+
public void register(ServiceInstance serviceInstance) throws RuntimeException {
+ this.serviceInstance = serviceInstance;
doInServiceRegistry(serviceDiscovery -> {
serviceDiscovery.registerService(build(serviceInstance));
});
}
public void update(ServiceInstance serviceInstance) throws RuntimeException {
- doInServiceRegistry(serviceDiscovery -> {
- serviceDiscovery.updateService(build(serviceInstance));
- });
+ this.serviceInstance = serviceInstance;
+ if (isInstanceUpdated(serviceInstance)) {
+ doInServiceRegistry(serviceDiscovery -> {
+ serviceDiscovery.updateService(build(serviceInstance));
+ });
+ }
}
public void unregister(ServiceInstance serviceInstance) throws RuntimeException {