You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/07/21 05:57:24 UTC
[dubbo] 03/04: can basically work with InstanceAddressURL
This is an automated email from the ASF dual-hosted git repository.
liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git
commit 42f0529054187227dedd153b14d2b7b4605b8ff4
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Jul 14 01:51:51 2020 +0800
can basically work with InstanceAddressURL
---
.../src/main/java/org/apache/dubbo/common/URL.java | 12 +-
.../org/apache/dubbo/metadata/MetadataInfo.java | 118 ++++++++++++++++++--
.../registry/client/DefaultServiceInstance.java | 27 +++--
.../dubbo/registry/client/InstanceAddressURL.java | 123 +++++++++++++++++----
.../registry/client/ServiceDiscoveryRegistry.java | 4 +-
.../client/ServiceDiscoveryRegistryDirectory.java | 69 +++++++-----
.../metadata/ServiceInstanceMetadataUtils.java | 18 +--
.../registry/integration/DynamicDirectory.java | 9 +-
.../registry/integration/RegistryDirectory.java | 2 +-
.../apache/dubbo/remoting/exchange/Exchangers.java | 2 +-
.../dubbo/remoting/transport/AbstractEndpoint.java | 2 +-
.../dubbo/rpc/proxy/InvokerInvocationHandler.java | 1 +
.../dubbo/rpc/protocol/dubbo/DubboProtocol.java | 5 +-
.../dubbo/rpc/protocol/thrift/ThriftProtocol.java | 2 +-
14 files changed, 292 insertions(+), 102 deletions(-)
diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 5f811fa..9742ebb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -93,19 +93,19 @@ class URL implements Serializable {
private static final long serialVersionUID = -1985165475234910535L;
- private final String protocol;
+ protected String protocol;
- private final String username;
+ protected String username;
- private final String password;
+ protected String password;
// by default, host to registry
- private final String host;
+ protected String host;
// by default, port to registry
- private final int port;
+ protected int port;
- private final String path;
+ protected String path;
private final Map<String, String> parameters;
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 4db0298..931a331 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -29,6 +29,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.SortedSet;
import java.util.TreeSet;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,7 +155,13 @@ public class MetadataInfo implements Serializable {
if (serviceInfo == null) {
return Collections.emptyMap();
}
- return serviceInfo.getParams();
+ return serviceInfo.getAllParams();
+ }
+
+ @Override
+ public String toString() {
+ // FIXME
+ return super.toString();
}
public static class ServiceInfo implements Serializable {
@@ -163,9 +170,12 @@ public class MetadataInfo implements Serializable {
private String group;
private String version;
private String protocol;
+ private String path; // most of the time, path is the same with the interface name.
private Map<String, String> params;
+ private transient Map<String, String> consumerParams;
private transient Map<String, Map<String, String>> methodParams;
+ private transient Map<String, Map<String, String>> consumerMethodParams;
private transient String serviceKey;
private transient String matchKey;
@@ -173,14 +183,7 @@ public class MetadataInfo implements Serializable {
}
public ServiceInfo(URL url) {
- // FIXME, how to match registry.
- this(
- url.getServiceInterface(),
- url.getParameter(GROUP_KEY),
- url.getParameter(VERSION_KEY),
- url.getProtocol(),
- null
- );
+ this(url.getServiceInterface(), url.getParameter(GROUP_KEY), url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
Map<String, String> params = new HashMap<>();
List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");
@@ -207,11 +210,12 @@ public class MetadataInfo implements Serializable {
this.params = params;
}
- public ServiceInfo(String name, String group, String version, String protocol, Map<String, String> params) {
+ public ServiceInfo(String name, String group, String version, String protocol, String path, Map<String, String> params) {
this.name = name;
this.group = group;
this.version = version;
this.protocol = protocol;
+ this.path = path;
this.params = params == null ? new HashMap<>() : params;
this.serviceKey = URL.buildKey(name, group, version);
@@ -266,6 +270,14 @@ public class MetadataInfo implements Serializable {
this.version = version;
}
+ public String getPath() {
+ return path;
+ }
+
+ public void setPath(String path) {
+ this.path = path;
+ }
+
public Map<String, String> getParams() {
if (params == null) {
return Collections.emptyMap();
@@ -277,16 +289,42 @@ public class MetadataInfo implements Serializable {
this.params = params;
}
+ /**
+  * Returns the provider-side parameters merged with the consumer-side parameters.
+  * Consumer-side entries are copied last, so they win when both sides define the same key.
+  *
+  * @return the result of {@link #getParams()} when no consumer parameters have been attached,
+  *         otherwise a newly allocated map holding both sets of parameters
+  */
+ public Map<String, String> getAllParams() {
+     // Go through getParams() rather than the raw field: it substitutes an empty map for a null 'params',
+     // which keeps the size()/putAll() calls below from throwing a NullPointerException.
+     Map<String, String> providerParams = getParams();
+     if (consumerParams == null) {
+         return providerParams;
+     }
+
+     // Pre-size for the default 0.75 load factor so the two putAll() calls do not trigger a resize.
+     Map<String, String> allParams = new HashMap<>((int) ((providerParams.size() + consumerParams.size()) / 0.75f + 1));
+     allParams.putAll(providerParams);
+     allParams.putAll(consumerParams);
+     return allParams;
+ }
+
public String getParameter(String key) {
+ if (consumerParams != null) {
+ String value = consumerParams.get(key);
+ if (value != null) {
+ return value;
+ }
+ }
return params.get(key);
}
public String getMethodParameter(String method, String key, String defaultValue) {
if (methodParams == null) {
methodParams = URL.toMethodParameters(params);
+ consumerMethodParams = URL.toMethodParameters(consumerParams);
+ }
+
+ String value = getMethodParameter(method, key, consumerMethodParams);
+ if (value != null) {
+ return value;
}
+ value = getMethodParameter(method, key, methodParams);
+ return value == null ? defaultValue : value;
+ }
- Map<String, String> keyMap = methodParams.get(method);
+ private String getMethodParameter(String method, String key, Map<String, Map<String, String>> map) {
+ Map<String, String> keyMap = map.get(method);
String value = null;
if (keyMap != null) {
value = keyMap.get(key);
@@ -294,7 +332,21 @@ public class MetadataInfo implements Serializable {
if (StringUtils.isEmpty(value)) {
value = getParameter(key);
}
- return value == null ? defaultValue : value;
+ return value;
+ }
+
+ /**
+  * Tells whether a non-empty value can be resolved for {@code key} on the given method.
+  *
+  * @param method the method name
+  * @param key    the parameter key
+  * @return {@code true} if {@link #getMethodParameter(String, String, String)} yields a non-empty value
+  */
+ public boolean hasMethodParameter(String method, String key) {
+     // The (String) cast selects the three-String overload with a null default value.
+     return StringUtils.isNotEmpty(this.getMethodParameter(method, key, (String) null));
+ }
+
+ /**
+  * Tells whether any method-level parameter is recorded for the given method,
+  * on either the consumer side or the provider side.
+  *
+  * @param method the method name to look up
+  * @return {@code true} if the consumer-side or the provider-side method parameter map has an entry for {@code method}
+  */
+ public boolean hasMethodParameter(String method) {
+     // Both per-method views are built together, the first time one of them is needed.
+     if (methodParams == null) {
+         methodParams = URL.toMethodParameters(params);
+         consumerMethodParams = URL.toMethodParameters(consumerParams);
+     }
+
+     if (consumerMethodParams.containsKey(method)) {
+         return true;
+     }
+     return methodParams.containsKey(method);
+ }
public String toDescString() {
@@ -310,5 +362,47 @@ public class MetadataInfo implements Serializable {
}
return methodStrings.toString();
}
+
+ /**
+  * Stores {@code key}/{@code value} in the consumer-side parameters, replacing any existing value.
+  * The call is a no-op while no consumer parameter map exists.
+  *
+  * @param key   the parameter key
+  * @param value the parameter value
+  */
+ public void addParameter(String key, String value) {
+     if (consumerParams == null) {
+         return;
+     }
+     this.consumerParams.put(key, value);
+ }
+
+ /**
+  * Stores {@code key}/{@code value} in the consumer-side parameters unless the key already has a value there.
+  * The call is a no-op while no consumer parameter map exists.
+  *
+  * @param key   the parameter key
+  * @param value the parameter value
+  */
+ public void addParameterIfAbsent(String key, String value) {
+     if (consumerParams == null) {
+         return;
+     }
+     this.consumerParams.putIfAbsent(key, value);
+ }
+
+ /**
+  * Attaches the consumer-side parameters by copying {@code params} into a new map.
+  * Only the first call has an effect; once a consumer parameter map exists, later calls are ignored.
+  *
+  * @param params the consumer-side parameters to copy
+  */
+ public void addConsumerParams(Map<String, String> params) {
+     // copy once for one service subscription
+     if (consumerParams != null) {
+         return;
+     }
+     consumerParams = new HashMap<>(params);
+ }
+
+ @Override
+ public boolean equals(Object obj) {
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof ServiceInfo)) {
+ return false;
+ }
+
+ ServiceInfo serviceInfo = (ServiceInfo) obj;
+ return this.getMatchKey().equals(serviceInfo.getMatchKey()) && this.getParams().equals(serviceInfo.getParams());
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(getMatchKey(), getParams());
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
+ }
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index e44bd4f..3ee5dd2 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
+import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
+
/**
* The default implementation of {@link ServiceInstance}.
*
@@ -166,18 +168,29 @@ public class DefaultServiceInstance implements ServiceInstance {
if (this == o) return true;
if (!(o instanceof DefaultServiceInstance)) return false;
DefaultServiceInstance that = (DefaultServiceInstance) o;
- return isEnabled() == that.isEnabled() &&
- isHealthy() == that.isHealthy() &&
- Objects.equals(getId(), that.getId()) &&
- Objects.equals(getServiceName(), that.getServiceName()) &&
+ boolean equals = Objects.equals(getServiceName(), that.getServiceName()) &&
Objects.equals(getHost(), that.getHost()) &&
- Objects.equals(getPort(), that.getPort()) &&
- Objects.equals(getMetadata(), that.getMetadata());
+ Objects.equals(getPort(), that.getPort());
+ // Every metadata entry of this instance, except the one stored under REVISION_KEY, must carry the same value on the other side.
+ // NOTE(review): keys present only in that.getMetadata() are not examined, so the comparison is not symmetric - confirm whether that is intended.
+ for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) {
+ if (entry.getKey().equals(REVISION_KEY)) {
+ continue;
+ }
+ // Objects.equals tolerates null metadata values; the earlier negated '!...equals(...)' form reported matching metadata as unequal.
+ equals = equals && Objects.equals(entry.getValue(), that.getMetadata().get(entry.getKey()));
+ }
+
+ return equals;
}
@Override
public int hashCode() {
- return Objects.hash(getId(), getServiceName(), getHost(), getPort(), isEnabled(), isHealthy(), getMetadata());
+ // Hash the same state equals() compares: service name, host, port and every metadata entry except REVISION_KEY.
+ int result = Objects.hash(getServiceName(), getHost(), getPort());
+ int metadataHash = 0;
+ for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) {
+ if (entry.getKey().equals(REVISION_KEY)) {
+ continue;
+ }
+ // Summing Map.Entry hashes (key hash XOR value hash, null-safe) makes the result independent of the map's
+ // iteration order, so two instances with equal metadata hash alike whatever map implementation backs them.
+ metadataHash += entry.hashCode();
+ }
+ return 31 * result + metadataHash;
}
@Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 18632bf..149c6e2 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -28,6 +28,9 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
+/**
+ * FIXME, replace RpcContext operations with explicitly defined APIs
+ */
public class InstanceAddressURL extends URL {
private ServiceInstance instance;
private MetadataInfo metadataInfo;
@@ -36,10 +39,20 @@ public class InstanceAddressURL extends URL {
ServiceInstance instance,
MetadataInfo metadataInfo
) {
+// super()
this.instance = instance;
this.metadataInfo = metadataInfo;
- this.setHost(instance.getHost());
- this.setPort(instance.getPort());
+ this.host = instance.getHost();
+ this.port = instance.getPort();
+ }
+
+
+ public ServiceInstance getInstance() {
+ return instance;
+ }
+
+ public MetadataInfo getMetadataInfo() {
+ return metadataInfo;
}
@Override
@@ -71,6 +84,12 @@ public class InstanceAddressURL extends URL {
}
@Override
+ /**
+  * Returns the path recorded in the metadata of the service addressed by {@code getProtocolServiceKey()}.
+  * Falls back to the path held by the parent URL when no metadata, or no matching service entry, is available.
+  *
+  * @return the service path, possibly {@code null}
+  */
+ public String getPath() {
+     // metadataInfo may be absent (other accessors in this class guard against it) and the lookup may miss,
+     // so avoid dereferencing either one blindly.
+     MetadataInfo.ServiceInfo serviceInfo = metadataInfo == null ? null : metadataInfo.getServiceInfo(getProtocolServiceKey());
+     if (serviceInfo == null) {
+         return super.getPath();
+     }
+     return serviceInfo.getPath();
+ }
+
+ @Override
public String getParameter(String key) {
if (VERSION_KEY.equals(key)) {
return getVersion();
@@ -80,10 +99,7 @@ public class InstanceAddressURL extends URL {
return getServiceInterface();
}
- String value = getConsumerParameters().get(key);
- if (StringUtils.isEmpty(value)) {
- value = getInstanceMetadata().get(key);
- }
+ String value = getInstanceMetadata().get(key);
if (StringUtils.isEmpty(value) && metadataInfo != null) {
value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey());
}
@@ -109,15 +125,52 @@ public class InstanceAddressURL extends URL {
@Override
public String getMethodParameter(String method, String key) {
- String value = getMethodParameter(method, key);
+ MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+ String value = serviceInfo.getMethodParameter(method, key, null);
if (StringUtils.isNotEmpty(value)) {
return value;
}
- MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey());
- return serviceInfo.getMethodParameter(method, key, null);
+ return getParameter(key);
}
@Override
+ /**
+  * Tells whether a method-level parameter exists.
+  * <ul>
+  * <li>{@code method == null}: true if any parameter key ends with {@code "." + key};</li>
+  * <li>{@code key == null}: true if any parameter key starts with {@code method + "."};</li>
+  * <li>otherwise: delegates to the {@code ServiceInfo} resolved through {@code getProtocolServiceKey()}.</li>
+  * </ul>
+  *
+  * @param method the method name, may be {@code null}
+  * @param key    the parameter key, may be {@code null}
+  * @return {@code true} if a matching parameter is found; {@code false} otherwise, including when no service metadata is available
+  */
+ public boolean hasMethodParameter(String method, String key) {
+     if (method == null) {
+         String suffix = "." + key;
+         for (String fullKey : getParameters().keySet()) {
+             if (fullKey.endsWith(suffix)) {
+                 return true;
+             }
+         }
+         return false;
+     }
+     if (key == null) {
+         String prefix = method + ".";
+         for (String fullKey : getParameters().keySet()) {
+             if (fullKey.startsWith(prefix)) {
+                 return true;
+             }
+         }
+         return false;
+     }
+
+     // Resolve the service metadata only on the path that needs it, and tolerate a missing entry.
+     MetadataInfo.ServiceInfo serviceInfo = metadataInfo == null ? null : metadataInfo.getServiceInfo(getProtocolServiceKey());
+     return serviceInfo != null && serviceInfo.hasMethodParameter(method, key);
+ }
+
+ /**
+  * Tells whether the service addressed by {@code getProtocolServiceKey()} records any method-level parameter for {@code method}.
+  *
+  * @param method the method name
+  * @return {@code true} if the resolved {@code ServiceInfo} has method parameters for {@code method};
+  *         {@code false} otherwise, including when no service metadata is available
+  */
+ @Override
+ public boolean hasMethodParameter(String method) {
+     // Either the metadata itself or the entry for this service key may be missing; report "no parameter" instead of failing.
+     MetadataInfo.ServiceInfo serviceInfo = metadataInfo == null ? null : metadataInfo.getServiceInfo(getProtocolServiceKey());
+     return serviceInfo != null && serviceInfo.hasMethodParameter(method);
+ }
+
+ /**
+ * Avoid calling this method in RPC call.
+ *
+ * @return
+ */
+ @Override
public Map<String, String> getParameters() {
Map<String, String> instanceParams = getInstanceMetadata();
Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey()));
@@ -130,8 +183,6 @@ public class InstanceAddressURL extends URL {
if (metadataParams != null) {
params.putAll(metadataParams);
}
-
- params.putAll(getConsumerParameters());
return params;
}
@@ -139,32 +190,56 @@ public class InstanceAddressURL extends URL {
return this.instance.getMetadata();
}
- private Map<String, String> getConsumerParameters() {
- return RpcContext.getContext().getConsumerUrl().getParameters();
- }
+ @Override
+ public URL addParameter(String key, String value) {
+ if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+ return this;
+ }
- private String getConsumerParameter(String key) {
- return RpcContext.getContext().getConsumerUrl().getParameter(key);
+ String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+ getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value);
+ return this;
}
- private String getConsumerMethodParameter(String method, String key) {
- return RpcContext.getContext().getConsumerUrl().getMethodParameter(method, key);
+ @Override
+ public URL addParameterIfAbsent(String key, String value) {
+ if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+ return this;
+ }
+
+ String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+ getMetadataInfo().getServiceInfo(protocolServiceKey).addParameterIfAbsent(key, value);
+ return this;
}
- @Override
- public URL addParameter(String key, String value) {
- throw new UnsupportedOperationException("");
+ public URL addConsumerParams(Map<String, String> params) {
+ String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+ getMetadataInfo().getServiceInfo(protocolServiceKey).addConsumerParams(params);
+ return this;
}
@Override
public boolean equals(Object obj) {
// instance metadata equals
- // service metadata equals
- return super.equals(obj);
+ if (obj == null) {
+ return false;
+ }
+ if (!(obj instanceof InstanceAddressURL)) {
+ return false;
+ }
+
+ InstanceAddressURL that = (InstanceAddressURL) obj;
+
+ return this.getInstance().equals(that.getInstance());
}
@Override
public int hashCode() {
- return super.hashCode();
+ return getInstance().hashCode();
+ }
+
+ @Override
+ public String toString() {
+ return super.toString();
}
}
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 9b129f9..7d4d48e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -52,6 +52,8 @@ import static java.util.Collections.emptySet;
import static java.util.Collections.unmodifiableSet;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Stream.of;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPERATOR;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY;
@@ -316,7 +318,7 @@ public class ServiceDiscoveryRegistry implements Registry {
List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
});
- listener.notify(serviceListener.getUrls(url.getProtocolServiceKey()));
+ listener.notify(serviceListener.getUrls(url.getServiceKey() + GROUP_CHAR_SEPERATOR + url.getParameter(PROTOCOL_KEY, DUBBO)));
serviceListener.addListener(url.getProtocolServiceKey(), listener);
registerServiceInstancesChangedListener(url, serviceListener);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 3fd7713..e01a36e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -45,7 +45,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class);
// Map<url, Invoker> cache service url to invoker mapping.
- private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+ private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
private ServiceInstancesChangedListener listener;
@@ -75,11 +75,12 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
destroyAllInvokers(); // Close all invokers
} else {
this.forbidden = false; // Allow to access
- Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
if (CollectionUtils.isEmpty(invokerUrls)) {
return;
}
- Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+
+ Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
@@ -109,51 +110,63 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
* @param urls
* @return invokers
*/
- private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
- Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+ private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
+ Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
if (urls == null || urls.isEmpty()) {
return newUrlInvokerMap;
}
for (URL url : urls) {
- if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
+ InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
+ if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
continue;
}
- if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(url.getProtocol())) {
- logger.error(new IllegalStateException("Unsupported protocol " + url.getProtocol() +
- " in notified url: " + url + " from registry " + getUrl().getAddress() +
+ if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {
+ logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
+ " in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
" to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
continue;
}
- if (urlInvokerMap != null && urlInvokerMap.containsKey(url)) { // Repeated url
- continue;
- }
- Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(url);
- if (invoker == null) { // Not in the cache, refer again
+ // FIXME, some keys may need to be removed.
+ instanceAddressURL.addConsumerParams(queryMap);
+
+ Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress());
+ if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
try {
boolean enabled = true;
- if (url.hasParameter(DISABLED_KEY)) {
- enabled = !url.getParameter(DISABLED_KEY, false);
+ if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
+ enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
} else {
- enabled = url.getParameter(ENABLED_KEY, true);
+ enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
}
if (enabled) {
- invoker = protocol.refer(serviceType, url);
+ invoker = protocol.refer(serviceType, instanceAddressURL);
}
} catch (Throwable t) {
- logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
+ logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
}
if (invoker != null) { // Put new invoker in cache
- newUrlInvokerMap.put(url, invoker);
+ newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
}
} else {
- newUrlInvokerMap.put(url, invoker);
+ newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
}
}
return newUrlInvokerMap;
}
+ /**
+  * Decides whether the invoker cached for an address has to be re-created for a newly notified URL.
+  *
+  * @param invoker the cached invoker; its URL is expected to be an {@link InstanceAddressURL}
+  * @param newURL  the URL received in the current notification
+  * @return {@code true} if the service instance differs, or if the metadata of the subscribed service
+  *         (looked up by the consumer URL's protocol service key) differs between the old and the new URL
+  */
+ private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) {
+     InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();
+
+     if (!newURL.getInstance().equals(oldURL.getInstance())) {
+         return true;
+     }
+
+     // Compute the lookup key once; both sides must be queried with the same key.
+     String protocolServiceKey = getConsumerUrl().getProtocolServiceKey();
+     MetadataInfo.ServiceInfo oldServiceInfo = oldURL.getMetadataInfo().getServiceInfo(protocolServiceKey);
+     MetadataInfo.ServiceInfo newServiceInfo = newURL.getMetadataInfo().getServiceInfo(protocolServiceKey);
+     // A lookup may miss on either side; treat "missing on both" as unchanged instead of throwing.
+     if (oldServiceInfo == null) {
+         return newServiceInfo != null;
+     }
+     return !oldServiceInfo.equals(newServiceInfo);
+ }
+
private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
return invokers;
}
@@ -162,7 +175,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
* Close all invokers
*/
private void destroyAllInvokers() {
- Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+ Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
if (localUrlInvokerMap != null) {
for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
try {
@@ -183,16 +196,16 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
* @param oldUrlInvokerMap
* @param newUrlInvokerMap
*/
- private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
+ private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
destroyAllInvokers();
return;
}
// check deleted invoker
- List<URL> deleted = null;
+ List<String> deleted = null;
if (oldUrlInvokerMap != null) {
Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
- for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+ for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
if (!newInvokers.contains(entry.getValue())) {
if (deleted == null) {
deleted = new ArrayList<>();
@@ -203,9 +216,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
}
if (deleted != null) {
- for (URL url : deleted) {
- if (url != null) {
- Invoker<T> invoker = oldUrlInvokerMap.remove(url);
+ for (String addressKey : deleted) {
+ if (addressKey != null) {
+ Invoker<T> invoker = oldUrlInvokerMap.remove(addressKey);
if (invoker != null) {
try {
invoker.destroy();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
index 57010cf..1afb486 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
@@ -79,12 +79,7 @@ public class ServiceInstanceMetadataUtils {
/**
* The property name of The revision for all exported Dubbo services.
*/
- public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision";
-
- /**
- * The property name of The revision for all subscribed Dubbo services.
- */
- public static String SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision";
+ public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision";
/**
* The property name of metadata storage type.
@@ -163,17 +158,6 @@ public class ServiceInstanceMetadataUtils {
}
/**
- * The revision for all subscribed Dubbo services from the specified {@link ServiceInstance}.
- *
- * @param serviceInstance the specified {@link ServiceInstance}
- * @return <code>null</code> if not exits
- */
- public static String getSubscribedServicesRevision(ServiceInstance serviceInstance) {
- Map<String, String> metadata = serviceInstance.getMetadata();
- return metadata.get(SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME);
- }
-
- /**
* Get metadata's storage type
*
* @param registryURL the {@link URL} to connect the registry
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 8268361..80aadc8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -43,10 +43,14 @@ import java.util.Map;
import java.util.Set;
import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
@@ -122,7 +126,10 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
private URL turnRegistryUrlToConsumerUrl(URL url) {
return URLBuilder.from(url)
- .setPath(url.getServiceInterface())
+ .setHost(queryMap.get(REGISTER_IP_KEY))
+ .setPort(0)
+ .setProtocol(queryMap.get(PROTOCOL_KEY) == null ? DUBBO : queryMap.get(PROTOCOL_KEY))
+ .setPath(queryMap.get(INTERFACE_KEY))
.clearParameters()
.addParameters(queryMap)
.removeParameter(MONITOR_KEY)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 7a06106..1062768 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -394,7 +394,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
// The combination of directoryUrl and override is at the end of notify, which can't be handled here
- this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
+// this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
if ((providerUrl.getPath() == null || providerUrl.getPath()
.length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
index 36bcc74..1e3b278 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
@@ -105,7 +105,7 @@ public class Exchangers {
if (handler == null) {
throw new IllegalArgumentException("handler == null");
}
- url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
+// url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
return getExchanger(url).connect(url, handler);
}
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
index 94738a8..53ce72f 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
@@ -51,7 +51,7 @@ public abstract class AbstractEndpoint extends AbstractPeer implements Resetable
}
protected static Codec2 getChannelCodec(URL url) {
- String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
+ String codecName = url.getProtocol(); // codec extension name must stay the same with protocol name
if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
} else {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
index b5c9e67..0eae664 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
@@ -67,6 +67,7 @@ public class InvokerInvocationHandler implements InvocationHandler {
String serviceKey = invoker.getUrl().getServiceKey();
rpcInvocation.setTargetServiceUniqueName(serviceKey);
+ // invoker.getUrl() returns consumer url.
RpcContext.setRpcContext(invoker.getUrl());
if (consumerModel != null) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 83190d2..e095523 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -572,9 +572,10 @@ public class DubboProtocol extends AbstractProtocol {
// client type setting.
String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
- url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
+// url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
// enable heartbeat by default
- url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
+ // FIXME,
+// url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
// BIO is not allowed since it has severe performance issue.
if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
index 1737d54..2758c5e 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
@@ -191,7 +191,7 @@ public class ThriftProtocol extends AbstractProtocol {
ExchangeClient client;
- url = url.addParameter(CODEC_KEY, ThriftCodec.NAME);
+// url = url.addParameter(CODEC_KEY, ThriftCodec.NAME);
try {
client = Exchangers.connect(url);