You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/08/04 07:10:02 UTC
[dubbo] 25/27: migrating from interface address pool to instance
address pool
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 5db015ccde960708decf19e07f40a1a6b4d95454
Author: ken.lj <ke...@gmail.com>
AuthorDate: Mon Aug 3 16:41:37 2020 +0800
migrating from interface address pool to instance address pool
---
.../apache/dubbo/rpc/cluster/ClusterInvoker.java | 2 +
.../org/apache/dubbo/rpc/cluster/Directory.java | 2 +
.../rpc/cluster/directory/AbstractDirectory.java | 1 +
.../cluster/support/AbstractClusterInvoker.java | 5 +
.../support/wrapper/MockClusterInvoker.java | 5 +
.../dubbo/common/constants/RegistryConstants.java | 6 +
.../org/apache/dubbo/config/ApplicationConfig.java | 8 +-
.../org/apache/dubbo/rpc/model/ConsumerModel.java | 6 +
.../dubbo/config/utils/ConfigValidationUtils.java | 30 +-
.../src/main/resources/spring/dubbo-consumer.xml | 4 +-
.../DynamicConfigurationServiceNameMapping.java | 2 +-
.../apache/dubbo/metadata/MappingChangedEvent.java | 9 +
.../org/apache/dubbo/metadata/MetadataInfo.java | 7 +
.../apache/dubbo/metadata/ServiceNameMapping.java | 3 +-
.../metadata/report/MetadataReportInstance.java | 10 +
.../org.apache.dubbo.metadata.ServiceNameMapping | 3 +-
...che.dubbo.metadata.report.MetadataReportFactory | 1 -
.../{integration => client}/RegistryProtocol.java | 1655 ++++++++++----------
.../registry/client/ServiceDiscoveryRegistry.java | 2 +-
.../client/ServiceDiscoveryRegistryProtocol.java | 51 -
.../ServiceDiscoveryRegistryProtocolListener.java | 1 -
.../listener/ServiceInstancesChangedListener.java | 4 -
.../metadata/MetadataServiceNameMapping.java | 51 +-
.../registry/client/metadata/MetadataUtils.java | 2 +
.../metadata/ServiceInstanceMetadataUtils.java | 2 +-
.../metadata/store/RemoteMetadataServiceImpl.java | 2 +-
.../registry/integration/DynamicDirectory.java | 2 +-
.../InterfaceCompatibleRegistryProtocol.java | 177 +++
.../registry/integration/RegistryDirectory.java | 2 +-
.../integration/RegistryInvokerWrapper.java | 79 -
.../integration/RegistryProtocolListener.java | 1 +
.../org.apache.dubbo.metadata.ServiceNameMapping | 1 +
.../dubbo/internal/org.apache.dubbo.rpc.Protocol | 4 +-
33 files changed, 1139 insertions(+), 1001 deletions(-)
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/ClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/ClusterInvoker.java
index 4b9199e..a89bdf4 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/ClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/ClusterInvoker.java
@@ -35,4 +35,6 @@ public interface ClusterInvoker<T> extends Invoker<T> {
URL getRegistryUrl();
Directory<T> getDirectory();
+
+ boolean isDestroyed();
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
index 2940bf6..5d48264 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/Directory.java
@@ -51,4 +51,6 @@ public interface Directory<T> extends Node {
URL getConsumerUrl();
+ boolean isDestroyed();
+
}
\ No newline at end of file
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
index 6b4773f..c663552 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/directory/AbstractDirectory.java
@@ -113,6 +113,7 @@ public abstract class AbstractDirectory<T> implements Directory<T> {
this.consumerUrl = consumerUrl;
}
+ @Override
public boolean isDestroyed() {
return destroyed;
}
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
index b1e8a95..4464763 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/AbstractClusterInvoker.java
@@ -113,6 +113,11 @@ public abstract class AbstractClusterInvoker<T> implements ClusterInvoker<T> {
}
}
+ @Override
+ public boolean isDestroyed() {
+ return destroyed.get();
+ }
+
/**
* Select a invoker using loadbalance policy.</br>
* a) Firstly, select an invoker using loadbalance. If this invoker is in previously selected list, or,
diff --git a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
index 90c43bd..a682297 100644
--- a/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
+++ b/dubbo-cluster/src/main/java/org/apache/dubbo/rpc/cluster/support/wrapper/MockClusterInvoker.java
@@ -64,6 +64,11 @@ public class MockClusterInvoker<T> implements ClusterInvoker<T> {
}
@Override
+ public boolean isDestroyed() {
+ return directory.isDestroyed();
+ }
+
+ @Override
public boolean isAvailable() {
return directory.isAvailable();
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/RegistryConstants.java b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/RegistryConstants.java
index d25e9fc..3feeeb2 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/constants/RegistryConstants.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/constants/RegistryConstants.java
@@ -23,6 +23,8 @@ public interface RegistryConstants {
String REGISTRY_CLUSTER_KEY = "REGISTRY_CLUSTER";
+ String REGISTRY_CLUSTER = "REGISTRY_CLUSTER";
+
String REGISTRY_CLUSTER_TYPE_KEY = "registry-cluster-type";
String REGISTRY_PROTOCOL = "registry";
@@ -57,6 +59,10 @@ public interface RegistryConstants {
String COMPATIBLE_CONFIG_KEY = "compatible_config";
+ String REGISTRY_DUPLICATE_KEY = "duplicate";
+
+ String ENABLE_REGISTRY_DIRECTORY_AUTO_MIGRATION = "enable-auto-migration";
+
/**
* The parameter key of Dubbo Registry type
*
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java b/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
index 460dbc6..7e54e42 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/config/ApplicationConfig.java
@@ -464,7 +464,13 @@ public class ApplicationConfig extends AbstractConfig {
for (InfraAdapter adapter : adapters) {
Map<String, String> extraParameters = adapter.getExtraAttributes(inputParameters);
if (CollectionUtils.isNotEmptyMap(extraParameters)) {
- parameters.putAll(extraParameters);
+ extraParameters.forEach((key, value) -> {
+ String prefix = this.getPrefix() + ".";
+ if (key.startsWith(prefix)) {
+ key = key.substring(prefix.length());
+ }
+ parameters.put(key, value);
+ });
}
}
}
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
index e49888d..34fe276 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/rpc/model/ConsumerModel.java
@@ -28,6 +28,7 @@ import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
+import java.util.TreeSet;
/**
* This model is bound to your reference's configuration, for example, group, version or method level configuration.
@@ -36,6 +37,7 @@ public class ConsumerModel {
private String serviceKey;
private final ServiceDescriptor serviceModel;
private final ReferenceConfigBase<?> referenceConfig;
+ private final Set<String> apps = new TreeSet<>();
private Object proxyObject;
@@ -110,6 +112,10 @@ public class ConsumerModel {
return referenceConfig;
}
+ public Set<String> getApps() {
+ return apps;
+ }
+
public AsyncMethodInfo getAsyncInfo(String methodName) {
return methodConfigs.get(methodName);
}
diff --git a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
index 8a91d1c..13e5fb1 100644
--- a/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
+++ b/dubbo-config/dubbo-config-api/src/main/java/org/apache/dubbo/config/utils/ConfigValidationUtils.java
@@ -86,8 +86,10 @@ import static org.apache.dubbo.common.constants.CommonConstants.SHUTDOWN_WAIT_SE
import static org.apache.dubbo.common.constants.CommonConstants.THREADPOOL_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.USERNAME_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_DUPLICATE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_TYPE_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL;
import static org.apache.dubbo.common.constants.RemotingConstants.BACKUP_KEY;
import static org.apache.dubbo.common.extension.ExtensionLoader.getExtensionLoader;
@@ -202,7 +204,33 @@ public class ConfigValidationUtils {
}
}
}
- return registryList;
+ return genCompatibleRegistries(registryList, provider);
+ }
+
+ private static List<URL> genCompatibleRegistries(List<URL> registryList, boolean provider) {
+ List<URL> result = new ArrayList<>(registryList.size());
+ registryList.forEach(registryURL -> {
+ result.add(registryURL);
+ if (provider) {
+ // for registries enabled service discovery, automatically register interface compatible addresses.
+ if (SERVICE_REGISTRY_PROTOCOL.equals(registryURL.getProtocol())
+ && registryURL.getParameter(REGISTRY_DUPLICATE_KEY, true)
+ && registryNotExists(registryURL, registryList, REGISTRY_PROTOCOL)) {
+ URL interfaceCompatibleRegistryURL = URLBuilder.from(registryURL)
+ .setProtocol(REGISTRY_PROTOCOL)
+ .removeParameter(REGISTRY_TYPE_KEY)
+ .build();
+ result.add(interfaceCompatibleRegistryURL);
+ }
+ }
+ });
+ return result;
+ }
+
+ private static boolean registryNotExists(URL registryURL, List<URL> registryList, String registryType) {
+ return registryList.stream().noneMatch(
+ url -> registryType.equals(url.getProtocol()) && registryURL.getBackupAddress().equals(url.getBackupAddress())
+ );
}
public static URL loadMonitor(AbstractInterfaceConfig interfaceConfig, URL registryURL) {
diff --git a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml
index 44e8712..9225959 100644
--- a/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml
+++ b/dubbo-demo/dubbo-demo-xml/dubbo-demo-xml-consumer/src/main/resources/spring/dubbo-consumer.xml
@@ -23,12 +23,12 @@
<dubbo:application name="demo-consumer">
<dubbo:parameter key="mapping-type" value="metadata"/>
+ <dubbo:parameter key="enable-auto-migration" value="true"/>
</dubbo:application>
<!-- <dubbo:metadata-report address="zookeeper://127.0.0.1:2181"/>-->
- <dubbo:registry address="zookeeper://127.0.0.1:2181?registry-type=service"/>
-
+ <dubbo:registry address="zookeeper://127.0.0.1:2181"/>
<dubbo:reference provided-by="demo-provider" id="demoService" check="false"
interface="org.apache.dubbo.demo.DemoService"/>
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java
index 21df199..12d6665 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/DynamicConfigurationServiceNameMapping.java
@@ -72,7 +72,7 @@ public class DynamicConfigurationServiceNameMapping implements ServiceNameMappin
}
@Override
- public Set<String> get(URL url, MappingListener mappingListener) {
+ public Set<String> getAndListen(URL url, MappingListener mappingListener) {
String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY);
String version = url.getParameter(VERSION_KEY);
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java
index 27a7a3f..36b6275 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MappingChangedEvent.java
@@ -21,6 +21,7 @@ import java.util.Set;
public class MappingChangedEvent {
private String serviceKey;
private Set<String> apps;
+ private Set<String> oldApps;
public String getServiceKey() {
return serviceKey;
@@ -37,4 +38,12 @@ public class MappingChangedEvent {
public void setApps(Set<String> apps) {
this.apps = apps;
}
+
+ public Set<String> getOldApps() {
+ return oldApps;
+ }
+
+ public void setOldApps(Set<String> oldApps) {
+ this.oldApps = oldApps;
+ }
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 47e8705..7ce4cfb 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -191,12 +191,15 @@ public class MetadataInfo implements Serializable {
// service + group + version + protocol
private transient String matchKey;
+ private URL url;
+
public ServiceInfo() {
}
public ServiceInfo(URL url) {
this(url.getServiceInterface(), url.getParameter(GROUP_KEY), url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
+ this.url = url;
Map<String, String> params = new HashMap<>();
List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");
for (MetadataParamsFilter filter : filters) {
@@ -409,6 +412,10 @@ public class MetadataInfo implements Serializable {
return methodNumbers;
}
+ public URL getUrl() {
+ return url;
+ }
+
@Override
public boolean equals(Object obj) {
if (obj == null) {
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java
index b4ad31e..f08a748 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/ServiceNameMapping.java
@@ -44,8 +44,7 @@ public interface ServiceNameMapping {
*
* @return
*/
- Set<String> get(URL url, MappingListener mappingListener);
-
+ Set<String> getAndListen(URL url, MappingListener mappingListener);
/**
* Get the default extension of {@link ServiceNameMapping}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java
index c3afdb0..8d11784 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/report/MetadataReportInstance.java
@@ -65,6 +65,16 @@ public class MetadataReportInstance {
return metadataReports;
}
+ public static MetadataReport getMetadataReport(String registryKey) {
+ checkInit();
+ MetadataReport metadataReport = metadataReports.get(registryKey);
+ if (metadataReport == null) {
+ metadataReport = metadataReports.values().iterator().next();
+ }
+ return metadataReport;
+ }
+
+
private static void checkInit() {
if (!init.get()) {
throw new IllegalStateException("the metadata report was not inited.");
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping b/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping
index d43b342..b991bc3 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping
@@ -1,2 +1 @@
-config=org.apache.dubbo.metadata.DynamicConfigurationServiceNameMapping
-metadata=org.apache.dubbo.metadata.MetadataServiceNameMapping
\ No newline at end of file
+config=org.apache.dubbo.metadata.DynamicConfigurationServiceNameMapping
\ No newline at end of file
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory b/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
deleted file mode 100644
index b686fce..0000000
--- a/dubbo-metadata/dubbo-metadata-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.report.MetadataReportFactory
+++ /dev/null
@@ -1 +0,0 @@
-file = org.apache.dubbo.metadata.report.support.file.FileSystemMetadataReportFactory
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/RegistryProtocol.java
similarity index 92%
rename from dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
rename to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/RegistryProtocol.java
index 976d0da..700062a 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocol.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/RegistryProtocol.java
@@ -1,830 +1,825 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry.integration;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.common.URLBuilder;
-import org.apache.dubbo.common.config.ConfigurationUtils;
-import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
-import org.apache.dubbo.common.extension.ExtensionLoader;
-import org.apache.dubbo.common.logger.Logger;
-import org.apache.dubbo.common.logger.LoggerFactory;
-import org.apache.dubbo.common.timer.HashedWheelTimer;
-import org.apache.dubbo.common.utils.CollectionUtils;
-import org.apache.dubbo.common.utils.NamedThreadFactory;
-import org.apache.dubbo.common.utils.StringUtils;
-import org.apache.dubbo.common.utils.UrlUtils;
-import org.apache.dubbo.registry.NotifyListener;
-import org.apache.dubbo.registry.Registry;
-import org.apache.dubbo.registry.RegistryFactory;
-import org.apache.dubbo.registry.RegistryService;
-import org.apache.dubbo.registry.retry.ReExportTask;
-import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
-import org.apache.dubbo.rpc.Exporter;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Protocol;
-import org.apache.dubbo.rpc.ProtocolServer;
-import org.apache.dubbo.rpc.ProxyFactory;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.cluster.Cluster;
-import org.apache.dubbo.rpc.cluster.Configurator;
-import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository;
-import org.apache.dubbo.rpc.cluster.support.MergeableCluster;
-import org.apache.dubbo.rpc.model.ApplicationModel;
-import org.apache.dubbo.rpc.model.ProviderModel;
-import org.apache.dubbo.rpc.protocol.InvokerWrapper;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import static java.util.concurrent.Executors.newSingleThreadExecutor;
-import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
-import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.EXTRA_KEYS_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.HIDE_KEY_PREFIX;
-import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
-import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
-import static org.apache.dubbo.common.constants.FilterConstants.VALIDATION_KEY;
-import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
-import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
-import static org.apache.dubbo.common.constants.QosConstants.QOS_HOST;
-import static org.apache.dubbo.common.constants.QosConstants.QOS_PORT;
-import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
-import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL;
-import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
-import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
-import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
-import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
-import static org.apache.dubbo.common.utils.UrlUtils.classifyUrls;
-import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
-import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
-import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY;
-import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD;
-import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
-import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
-import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
-import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
-import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
-import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
-import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
-import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
-import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
-import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
-import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
-import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
-import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
-import static org.apache.dubbo.rpc.Constants.INTERFACES;
-import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
-import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
-import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
-
-/**
- * RegistryProtocol
- */
-public class RegistryProtocol implements Protocol {
- public static final String[] DEFAULT_REGISTER_PROVIDER_KEYS = {
- APPLICATION_KEY, CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY,
- GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY,
- WEIGHT_KEY, TIMESTAMP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
- };
-
- public static final String[] DEFAULT_REGISTER_CONSUMER_KEYS = {
- APPLICATION_KEY, VERSION_KEY, GROUP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
- };
-
- private final static Logger logger = LoggerFactory.getLogger(RegistryProtocol.class);
- private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<>();
- private final Map<String, ServiceConfigurationListener> serviceConfigurationListeners = new ConcurrentHashMap<>();
- private final ProviderConfigurationListener providerConfigurationListener = new ProviderConfigurationListener();
- //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
- //providerurl <--> exporter
- private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();
- private Protocol protocol;
- private RegistryFactory registryFactory;
- private ProxyFactory proxyFactory;
-
- private ConcurrentMap<URL, ReExportTask> reExportFailedTasks = new ConcurrentHashMap<>();
- private HashedWheelTimer retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboReexportTimer", true), DEFAULT_REGISTRY_RETRY_PERIOD, TimeUnit.MILLISECONDS, 128);
-
- //Filter the parameters that do not need to be output in url(Starting with .)
- private static String[] getFilteredKeys(URL url) {
- Map<String, String> params = url.getParameters();
- if (CollectionUtils.isNotEmptyMap(params)) {
- return params.keySet().stream()
- .filter(k -> k.startsWith(HIDE_KEY_PREFIX))
- .toArray(String[]::new);
- } else {
- return new String[0];
- }
- }
-
- public void setProtocol(Protocol protocol) {
- this.protocol = protocol;
- }
-
- public void setRegistryFactory(RegistryFactory registryFactory) {
- this.registryFactory = registryFactory;
- }
-
- public void setProxyFactory(ProxyFactory proxyFactory) {
- this.proxyFactory = proxyFactory;
- }
-
- @Override
- public int getDefaultPort() {
- return 9090;
- }
-
- public Map<URL, NotifyListener> getOverrideListeners() {
- return overrideListeners;
- }
-
- private void register(URL registryUrl, URL registeredProviderUrl) {
- Registry registry = registryFactory.getRegistry(registryUrl);
- registry.register(registeredProviderUrl);
- }
-
- private void registerStatedUrl(URL registryUrl, URL registeredProviderUrl, boolean registered) {
- ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
- model.addStatedUrl(new ProviderModel.RegisterStatedURL(
- registeredProviderUrl,
- registryUrl,
- registered));
- }
-
- @Override
- public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
- URL registryUrl = getRegistryUrl(originInvoker);
- // url to export locally
- URL providerUrl = getProviderUrl(originInvoker);
-
- // Subscribe the override data
- // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
- // the same service. Because the subscribed is cached key with the name of the service, it causes the
- // subscription information to cover.
- final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
- final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
- overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
-
- providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
- //export invoker
- final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
-
- // url to registry
- final Registry registry = getRegistry(originInvoker);
- final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
-
- // decide if we need to delay publish
- boolean register = providerUrl.getParameter(REGISTER_KEY, true);
- if (register) {
- register(registryUrl, registeredProviderUrl);
- }
-
- // register stated url on provider model
- registerStatedUrl(registryUrl, registeredProviderUrl, register);
-
-
- exporter.setRegisterUrl(registeredProviderUrl);
- exporter.setSubscribeUrl(overrideSubscribeUrl);
-
- // Deprecated! Subscribe to override rules in 2.6.x or before.
- registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
-
- notifyExport(exporter);
- //Ensure that a new exporter instance is returned every time export
- return new DestroyableExporter<>(exporter);
- }
-
- private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) {
- List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
- .getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
- if (CollectionUtils.isNotEmpty(listeners)) {
- for (RegistryProtocolListener listener : listeners) {
- listener.onExport(this, exporter);
- }
- }
- }
-
- private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
- providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
- ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
- serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
- return serviceConfigurationListener.overrideUrl(providerUrl);
- }
-
- @SuppressWarnings("unchecked")
- private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
- String key = getCacheKey(originInvoker);
-
- return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
- Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
- return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
- });
- }
-
- public <T> void reExport(Exporter<T> exporter, URL newInvokerUrl) {
- if (exporter instanceof ExporterChangeableWrapper) {
- ExporterChangeableWrapper<T> exporterWrapper = (ExporterChangeableWrapper<T>) exporter;
- Invoker<T> originInvoker = exporterWrapper.getOriginInvoker();
- reExport(originInvoker, newInvokerUrl);
- }
- }
-
- /**
- * Reexport the invoker of the modified url
- *
- * @param originInvoker
- * @param newInvokerUrl
- * @param <T>
- */
- @SuppressWarnings("unchecked")
- public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
- String key = getCacheKey(originInvoker);
- ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
- URL registeredUrl = exporter.getRegisterUrl();
-
- URL registryUrl = getRegistryUrl(originInvoker);
- URL newProviderUrl = getUrlToRegistry(newInvokerUrl, registryUrl);
-
- // update local exporter
- Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl);
- exporter.setExporter(protocol.export(invokerDelegate));
-
- // update registry
- if (!newProviderUrl.equals(registeredUrl)) {
- try {
- doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl);
- } catch (Exception e) {
- ReExportTask oldTask = reExportFailedTasks.get(registeredUrl);
- if (oldTask != null) {
- return;
- }
- ReExportTask task = new ReExportTask(
- () -> doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl),
- registeredUrl,
- null
- );
- oldTask = reExportFailedTasks.putIfAbsent(registeredUrl, task);
- if (oldTask == null) {
- // never has a retry task. then start a new task for retry.
- retryTimer.newTimeout(task, registryUrl.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD), TimeUnit.MILLISECONDS);
- }
- }
- }
- }
-
- private <T> void doReExport(final Invoker<T> originInvoker, ExporterChangeableWrapper<T> exporter,
- URL registryUrl, URL oldProviderUrl, URL newProviderUrl) {
- if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
- Registry registry = null;
- try {
- registry = getRegistry(originInvoker);
- } catch (Exception e) {
- throw new SkipFailbackWrapperException(e);
- }
-
- logger.info("Try to unregister old url: " + oldProviderUrl);
- registry.reExportUnregister(oldProviderUrl);
-
- logger.info("Try to register new url: " + newProviderUrl);
- registry.reExportRegister(newProviderUrl);
- }
- try {
- ProviderModel.RegisterStatedURL statedUrl = getStatedUrl(registryUrl, newProviderUrl);
- statedUrl.setProviderUrl(newProviderUrl);
- exporter.setRegisterUrl(newProviderUrl);
- } catch (Exception e) {
- throw new SkipFailbackWrapperException(e);
- }
- }
-
- private ProviderModel.RegisterStatedURL getStatedUrl(URL registryUrl, URL providerUrl) {
- ProviderModel providerModel = ApplicationModel.getServiceRepository()
- .lookupExportedService(providerUrl.getServiceKey());
-
- List<ProviderModel.RegisterStatedURL> statedUrls = providerModel.getStatedUrl();
- return statedUrls.stream()
- .filter(u -> u.getRegistryUrl().equals(registryUrl)
- && u.getProviderUrl().getProtocol().equals(providerUrl.getProtocol()))
- .findFirst().orElseThrow(() -> new IllegalStateException("There should have at least one registered url."));
- }
-
- /**
- * Get an instance of registry based on the address of invoker
- *
- * @param originInvoker
- * @return
- */
- protected Registry getRegistry(final Invoker<?> originInvoker) {
- URL registryUrl = getRegistryUrl(originInvoker);
- return registryFactory.getRegistry(registryUrl);
- }
-
- protected URL getRegistryUrl(Invoker<?> originInvoker) {
- URL registryUrl = originInvoker.getUrl();
- if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
- String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
- registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
- }
- return registryUrl;
- }
-
- protected URL getRegistryUrl(URL url) {
- return URLBuilder.from(url)
- .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
- .removeParameter(REGISTRY_KEY)
- .build();
- }
-
-
- /**
- * Return the url that is registered to the registry and filter the url parameter once
- *
- * @param providerUrl
- * @return url to registry.
- */
- private URL getUrlToRegistry(final URL providerUrl, final URL registryUrl) {
- //The address you see at the registry
- if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
- return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
- MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
- INTERFACES);
- } else {
- String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
- // if path is not the same as interface name then we should keep INTERFACE_KEY,
- // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
- // but what we expect is '/dubbo/interface/providers'
- if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
- if (StringUtils.isNotEmpty(extraKeys)) {
- extraKeys += ",";
- }
- extraKeys += INTERFACE_KEY;
- }
- String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
- , COMMA_SPLIT_PATTERN.split(extraKeys));
- return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
- }
-
- }
-
- private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
- return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
- .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
- }
-
- /**
- * Get the address of the providerUrl through the url of the invoker
- *
- * @param originInvoker
- * @return
- */
- private URL getProviderUrl(final Invoker<?> originInvoker) {
- String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
- if (export == null || export.length() == 0) {
- throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
- }
- return URL.valueOf(export);
- }
-
- /**
- * Get the key cached in bounds by invoker
- *
- * @param originInvoker
- * @return
- */
- private String getCacheKey(final Invoker<?> originInvoker) {
- URL providerUrl = getProviderUrl(originInvoker);
- String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
- return key;
- }
-
- @Override
- @SuppressWarnings("unchecked")
- public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
- url = getRegistryUrl(url);
- Registry registry = registryFactory.getRegistry(url);
- if (RegistryService.class.equals(type)) {
- return proxyFactory.getInvoker((T) registry, type, url);
- }
-
- // group="a,b" or group="*"
- Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
- String group = qs.get(GROUP_KEY);
- if (group != null && group.length() > 0) {
- if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
- return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
- }
- }
-
- Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
- return doRefer(cluster, registry, type, url);
- }
-
- protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
- // FIXME, SPI extension, support prototype instance
- DynamicDirectory<T> directory = createDirectory(type, url);
- directory.setRegistry(registry);
- directory.setProtocol(protocol);
- // all attributes of REFER_KEY
- Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
- URL subscribeUrl = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
- if (directory.isShouldRegister()) {
- directory.setRegisteredConsumerUrl(subscribeUrl);
- registry.register(directory.getRegisteredConsumerUrl());
- }
- directory.buildRouterChain(subscribeUrl);
- directory.subscribe(toSubscribeUrl(subscribeUrl));
-
- Invoker<T> invoker = cluster.join(directory);
- List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
- if (CollectionUtils.isEmpty(listeners)) {
- return invoker;
- }
-
- RegistryInvokerWrapper<T> registryInvokerWrapper = new RegistryInvokerWrapper<>(directory, cluster, invoker);
- for (RegistryProtocolListener listener : listeners) {
- listener.onRefer(this, registryInvokerWrapper);
- }
- return registryInvokerWrapper;
- }
-
- protected <T> DynamicDirectory<T> createDirectory(Class<T> type, URL url) {
- return new RegistryDirectory<T>(type, url);
- }
-
- public <T> void reRefer(Invoker<T> invoker, URL newSubscribeUrl) {
- if (!(invoker instanceof RegistryInvokerWrapper)) {
- return;
- }
-
- RegistryInvokerWrapper<T> invokerWrapper = (RegistryInvokerWrapper<T>) invoker;
- URL oldSubscribeUrl = invokerWrapper.getUrl();
- DynamicDirectory<T> directory = invokerWrapper.getDirectory();
- Registry registry = directory.getRegistry();
- registry.unregister(directory.getRegisteredConsumerUrl());
- directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl));
-
- directory.setRegisteredConsumerUrl(newSubscribeUrl);
- registry.register(directory.getRegisteredConsumerUrl());
- directory.buildRouterChain(newSubscribeUrl);
- directory.subscribe(toSubscribeUrl(newSubscribeUrl));
-
- invokerWrapper.setInvoker(invokerWrapper.getCluster().join(directory));
- }
-
- private static URL toSubscribeUrl(URL url) {
- return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
- }
-
- private List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) {
- return ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
- .getActivateExtension(url, "registry.protocol.listener");
- }
-
- // available to test
- public String[] getParamsToRegistry(String[] defaultKeys, String[] additionalParameterKeys) {
- int additionalLen = additionalParameterKeys.length;
- String[] registryParams = new String[defaultKeys.length + additionalLen];
- System.arraycopy(defaultKeys, 0, registryParams, 0, defaultKeys.length);
- System.arraycopy(additionalParameterKeys, 0, registryParams, defaultKeys.length, additionalLen);
- return registryParams;
- }
-
- @Override
- public void destroy() {
- List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
- .getLoadedExtensionInstances();
- if (CollectionUtils.isNotEmpty(listeners)) {
- for (RegistryProtocolListener listener : listeners) {
- listener.onDestroy();
- }
- }
-
- List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
- for (Exporter<?> exporter : exporters) {
- exporter.unexport();
- }
- bounds.clear();
-
- ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
- .removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);
- }
-
- @Override
- public List<ProtocolServer> getServers() {
- return protocol.getServers();
- }
-
- //Merge the urls of configurators
- private static URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
- if (configurators != null && configurators.size() > 0) {
- for (Configurator configurator : configurators) {
- url = configurator.configure(url);
- }
- }
- return url;
- }
-
- public static class InvokerDelegate<T> extends InvokerWrapper<T> {
- private final Invoker<T> invoker;
-
- /**
- * @param invoker
- * @param url invoker.getUrl return this value
- */
- public InvokerDelegate(Invoker<T> invoker, URL url) {
- super(invoker, url);
- this.invoker = invoker;
- }
-
- public Invoker<T> getInvoker() {
- if (invoker instanceof InvokerDelegate) {
- return ((InvokerDelegate<T>) invoker).getInvoker();
- } else {
- return invoker;
- }
- }
- }
-
- private static class DestroyableExporter<T> implements Exporter<T> {
-
- private Exporter<T> exporter;
-
- public DestroyableExporter(Exporter<T> exporter) {
- this.exporter = exporter;
- }
-
- @Override
- public Invoker<T> getInvoker() {
- return exporter.getInvoker();
- }
-
- @Override
- public void unexport() {
- exporter.unexport();
- }
- }
-
- /**
- * Reexport: the exporter destroy problem in protocol
- * 1.Ensure that the exporter returned by registryprotocol can be normal destroyed
- * 2.No need to re-register to the registry after notify
- * 3.The invoker passed by the export method , would better to be the invoker of exporter
- */
- private class OverrideListener implements NotifyListener {
- private final URL subscribeUrl;
- private final Invoker originInvoker;
-
-
- private List<Configurator> configurators;
-
- public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
- this.subscribeUrl = subscribeUrl;
- this.originInvoker = originalInvoker;
- }
-
- /**
- * @param urls The list of registered information, is always not empty, The meaning is the same as the
- * return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
- */
- @Override
- public synchronized void notify(List<URL> urls) {
- logger.debug("original override urls: " + urls);
-
- List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
- CONFIGURATORS_CATEGORY));
- logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
-
- // No matching results
- if (matchedUrls.isEmpty()) {
- return;
- }
-
- this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator))
- .orElse(configurators);
-
- doOverrideIfNecessary();
- }
-
- public synchronized void doOverrideIfNecessary() {
- final Invoker<?> invoker;
- if (originInvoker instanceof InvokerDelegate) {
- invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
- } else {
- invoker = originInvoker;
- }
- //The origin invoker
- URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
- String key = getCacheKey(originInvoker);
- ExporterChangeableWrapper<?> exporter = bounds.get(key);
- if (exporter == null) {
- logger.warn(new IllegalStateException("error state, exporter should not be null"));
- return;
- }
- //The current, may have been merged many times
- URL currentUrl = exporter.getInvoker().getUrl();
- //Merged with this configuration
- URL newUrl = getConfigedInvokerUrl(configurators, currentUrl);
- newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
- newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
- .getConfigurators(), newUrl);
- if (!currentUrl.equals(newUrl)) {
- RegistryProtocol.this.reExport(originInvoker, newUrl);
- logger.info("exported provider url changed, origin url: " + originUrl +
- ", old export url: " + currentUrl + ", new export url: " + newUrl);
- }
- }
-
- private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
- List<URL> result = new ArrayList<URL>();
- for (URL url : configuratorUrls) {
- URL overrideUrl = url;
- // Compatible with the old version
- if (url.getParameter(CATEGORY_KEY) == null && OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
- overrideUrl = url.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY);
- }
-
- // Check whether url is to be applied to the current service
- if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
- result.add(url);
- }
- }
- return result;
- }
- }
-
- private class ServiceConfigurationListener extends AbstractConfiguratorListener {
- private URL providerUrl;
- private OverrideListener notifyListener;
-
- public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
- this.providerUrl = providerUrl;
- this.notifyListener = notifyListener;
- this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX);
- }
-
- private <T> URL overrideUrl(URL providerUrl) {
- return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
- }
-
- @Override
- protected void notifyOverrides() {
- notifyListener.doOverrideIfNecessary();
- }
- }
-
- private class ProviderConfigurationListener extends AbstractConfiguratorListener {
-
- public ProviderConfigurationListener() {
- this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
- }
-
- /**
- * Get existing configuration rule and override provider url before exporting.
- *
- * @param providerUrl
- * @param <T>
- * @return
- */
- private <T> URL overrideUrl(URL providerUrl) {
- return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
- }
-
- @Override
- protected void notifyOverrides() {
- overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
- }
- }
-
- /**
- * exporter proxy, establish the corresponding relationship between the returned exporter and the exporter
- * exported by the protocol, and can modify the relationship at the time of override.
- *
- * @param <T>
- */
- private class ExporterChangeableWrapper<T> implements Exporter<T> {
-
- private final ExecutorService executor = newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true));
-
- private final Invoker<T> originInvoker;
- private Exporter<T> exporter;
- private URL subscribeUrl;
- private URL registerUrl;
-
- public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
- this.exporter = exporter;
- this.originInvoker = originInvoker;
- }
-
- public Invoker<T> getOriginInvoker() {
- return originInvoker;
- }
-
- @Override
- public Invoker<T> getInvoker() {
- return exporter.getInvoker();
- }
-
- public void setExporter(Exporter<T> exporter) {
- this.exporter = exporter;
- }
-
- @Override
- public void unexport() {
- String key = getCacheKey(this.originInvoker);
- bounds.remove(key);
-
- Registry registry = RegistryProtocol.this.getRegistry(originInvoker);
- try {
- registry.unregister(registerUrl);
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- try {
- NotifyListener listener = RegistryProtocol.this.overrideListeners.remove(subscribeUrl);
- registry.unsubscribe(subscribeUrl, listener);
- ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
- .removeListener(subscribeUrl.getServiceKey() + CONFIGURATORS_SUFFIX,
- serviceConfigurationListeners.get(subscribeUrl.getServiceKey()));
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
-
- executor.submit(() -> {
- try {
- int timeout = ConfigurationUtils.getServerShutdownTimeout();
- if (timeout > 0) {
- logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. " +
- "Usually, this is called when you use dubbo API");
- Thread.sleep(timeout);
- }
- exporter.unexport();
- } catch (Throwable t) {
- logger.warn(t.getMessage(), t);
- }
- });
- }
-
- public void setSubscribeUrl(URL subscribeUrl) {
- this.subscribeUrl = subscribeUrl;
- }
-
- public void setRegisterUrl(URL registerUrl) {
- this.registerUrl = registerUrl;
- }
-
- public URL getRegisterUrl() {
- return registerUrl;
- }
- }
-
- // for unit test
- private static RegistryProtocol INSTANCE;
-
- // for unit test
- public RegistryProtocol() {
- INSTANCE = this;
- }
-
- // for unit test
- public static RegistryProtocol getRegistryProtocol() {
- if (INSTANCE == null) {
- ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(REGISTRY_PROTOCOL); // load
- }
- return INSTANCE;
- }
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.client;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.config.ConfigurationUtils;
+import org.apache.dubbo.common.config.configcenter.DynamicConfiguration;
+import org.apache.dubbo.common.extension.ExtensionLoader;
+import org.apache.dubbo.common.logger.Logger;
+import org.apache.dubbo.common.logger.LoggerFactory;
+import org.apache.dubbo.common.timer.HashedWheelTimer;
+import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.common.utils.NamedThreadFactory;
+import org.apache.dubbo.common.utils.StringUtils;
+import org.apache.dubbo.common.utils.UrlUtils;
+import org.apache.dubbo.registry.NotifyListener;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.RegistryFactory;
+import org.apache.dubbo.registry.RegistryService;
+import org.apache.dubbo.registry.integration.AbstractConfiguratorListener;
+import org.apache.dubbo.registry.integration.DynamicDirectory;
+import org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol;
+import org.apache.dubbo.registry.integration.RegistryProtocolListener;
+import org.apache.dubbo.registry.retry.ReExportTask;
+import org.apache.dubbo.registry.support.SkipFailbackWrapperException;
+import org.apache.dubbo.rpc.Exporter;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Protocol;
+import org.apache.dubbo.rpc.ProtocolServer;
+import org.apache.dubbo.rpc.ProxyFactory;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
+import org.apache.dubbo.rpc.cluster.Configurator;
+import org.apache.dubbo.rpc.cluster.governance.GovernanceRuleRepository;
+import org.apache.dubbo.rpc.cluster.support.MergeableCluster;
+import org.apache.dubbo.rpc.model.ApplicationModel;
+import org.apache.dubbo.rpc.model.ProviderModel;
+import org.apache.dubbo.rpc.protocol.InvokerWrapper;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.TimeUnit;
+
+import static java.util.concurrent.Executors.newSingleThreadExecutor;
+import static org.apache.dubbo.common.constants.CommonConstants.APPLICATION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.CLUSTER_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.COMMA_SPLIT_PATTERN;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO_VERSION_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.EXTRA_KEYS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.HIDE_KEY_PREFIX;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.LOADBALANCE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.METHODS_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PATH_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.RELEASE_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMEOUT_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+import static org.apache.dubbo.common.constants.FilterConstants.VALIDATION_KEY;
+import static org.apache.dubbo.common.constants.QosConstants.ACCEPT_FOREIGN_IP;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_ENABLE;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_HOST;
+import static org.apache.dubbo.common.constants.QosConstants.QOS_PORT;
+import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.CONFIGURATORS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.OVERRIDE_PROTOCOL;
+import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
+import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
+import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL;
+import static org.apache.dubbo.common.utils.UrlUtils.classifyUrls;
+import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
+import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY_RETRY_PERIOD;
+import static org.apache.dubbo.registry.Constants.PROVIDER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
+import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
+import static org.apache.dubbo.registry.Constants.REGISTRY_RETRY_PERIOD_KEY;
+import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
+import static org.apache.dubbo.remoting.Constants.BIND_IP_KEY;
+import static org.apache.dubbo.remoting.Constants.BIND_PORT_KEY;
+import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
+import static org.apache.dubbo.remoting.Constants.CODEC_KEY;
+import static org.apache.dubbo.remoting.Constants.CONNECTIONS_KEY;
+import static org.apache.dubbo.remoting.Constants.EXCHANGER_KEY;
+import static org.apache.dubbo.remoting.Constants.SERIALIZATION_KEY;
+import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
+import static org.apache.dubbo.rpc.Constants.INTERFACES;
+import static org.apache.dubbo.rpc.Constants.MOCK_KEY;
+import static org.apache.dubbo.rpc.Constants.TOKEN_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.EXPORT_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.REFER_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.WARMUP_KEY;
+import static org.apache.dubbo.rpc.cluster.Constants.WEIGHT_KEY;
+
+/**
+ * TODO, replace RegistryProtocol completely in the future.
+ */
+public class RegistryProtocol implements Protocol {
+ public static final String[] DEFAULT_REGISTER_PROVIDER_KEYS = {
+ APPLICATION_KEY, CODEC_KEY, EXCHANGER_KEY, SERIALIZATION_KEY, CLUSTER_KEY, CONNECTIONS_KEY, DEPRECATED_KEY,
+ GROUP_KEY, LOADBALANCE_KEY, MOCK_KEY, PATH_KEY, TIMEOUT_KEY, TOKEN_KEY, VERSION_KEY, WARMUP_KEY,
+ WEIGHT_KEY, TIMESTAMP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
+ };
+
+ public static final String[] DEFAULT_REGISTER_CONSUMER_KEYS = {
+ APPLICATION_KEY, VERSION_KEY, GROUP_KEY, DUBBO_VERSION_KEY, RELEASE_KEY
+ };
+
+ private final static Logger logger = LoggerFactory.getLogger(InterfaceCompatibleRegistryProtocol.class);
+ private final Map<URL, NotifyListener> overrideListeners = new ConcurrentHashMap<>();
+ private final Map<String, ServiceConfigurationListener> serviceConfigurationListeners = new ConcurrentHashMap<>();
+ private final ProviderConfigurationListener providerConfigurationListener = new ProviderConfigurationListener();
+ //To solve the problem of RMI repeated exposure port conflicts, the services that have been exposed are no longer exposed.
+ //providerurl <--> exporter
+ private final ConcurrentMap<String, ExporterChangeableWrapper<?>> bounds = new ConcurrentHashMap<>();
+ protected Protocol protocol;
+ protected RegistryFactory registryFactory;
+ protected ProxyFactory proxyFactory;
+
+ private ConcurrentMap<URL, ReExportTask> reExportFailedTasks = new ConcurrentHashMap<>();
+ private HashedWheelTimer retryTimer = new HashedWheelTimer(new NamedThreadFactory("DubboReexportTimer", true), DEFAULT_REGISTRY_RETRY_PERIOD, TimeUnit.MILLISECONDS, 128);
+
+ //Filter the parameters that do not need to be output in url(Starting with .)
+ private static String[] getFilteredKeys(URL url) {
+ Map<String, String> params = url.getParameters();
+ if (CollectionUtils.isNotEmptyMap(params)) {
+ return params.keySet().stream()
+ .filter(k -> k.startsWith(HIDE_KEY_PREFIX))
+ .toArray(String[]::new);
+ } else {
+ return new String[0];
+ }
+ }
+
+ public void setProtocol(Protocol protocol) {
+ this.protocol = protocol;
+ }
+
+ public void setRegistryFactory(RegistryFactory registryFactory) {
+ this.registryFactory = registryFactory;
+ }
+
+ public void setProxyFactory(ProxyFactory proxyFactory) {
+ this.proxyFactory = proxyFactory;
+ }
+
+ @Override
+ public int getDefaultPort() {
+ return 9090;
+ }
+
+ public Map<URL, NotifyListener> getOverrideListeners() {
+ return overrideListeners;
+ }
+
+ private void register(URL registryUrl, URL registeredProviderUrl) {
+ Registry registry = registryFactory.getRegistry(registryUrl);
+ registry.register(registeredProviderUrl);
+ }
+
+ private void registerStatedUrl(URL registryUrl, URL registeredProviderUrl, boolean registered) {
+ ProviderModel model = ApplicationModel.getProviderModel(registeredProviderUrl.getServiceKey());
+ model.addStatedUrl(new ProviderModel.RegisterStatedURL(
+ registeredProviderUrl,
+ registryUrl,
+ registered));
+ }
+
+ @Override
+ public <T> Exporter<T> export(final Invoker<T> originInvoker) throws RpcException {
+ URL registryUrl = getRegistryUrl(originInvoker);
+ // url to export locally
+ URL providerUrl = getProviderUrl(originInvoker);
+
+ // Subscribe the override data
+ // FIXME When the provider subscribes, it will affect the scene : a certain JVM exposes the service and call
+ // the same service. Because the subscribed is cached key with the name of the service, it causes the
+ // subscription information to cover.
+ final URL overrideSubscribeUrl = getSubscribedOverrideUrl(providerUrl);
+ final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
+ overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
+
+ providerUrl = overrideUrlWithConfig(providerUrl, overrideSubscribeListener);
+ //export invoker
+ final ExporterChangeableWrapper<T> exporter = doLocalExport(originInvoker, providerUrl);
+
+ // url to registry
+ final Registry registry = getRegistry(originInvoker);
+ final URL registeredProviderUrl = getUrlToRegistry(providerUrl, registryUrl);
+
+ // decide if we need to delay publish
+ boolean register = providerUrl.getParameter(REGISTER_KEY, true);
+ if (register) {
+ register(registryUrl, registeredProviderUrl);
+ }
+
+ // register stated url on provider model
+ registerStatedUrl(registryUrl, registeredProviderUrl, register);
+
+
+ exporter.setRegisterUrl(registeredProviderUrl);
+ exporter.setSubscribeUrl(overrideSubscribeUrl);
+
+ // Deprecated! Subscribe to override rules in 2.6.x or before.
+ registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
+
+ notifyExport(exporter);
+ //Ensure that a new exporter instance is returned every time export
+ return new DestroyableExporter<>(exporter);
+ }
+
+ private <T> void notifyExport(ExporterChangeableWrapper<T> exporter) {
+ List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
+ .getActivateExtension(exporter.getOriginInvoker().getUrl(), "registry.protocol.listener");
+ if (CollectionUtils.isNotEmpty(listeners)) {
+ for (RegistryProtocolListener listener : listeners) {
+ listener.onExport(this, exporter);
+ }
+ }
+ }
+
+ private URL overrideUrlWithConfig(URL providerUrl, OverrideListener listener) {
+ providerUrl = providerConfigurationListener.overrideUrl(providerUrl);
+ ServiceConfigurationListener serviceConfigurationListener = new ServiceConfigurationListener(providerUrl, listener);
+ serviceConfigurationListeners.put(providerUrl.getServiceKey(), serviceConfigurationListener);
+ return serviceConfigurationListener.overrideUrl(providerUrl);
+ }
+
+ @SuppressWarnings("unchecked")
+ private <T> ExporterChangeableWrapper<T> doLocalExport(final Invoker<T> originInvoker, URL providerUrl) {
+ String key = getCacheKey(originInvoker);
+
+ return (ExporterChangeableWrapper<T>) bounds.computeIfAbsent(key, s -> {
+ Invoker<?> invokerDelegate = new InvokerDelegate<>(originInvoker, providerUrl);
+ return new ExporterChangeableWrapper<>((Exporter<T>) protocol.export(invokerDelegate), originInvoker);
+ });
+ }
+
+ public <T> void reExport(Exporter<T> exporter, URL newInvokerUrl) {
+ if (exporter instanceof ExporterChangeableWrapper) {
+ ExporterChangeableWrapper<T> exporterWrapper = (ExporterChangeableWrapper<T>) exporter;
+ Invoker<T> originInvoker = exporterWrapper.getOriginInvoker();
+ reExport(originInvoker, newInvokerUrl);
+ }
+ }
+
+ /**
+ * Reexport the invoker of the modified url
+ *
+ * @param originInvoker
+ * @param newInvokerUrl
+ * @param <T>
+ */
+ @SuppressWarnings("unchecked")
+ public <T> void reExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
+ String key = getCacheKey(originInvoker);
+ ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
+ URL registeredUrl = exporter.getRegisterUrl();
+
+ URL registryUrl = getRegistryUrl(originInvoker);
+ URL newProviderUrl = getUrlToRegistry(newInvokerUrl, registryUrl);
+
+ // update local exporter
+ Invoker<T> invokerDelegate = new InvokerDelegate<T>(originInvoker, newInvokerUrl);
+ exporter.setExporter(protocol.export(invokerDelegate));
+
+ // update registry
+ if (!newProviderUrl.equals(registeredUrl)) {
+ try {
+ doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl);
+ } catch (Exception e) {
+ ReExportTask oldTask = reExportFailedTasks.get(registeredUrl);
+ if (oldTask != null) {
+ return;
+ }
+ ReExportTask task = new ReExportTask(
+ () -> doReExport(originInvoker, exporter, registryUrl, registeredUrl, newProviderUrl),
+ registeredUrl,
+ null
+ );
+ oldTask = reExportFailedTasks.putIfAbsent(registeredUrl, task);
+ if (oldTask == null) {
+ // never has a retry task. then start a new task for retry.
+ retryTimer.newTimeout(task, registryUrl.getParameter(REGISTRY_RETRY_PERIOD_KEY, DEFAULT_REGISTRY_RETRY_PERIOD), TimeUnit.MILLISECONDS);
+ }
+ }
+ }
+ }
+
+ private <T> void doReExport(final Invoker<T> originInvoker, ExporterChangeableWrapper<T> exporter,
+ URL registryUrl, URL oldProviderUrl, URL newProviderUrl) {
+ if (getProviderUrl(originInvoker).getParameter(REGISTER_KEY, true)) {
+ Registry registry = null;
+ try {
+ registry = getRegistry(originInvoker);
+ } catch (Exception e) {
+ throw new SkipFailbackWrapperException(e);
+ }
+
+ logger.info("Try to unregister old url: " + oldProviderUrl);
+ registry.reExportUnregister(oldProviderUrl);
+
+ logger.info("Try to register new url: " + newProviderUrl);
+ registry.reExportRegister(newProviderUrl);
+ }
+ try {
+ ProviderModel.RegisterStatedURL statedUrl = getStatedUrl(registryUrl, newProviderUrl);
+ statedUrl.setProviderUrl(newProviderUrl);
+ exporter.setRegisterUrl(newProviderUrl);
+ } catch (Exception e) {
+ throw new SkipFailbackWrapperException(e);
+ }
+ }
+
+ private ProviderModel.RegisterStatedURL getStatedUrl(URL registryUrl, URL providerUrl) {
+ ProviderModel providerModel = ApplicationModel.getServiceRepository()
+ .lookupExportedService(providerUrl.getServiceKey());
+
+ List<ProviderModel.RegisterStatedURL> statedUrls = providerModel.getStatedUrl();
+ return statedUrls.stream()
+ .filter(u -> u.getRegistryUrl().equals(registryUrl)
+ && u.getProviderUrl().getProtocol().equals(providerUrl.getProtocol()))
+ .findFirst().orElseThrow(() -> new IllegalStateException("There should have at least one registered url."));
+ }
+
+ /**
+ * Get an instance of registry based on the address of invoker
+ *
+ * @param originInvoker
+ * @return
+ */
+ protected Registry getRegistry(final Invoker<?> originInvoker) {
+ URL registryUrl = getRegistryUrl(originInvoker);
+ return registryFactory.getRegistry(registryUrl);
+ }
+
+ protected URL getRegistryUrl(Invoker<?> originInvoker) {
+ return originInvoker.getUrl();
+ }
+
+ protected URL getRegistryUrl(URL url) {
+ if (SERVICE_REGISTRY_PROTOCOL.equals(url.getProtocol())) {
+ return url;
+ }
+ return url.addParameter(REGISTRY_KEY, url.getProtocol()).setProtocol(SERVICE_REGISTRY_PROTOCOL);
+ }
+
+ /**
+ * Return the url that is registered to the registry and filter the url parameter once
+ *
+ * @param providerUrl
+ * @return url to registry.
+ */
+ private URL getUrlToRegistry(final URL providerUrl, final URL registryUrl) {
+ //The address you see at the registry
+ if (!registryUrl.getParameter(SIMPLIFIED_KEY, false)) {
+ return providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameters(
+ MONITOR_KEY, BIND_IP_KEY, BIND_PORT_KEY, QOS_ENABLE, QOS_HOST, QOS_PORT, ACCEPT_FOREIGN_IP, VALIDATION_KEY,
+ INTERFACES);
+ } else {
+ String extraKeys = registryUrl.getParameter(EXTRA_KEYS_KEY, "");
+ // if path is not the same as interface name then we should keep INTERFACE_KEY,
+ // otherwise, the registry structure of zookeeper would be '/dubbo/path/providers',
+ // but what we expect is '/dubbo/interface/providers'
+ if (!providerUrl.getPath().equals(providerUrl.getParameter(INTERFACE_KEY))) {
+ if (StringUtils.isNotEmpty(extraKeys)) {
+ extraKeys += ",";
+ }
+ extraKeys += INTERFACE_KEY;
+ }
+ String[] paramsToRegistry = getParamsToRegistry(DEFAULT_REGISTER_PROVIDER_KEYS
+ , COMMA_SPLIT_PATTERN.split(extraKeys));
+ return URL.valueOf(providerUrl, paramsToRegistry, providerUrl.getParameter(METHODS_KEY, (String[]) null));
+ }
+
+ }
+
+ private URL getSubscribedOverrideUrl(URL registeredProviderUrl) {
+ return registeredProviderUrl.setProtocol(PROVIDER_PROTOCOL)
+ .addParameters(CATEGORY_KEY, CONFIGURATORS_CATEGORY, CHECK_KEY, String.valueOf(false));
+ }
+
+ /**
+ * Get the address of the providerUrl through the url of the invoker
+ *
+ * @param originInvoker
+ * @return
+ */
+ private URL getProviderUrl(final Invoker<?> originInvoker) {
+ String export = originInvoker.getUrl().getParameterAndDecoded(EXPORT_KEY);
+ if (export == null || export.length() == 0) {
+ throw new IllegalArgumentException("The registry export url is null! registry: " + originInvoker.getUrl());
+ }
+ return URL.valueOf(export);
+ }
+
+ /**
+ * Get the key cached in bounds by invoker
+ *
+ * @param originInvoker
+ * @return
+ */
+ private String getCacheKey(final Invoker<?> originInvoker) {
+ URL providerUrl = getProviderUrl(originInvoker);
+ String key = providerUrl.removeParameters("dynamic", "enabled").toFullString();
+ return key;
+ }
+
+ @Override
+ @SuppressWarnings("unchecked")
+ public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException {
+ url = getRegistryUrl(url);
+ Registry registry = registryFactory.getRegistry(url);
+ if (RegistryService.class.equals(type)) {
+ return proxyFactory.getInvoker((T) registry, type, url);
+ }
+
+ // group="a,b" or group="*"
+ Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(REFER_KEY));
+ String group = qs.get(GROUP_KEY);
+ if (group != null && group.length() > 0) {
+ if ((COMMA_SPLIT_PATTERN.split(group)).length > 1 || "*".equals(group)) {
+ return doRefer(Cluster.getCluster(MergeableCluster.NAME), registry, type, url);
+ }
+ }
+
+ Cluster cluster = Cluster.getCluster(qs.get(CLUSTER_KEY));
+ return doRefer(cluster, registry, type, url);
+ }
+
+ protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
+ return interceptInvoker(getInvoker(cluster, registry, type, url), url);
+ }
+
+ protected <T> Invoker<T> interceptInvoker(ClusterInvoker<T> invoker, URL url) {
+ List<RegistryProtocolListener> listeners = findRegistryProtocolListeners(url);
+ if (CollectionUtils.isEmpty(listeners)) {
+ return invoker;
+ }
+
+ for (RegistryProtocolListener listener : listeners) {
+ listener.onRefer(this, invoker);
+ }
+ return invoker;
+ }
+
+ protected <T> ClusterInvoker<T> getInvoker(Cluster cluster, Registry registry, Class<T> type, URL url) {
+ DynamicDirectory<T> directory = createDirectory(type, url);
+ directory.setRegistry(registry);
+ directory.setProtocol(protocol);
+ // all attributes of REFER_KEY
+ Map<String, String> parameters = new HashMap<String, String>(directory.getConsumerUrl().getParameters());
+ URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
+ if (directory.isShouldRegister()) {
+ directory.setRegisteredConsumerUrl(urlToRegistry);
+ registry.register(directory.getRegisteredConsumerUrl());
+ }
+ directory.buildRouterChain(urlToRegistry);
+ directory.subscribe(toSubscribeUrl(urlToRegistry));
+
+ return (ClusterInvoker<T>) cluster.join(directory);
+ }
+
+ protected <T> DynamicDirectory<T> createDirectory(Class<T> type, URL url) {
+ return new ServiceDiscoveryRegistryDirectory<>(type, url);
+ }
+
+ public <T> void reRefer(DynamicDirectory<T> directory, URL newSubscribeUrl) {
+ URL oldSubscribeUrl = directory.getRegisteredConsumerUrl();
+ Registry registry = directory.getRegistry();
+ registry.unregister(directory.getRegisteredConsumerUrl());
+ directory.unSubscribe(toSubscribeUrl(oldSubscribeUrl));
+ registry.register(directory.getRegisteredConsumerUrl());
+
+ directory.setRegisteredConsumerUrl(newSubscribeUrl);
+ directory.buildRouterChain(newSubscribeUrl);
+ directory.subscribe(toSubscribeUrl(newSubscribeUrl));
+ }
+
+ protected static URL toSubscribeUrl(URL url) {
+ return url.addParameter(CATEGORY_KEY, PROVIDERS_CATEGORY + "," + CONFIGURATORS_CATEGORY + "," + ROUTERS_CATEGORY);
+ }
+
+ protected List<RegistryProtocolListener> findRegistryProtocolListeners(URL url) {
+ return ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
+ .getActivateExtension(url, "registry.protocol.listener");
+ }
+
+ // available to test
+ public String[] getParamsToRegistry(String[] defaultKeys, String[] additionalParameterKeys) {
+ int additionalLen = additionalParameterKeys.length;
+ String[] registryParams = new String[defaultKeys.length + additionalLen];
+ System.arraycopy(defaultKeys, 0, registryParams, 0, defaultKeys.length);
+ System.arraycopy(additionalParameterKeys, 0, registryParams, defaultKeys.length, additionalLen);
+ return registryParams;
+ }
+
+ @Override
+ public void destroy() {
+ List<RegistryProtocolListener> listeners = ExtensionLoader.getExtensionLoader(RegistryProtocolListener.class)
+ .getLoadedExtensionInstances();
+ if (CollectionUtils.isNotEmpty(listeners)) {
+ for (RegistryProtocolListener listener : listeners) {
+ listener.onDestroy();
+ }
+ }
+
+ List<Exporter<?>> exporters = new ArrayList<Exporter<?>>(bounds.values());
+ for (Exporter<?> exporter : exporters) {
+ exporter.unexport();
+ }
+ bounds.clear();
+
+ ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
+ .removeListener(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX, providerConfigurationListener);
+ }
+
+ @Override
+ public List<ProtocolServer> getServers() {
+ return protocol.getServers();
+ }
+
+ //Merge the urls of configurators
+ private static URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
+ if (configurators != null && configurators.size() > 0) {
+ for (Configurator configurator : configurators) {
+ url = configurator.configure(url);
+ }
+ }
+ return url;
+ }
+
+ public static class InvokerDelegate<T> extends InvokerWrapper<T> {
+ private final Invoker<T> invoker;
+
+ /**
+ * @param invoker
+ * @param url invoker.getUrl return this value
+ */
+ public InvokerDelegate(Invoker<T> invoker, URL url) {
+ super(invoker, url);
+ this.invoker = invoker;
+ }
+
+ public Invoker<T> getInvoker() {
+ if (invoker instanceof InvokerDelegate) {
+ return ((InvokerDelegate<T>) invoker).getInvoker();
+ } else {
+ return invoker;
+ }
+ }
+ }
+
+ private static class DestroyableExporter<T> implements Exporter<T> {
+
+ private Exporter<T> exporter;
+
+ public DestroyableExporter(Exporter<T> exporter) {
+ this.exporter = exporter;
+ }
+
+ @Override
+ public Invoker<T> getInvoker() {
+ return exporter.getInvoker();
+ }
+
+ @Override
+ public void unexport() {
+ exporter.unexport();
+ }
+ }
+
+ /**
+ * Reexport: the exporter destroy problem in protocol
+ * 1.Ensure that the exporter returned by registryprotocol can be normal destroyed
+ * 2.No need to re-register to the registry after notify
+ * 3.The invoker passed by the export method , would better to be the invoker of exporter
+ */
+ private class OverrideListener implements NotifyListener {
+ private final URL subscribeUrl;
+ private final Invoker originInvoker;
+
+
+ private List<Configurator> configurators;
+
+ public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
+ this.subscribeUrl = subscribeUrl;
+ this.originInvoker = originalInvoker;
+ }
+
+ /**
+ * @param urls The list of registered information, is always not empty, The meaning is the same as the
+ * return value of {@link org.apache.dubbo.registry.RegistryService#lookup(URL)}.
+ */
+ @Override
+ public synchronized void notify(List<URL> urls) {
+ logger.debug("original override urls: " + urls);
+
+ List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl.addParameter(CATEGORY_KEY,
+ CONFIGURATORS_CATEGORY));
+ logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
+
+ // No matching results
+ if (matchedUrls.isEmpty()) {
+ return;
+ }
+
+ this.configurators = Configurator.toConfigurators(classifyUrls(matchedUrls, UrlUtils::isConfigurator))
+ .orElse(configurators);
+
+ doOverrideIfNecessary();
+ }
+
+ public synchronized void doOverrideIfNecessary() {
+ final Invoker<?> invoker;
+ if (originInvoker instanceof InvokerDelegate) {
+ invoker = ((InvokerDelegate<?>) originInvoker).getInvoker();
+ } else {
+ invoker = originInvoker;
+ }
+ //The origin invoker
+ URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);
+ String key = getCacheKey(originInvoker);
+ ExporterChangeableWrapper<?> exporter = bounds.get(key);
+ if (exporter == null) {
+ logger.warn(new IllegalStateException("error state, exporter should not be null"));
+ return;
+ }
+ //The current, may have been merged many times
+ URL currentUrl = exporter.getInvoker().getUrl();
+ //Merged with this configuration
+ URL newUrl = getConfigedInvokerUrl(configurators, currentUrl);
+ newUrl = getConfigedInvokerUrl(providerConfigurationListener.getConfigurators(), newUrl);
+ newUrl = getConfigedInvokerUrl(serviceConfigurationListeners.get(originUrl.getServiceKey())
+ .getConfigurators(), newUrl);
+ if (!currentUrl.equals(newUrl)) {
+ RegistryProtocol.this.reExport(originInvoker, newUrl);
+ logger.info("exported provider url changed, origin url: " + originUrl +
+ ", old export url: " + currentUrl + ", new export url: " + newUrl);
+ }
+ }
+
+ private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
+ List<URL> result = new ArrayList<URL>();
+ for (URL url : configuratorUrls) {
+ URL overrideUrl = url;
+ // Compatible with the old version
+ if (url.getParameter(CATEGORY_KEY) == null && OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
+ overrideUrl = url.addParameter(CATEGORY_KEY, CONFIGURATORS_CATEGORY);
+ }
+
+ // Check whether url is to be applied to the current service
+ if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
+ result.add(url);
+ }
+ }
+ return result;
+ }
+ }
+
+ private class ServiceConfigurationListener extends AbstractConfiguratorListener {
+ private URL providerUrl;
+ private OverrideListener notifyListener;
+
+ public ServiceConfigurationListener(URL providerUrl, OverrideListener notifyListener) {
+ this.providerUrl = providerUrl;
+ this.notifyListener = notifyListener;
+ this.initWith(DynamicConfiguration.getRuleKey(providerUrl) + CONFIGURATORS_SUFFIX);
+ }
+
+ private <T> URL overrideUrl(URL providerUrl) {
+ return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
+ }
+
+ @Override
+ protected void notifyOverrides() {
+ notifyListener.doOverrideIfNecessary();
+ }
+ }
+
+ private class ProviderConfigurationListener extends AbstractConfiguratorListener {
+
+ public ProviderConfigurationListener() {
+ this.initWith(ApplicationModel.getApplication() + CONFIGURATORS_SUFFIX);
+ }
+
+ /**
+ * Get existing configuration rule and override provider url before exporting.
+ *
+ * @param providerUrl
+ * @param <T>
+ * @return
+ */
+ private <T> URL overrideUrl(URL providerUrl) {
+ return RegistryProtocol.getConfigedInvokerUrl(configurators, providerUrl);
+ }
+
+ @Override
+ protected void notifyOverrides() {
+ overrideListeners.values().forEach(listener -> ((OverrideListener) listener).doOverrideIfNecessary());
+ }
+ }
+
+ /**
+ * exporter proxy, establish the corresponding relationship between the returned exporter and the exporter
+ * exported by the protocol, and can modify the relationship at the time of override.
+ *
+ * @param <T>
+ */
+ private class ExporterChangeableWrapper<T> implements Exporter<T> {
+
+ private final ExecutorService executor = newSingleThreadExecutor(new NamedThreadFactory("Exporter-Unexport", true));
+
+ private final Invoker<T> originInvoker;
+ private Exporter<T> exporter;
+ private URL subscribeUrl;
+ private URL registerUrl;
+
+ public ExporterChangeableWrapper(Exporter<T> exporter, Invoker<T> originInvoker) {
+ this.exporter = exporter;
+ this.originInvoker = originInvoker;
+ }
+
+ public Invoker<T> getOriginInvoker() {
+ return originInvoker;
+ }
+
+ @Override
+ public Invoker<T> getInvoker() {
+ return exporter.getInvoker();
+ }
+
+ public void setExporter(Exporter<T> exporter) {
+ this.exporter = exporter;
+ }
+
+ @Override
+ public void unexport() {
+ String key = getCacheKey(this.originInvoker);
+ bounds.remove(key);
+
+ Registry registry = RegistryProtocol.this.getRegistry(originInvoker);
+ try {
+ registry.unregister(registerUrl);
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ try {
+ NotifyListener listener = RegistryProtocol.this.overrideListeners.remove(subscribeUrl);
+ registry.unsubscribe(subscribeUrl, listener);
+ ExtensionLoader.getExtensionLoader(GovernanceRuleRepository.class).getDefaultExtension()
+ .removeListener(subscribeUrl.getServiceKey() + CONFIGURATORS_SUFFIX,
+ serviceConfigurationListeners.get(subscribeUrl.getServiceKey()));
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+
+ executor.submit(() -> {
+ try {
+ int timeout = ConfigurationUtils.getServerShutdownTimeout();
+ if (timeout > 0) {
+ logger.info("Waiting " + timeout + "ms for registry to notify all consumers before unexport. " +
+ "Usually, this is called when you use dubbo API");
+ Thread.sleep(timeout);
+ }
+ exporter.unexport();
+ } catch (Throwable t) {
+ logger.warn(t.getMessage(), t);
+ }
+ });
+ }
+
+ public void setSubscribeUrl(URL subscribeUrl) {
+ this.subscribeUrl = subscribeUrl;
+ }
+
+ public void setRegisterUrl(URL registerUrl) {
+ this.registerUrl = registerUrl;
+ }
+
+ public URL getRegisterUrl() {
+ return registerUrl;
+ }
+ }
+
+ // for unit test
+ private static RegistryProtocol INSTANCE;
+
+ // for unit test
+ public RegistryProtocol() {
+ INSTANCE = this;
+ }
+
+ // for unit test
+ public static RegistryProtocol getRegistryProtocol() {
+ if (INSTANCE == null) {
+ ExtensionLoader.getExtensionLoader(Protocol.class).getExtension(REGISTRY_PROTOCOL); // load
+ }
+ return INSTANCE;
+ }
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index efd3c57..1315c54 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -392,7 +392,7 @@ public class ServiceDiscoveryRegistry implements Registry {
* @return
*/
protected Set<String> findMappedServices(URL subscribedURL, MappingListener listener) {
- return serviceNameMapping.get(subscribedURL, listener);
+ return serviceNameMapping.getAndListen(subscribedURL, listener);
}
/**
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
deleted file mode 100644
index f4f872f..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocol.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry.client;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.registry.integration.DynamicDirectory;
-import org.apache.dubbo.registry.integration.RegistryProtocol;
-import org.apache.dubbo.rpc.Invoker;
-
-import static org.apache.dubbo.common.constants.RegistryConstants.SERVICE_REGISTRY_PROTOCOL;
-
-/**
- * TODO, replace RegistryProtocol completely in the future.
- */
-public class ServiceDiscoveryRegistryProtocol extends RegistryProtocol {
- @Override
- protected URL getRegistryUrl(Invoker<?> originInvoker) {
- URL registryUrl = originInvoker.getUrl();
- if (SERVICE_REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
- return registryUrl;
- }
- return super.getRegistryUrl(originInvoker);
- }
-
- @Override
- protected URL getRegistryUrl(URL url) {
- if (SERVICE_REGISTRY_PROTOCOL.equals(url.getProtocol())) {
- return url;
- }
- return super.getRegistryUrl(url);
- }
-
- @Override
- protected <T> DynamicDirectory<T> createDirectory(Class<T> type, URL url) {
- return new ServiceDiscoveryRegistryDirectory<>(type, url);
- }
-}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java
index d213171..bc9748c 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryProtocolListener.java
@@ -16,7 +16,6 @@
*/
package org.apache.dubbo.registry.client;
-import org.apache.dubbo.registry.integration.RegistryProtocol;
import org.apache.dubbo.registry.integration.RegistryProtocolListener;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
index cb43cee..b915cb3 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/event/listener/ServiceInstancesChangedListener.java
@@ -194,10 +194,6 @@ public class ServiceInstancesChangedListener implements ConditionalEventListener
this.listeners.put(serviceKey, listener);
}
- public void removeListener(String serviceKey) {
- this.listeners.remove(serviceKey);
- }
-
public List<URL> getUrls(String serviceKey) {
return toUrlsWithEmpty(serviceUrls.get(serviceKey));
}
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
similarity index 57%
rename from dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java
rename to dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
index 5389e63..b6d7a9c 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataServiceNameMapping.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataServiceNameMapping.java
@@ -14,19 +14,23 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-package org.apache.dubbo.metadata;
+package org.apache.dubbo.registry.client.metadata;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.utils.CollectionUtils;
+import org.apache.dubbo.metadata.MappingListener;
+import org.apache.dubbo.metadata.MetadataService;
+import org.apache.dubbo.metadata.ServiceNameMapping;
import org.apache.dubbo.metadata.report.MetadataReport;
import org.apache.dubbo.metadata.report.MetadataReportInstance;
+import org.apache.dubbo.registry.client.RegistryClusterIdentifier;
import java.util.LinkedHashSet;
import java.util.List;
-import java.util.Map;
import java.util.Set;
import static java.util.Arrays.asList;
+import static org.apache.dubbo.common.constants.CommonConstants.DEFAULT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
import static org.apache.dubbo.rpc.model.ApplicationModel.getName;
@@ -44,33 +48,42 @@ public class MetadataServiceNameMapping implements ServiceNameMapping {
if (IGNORED_SERVICE_INTERFACES.contains(serviceInterface)) {
return;
}
-
- Map<String, MetadataReport> metadataReports = MetadataReportInstance.getMetadataReports(true);
- metadataReports.forEach((key, reporter) -> {
- reporter.registerServiceAppMapping(ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol), getName(), url);
- });
+ String registryCluster = getRegistryCluster(url);
+ MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster);
+ metadataReport.registerServiceAppMapping(ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol), getName(), url);
}
@Override
- public Set<String> get(URL url, MappingListener mappingListener) {
+ public Set<String> getAndListen(URL url, MappingListener mappingListener) {
String serviceInterface = url.getServiceInterface();
String group = url.getParameter(GROUP_KEY);
String version = url.getParameter(VERSION_KEY);
String protocol = url.getProtocol();
- Map<String, MetadataReport> metadataReports = MetadataReportInstance.getMetadataReports(true);
+ String mappingKey = ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol);
Set<String> serviceNames = new LinkedHashSet<>();
- for (Map.Entry<String, MetadataReport> entry : metadataReports.entrySet()) {
- MetadataReport reporter = entry.getValue();
- Set<String> apps = reporter.getServiceAppMapping(
- ServiceNameMapping.buildGroup(serviceInterface, group, version, protocol),
- mappingListener,
- url);
- if (CollectionUtils.isNotEmpty(apps)) {
- serviceNames.addAll(apps);
- break;
- }
+ String registryCluster = getRegistryCluster(url);
+ MetadataReport metadataReport = MetadataReportInstance.getMetadataReport(registryCluster);
+ Set<String> apps = metadataReport.getServiceAppMapping(
+ mappingKey,
+ mappingListener,
+ url);
+ if (CollectionUtils.isNotEmpty(apps)) {
+ serviceNames.addAll(apps);
}
+
return serviceNames;
}
+
+ protected String getRegistryCluster(URL url) {
+ String registryCluster = RegistryClusterIdentifier.getExtension(url).providerKey(url);
+ if (registryCluster == null) {
+ registryCluster = DEFAULT_KEY;
+ }
+ int i = registryCluster.indexOf(",");
+ if (i > 0) {
+ registryCluster = registryCluster.substring(0, i);
+ }
+ return registryCluster;
+ }
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
index b65dba1..0a6a106 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/MetadataUtils.java
@@ -65,7 +65,9 @@ public class MetadataUtils {
// store in local
WritableMetadataService.getDefaultExtension().publishServiceDefinition(url);
// send to remote
+// if (REMOTE_METADATA_STORAGE_TYPE.equals(url.getParameter(METADATA_KEY))) {
getRemoteMetadataService().publishServiceDefinition(url);
+// }
}
public static MetadataService getMetadataServiceProxy(ServiceInstance instance, ServiceDiscovery serviceDiscovery) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
index 0881e0c..95e5349 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
@@ -42,7 +42,7 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.PORT_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.TIMESTAMP_KEY;
import static org.apache.dubbo.common.utils.StringUtils.isBlank;
-import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_PROVIDER_KEYS;
+import static org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.DEFAULT_REGISTER_PROVIDER_KEYS;
import static org.apache.dubbo.rpc.Constants.DEPRECATED_KEY;
import static org.apache.dubbo.rpc.Constants.ID_KEY;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
index c4235f4..93c02fe 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/store/RemoteMetadataServiceImpl.java
@@ -55,7 +55,7 @@ public class RemoteMetadataServiceImpl {
}
public Map<String, MetadataReport> getMetadataReports() {
- return MetadataReportInstance.getMetadataReports(true);
+ return MetadataReportInstance.getMetadataReports(false);
}
public void publishMetadata(String serviceName) {
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 80aadc8..a53378c 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -53,7 +53,7 @@ import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATE
import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
-import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
+import static org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/InterfaceCompatibleRegistryProtocol.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/InterfaceCompatibleRegistryProtocol.java
new file mode 100644
index 0000000..d591e02
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/InterfaceCompatibleRegistryProtocol.java
@@ -0,0 +1,177 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.dubbo.registry.integration;
+
+import org.apache.dubbo.common.URL;
+import org.apache.dubbo.common.URLBuilder;
+import org.apache.dubbo.registry.Registry;
+import org.apache.dubbo.registry.client.RegistryProtocol;
+import org.apache.dubbo.rpc.Invocation;
+import org.apache.dubbo.rpc.Invoker;
+import org.apache.dubbo.rpc.Result;
+import org.apache.dubbo.rpc.RpcException;
+import org.apache.dubbo.rpc.cluster.Cluster;
+import org.apache.dubbo.rpc.cluster.ClusterInvoker;
+import org.apache.dubbo.rpc.cluster.Directory;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.apache.dubbo.common.constants.RegistryConstants.ENABLE_REGISTRY_DIRECTORY_AUTO_MIGRATION;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_KEY;
+import static org.apache.dubbo.common.constants.RegistryConstants.REGISTRY_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.CONSUMER_PROTOCOL;
+import static org.apache.dubbo.registry.Constants.DEFAULT_REGISTRY;
+import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
+
+/**
+ * RegistryProtocol
+ */
+public class InterfaceCompatibleRegistryProtocol extends RegistryProtocol {
+
+ @Override
+ protected URL getRegistryUrl(Invoker<?> originInvoker) {
+ URL registryUrl = originInvoker.getUrl();
+ if (REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
+ String protocol = registryUrl.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY);
+ registryUrl = registryUrl.setProtocol(protocol).removeParameter(REGISTRY_KEY);
+ }
+ return registryUrl;
+ }
+
+ @Override
+ protected URL getRegistryUrl(URL url) {
+ return URLBuilder.from(url)
+ .setProtocol(url.getParameter(REGISTRY_KEY, DEFAULT_REGISTRY))
+ .removeParameter(REGISTRY_KEY)
+ .build();
+ }
+
+ @Override
+ protected <T> DynamicDirectory<T> createDirectory(Class<T> type, URL url) {
+ return new RegistryDirectory<>(type, url);
+ }
+
+ protected <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) {
+ ClusterInvoker<T> invoker = getInvoker(cluster, registry, type, url);
+ ClusterInvoker<T> serviceDiscoveryInvoker = getServiceDiscoveryInvoker(cluster, type, url);
+ ClusterInvoker<T> migrationInvoker = new MigrationInvoker<>(invoker, serviceDiscoveryInvoker);
+
+ return interceptInvoker(migrationInvoker, url);
+ }
+
+ protected <T> ClusterInvoker<T> getServiceDiscoveryInvoker(Cluster cluster, Class<T> type, URL url) {
+ Registry registry = registryFactory.getRegistry(super.getRegistryUrl(url));
+ ClusterInvoker<T> serviceDiscoveryInvoker = null;
+ // enable auto migration from interface address pool to instance address pool
+ boolean autoMigration = url.getParameter(ENABLE_REGISTRY_DIRECTORY_AUTO_MIGRATION, false);
+ if (autoMigration) {
+ DynamicDirectory<T> serviceDiscoveryDirectory = super.createDirectory(type, url);
+ serviceDiscoveryDirectory.setRegistry(registry);
+ serviceDiscoveryDirectory.setProtocol(protocol);
+ Map<String, String> parameters = new HashMap<String, String>(serviceDiscoveryDirectory.getConsumerUrl().getParameters());
+ URL urlToRegistry = new URL(CONSUMER_PROTOCOL, parameters.remove(REGISTER_IP_KEY), 0, type.getName(), parameters);
+ if (serviceDiscoveryDirectory.isShouldRegister()) {
+ serviceDiscoveryDirectory.setRegisteredConsumerUrl(urlToRegistry);
+ registry.register(serviceDiscoveryDirectory.getRegisteredConsumerUrl());
+ }
+ serviceDiscoveryDirectory.buildRouterChain(urlToRegistry);
+ serviceDiscoveryDirectory.subscribe(toSubscribeUrl(urlToRegistry));
+ serviceDiscoveryInvoker = (ClusterInvoker<T>) cluster.join(serviceDiscoveryDirectory);
+ }
+ return serviceDiscoveryInvoker;
+ }
+
+ private static class MigrationInvoker<T> implements ClusterInvoker<T> {
+ private ClusterInvoker<T> invoker;
+ private ClusterInvoker<T> serviceDiscoveryInvoker;
+
+ public MigrationInvoker(ClusterInvoker<T> invoker, ClusterInvoker<T> serviceDiscoveryInvoker) {
+ this.invoker = invoker;
+ this.serviceDiscoveryInvoker = serviceDiscoveryInvoker;
+ }
+
+ public ClusterInvoker<T> getInvoker() {
+ return invoker;
+ }
+
+ public void setInvoker(ClusterInvoker<T> invoker) {
+ this.invoker = invoker;
+ }
+
+ public ClusterInvoker<T> getServiceDiscoveryInvoker() {
+ return serviceDiscoveryInvoker;
+ }
+
+ public void setServiceDiscoveryInvoker(ClusterInvoker<T> serviceDiscoveryInvoker) {
+ this.serviceDiscoveryInvoker = serviceDiscoveryInvoker;
+ }
+
+ @Override
+ public Class<T> getInterface() {
+ return invoker.getInterface();
+ }
+
+ @Override
+ public Result invoke(Invocation invocation) throws RpcException {
+ if (serviceDiscoveryInvoker == null) {
+ return invoker.invoke(invocation);
+ }
+
+ if (invoker.isDestroyed()) {
+ return serviceDiscoveryInvoker.invoke(invocation);
+ }
+ if (serviceDiscoveryInvoker.isAvailable()) {
+ invoker.destroy(); // can be destroyed asynchronously
+ return serviceDiscoveryInvoker.invoke(invocation);
+ }
+ return invoker.invoke(invocation);
+ }
+
+ @Override
+ public URL getUrl() {
+ return invoker.getUrl();
+ }
+
+ @Override
+ public boolean isAvailable() {
+ return invoker.isAvailable() || serviceDiscoveryInvoker.isAvailable();
+ }
+
+ @Override
+ public void destroy() {
+ invoker.destroy();
+ serviceDiscoveryInvoker.destroy();
+ }
+
+ @Override
+ public URL getRegistryUrl() {
+ return invoker.getRegistryUrl();
+ }
+
+ @Override
+ public Directory<T> getDirectory() {
+ return invoker.getDirectory();
+ }
+
+ @Override
+ public boolean isDestroyed() {
+ return invoker.isDestroyed() && serviceDiscoveryInvoker.isDestroyed();
+ }
+ }
+
+}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index f0d7938..e94f79b 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -73,7 +73,7 @@ import static org.apache.dubbo.common.constants.RegistryConstants.PROVIDERS_CATE
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTERS_CATEGORY;
import static org.apache.dubbo.common.constants.RegistryConstants.ROUTE_PROTOCOL;
import static org.apache.dubbo.registry.Constants.CONFIGURATORS_SUFFIX;
-import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
+import static org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
import static org.apache.dubbo.remoting.Constants.CHECK_KEY;
import static org.apache.dubbo.rpc.cluster.Constants.ROUTER_KEY;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java
deleted file mode 100644
index c6ce46f..0000000
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryInvokerWrapper.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.dubbo.registry.integration;
-
-import org.apache.dubbo.common.URL;
-import org.apache.dubbo.rpc.Invocation;
-import org.apache.dubbo.rpc.Invoker;
-import org.apache.dubbo.rpc.Result;
-import org.apache.dubbo.rpc.RpcException;
-import org.apache.dubbo.rpc.cluster.Cluster;
-
-class RegistryInvokerWrapper<T> implements Invoker<T> {
- private DynamicDirectory<T> directory;
- private Cluster cluster;
- private Invoker<T> invoker;
- private URL url;
-
- public RegistryInvokerWrapper(DynamicDirectory<T> directory, Cluster cluster, Invoker<T> invoker, URL url) {
- this.directory = directory;
- this.cluster = cluster;
- this.invoker = invoker;
- this.url = url;
- }
-
- @Override
- public Class<T> getInterface() {
- return invoker.getInterface();
- }
-
- @Override
- public Result invoke(Invocation invocation) throws RpcException {
- return invoker.invoke(invocation);
- }
-
- @Override
- public URL getUrl() {
- return url;
- }
-
- public void setUrl(URL url) {
- this.url = url;
- }
-
- public void setInvoker(Invoker<T> invoker) {
- this.invoker = invoker;
- }
-
- public DynamicDirectory<T> getDirectory() {
- return directory;
- }
-
- public Cluster getCluster() {
- return cluster;
- }
-
- @Override
- public boolean isAvailable() {
- return invoker.isAvailable();
- }
-
- @Override
- public void destroy() {
- invoker.destroy();
- }
-}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
index 2a4f3c3..5bf47ca 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryProtocolListener.java
@@ -18,6 +18,7 @@ package org.apache.dubbo.registry.integration;
import org.apache.dubbo.common.URL;
import org.apache.dubbo.common.extension.SPI;
+import org.apache.dubbo.registry.client.RegistryProtocol;
import org.apache.dubbo.rpc.Exporter;
import org.apache.dubbo.rpc.Invoker;
diff --git a/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping
new file mode 100644
index 0000000..562d8fa
--- /dev/null
+++ b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.metadata.ServiceNameMapping
@@ -0,0 +1 @@
+metadata=org.apache.dubbo.registry.client.metadata.MetadataServiceNameMapping
\ No newline at end of file
diff --git a/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
index c8903b0..5dda00e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
+++ b/dubbo-registry/dubbo-registry-api/src/main/resources/META-INF/dubbo/internal/org.apache.dubbo.rpc.Protocol
@@ -1,2 +1,2 @@
-registry=org.apache.dubbo.registry.integration.RegistryProtocol
-service-discovery-registry=org.apache.dubbo.registry.client.ServiceDiscoveryRegistryProtocol
\ No newline at end of file
+registry=org.apache.dubbo.registry.integration.InterfaceCompatibleRegistryProtocol
+service-discovery-registry=org.apache.dubbo.registry.client.RegistryProtocol
\ No newline at end of file