You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@dubbo.apache.org by li...@apache.org on 2020/07/21 05:57:24 UTC

[dubbo] 03/04: can basically work with InstanceAddressURL

This is an automated email from the ASF dual-hosted git repository.

liujun pushed a commit to branch 3.0
in repository https://gitbox.apache.org/repos/asf/dubbo.git

commit 42f0529054187227dedd153b14d2b7b4605b8ff4
Author: ken.lj <ke...@gmail.com>
AuthorDate: Tue Jul 14 01:51:51 2020 +0800

    can basically work with InstanceAddressURL
---
 .../src/main/java/org/apache/dubbo/common/URL.java |  12 +-
 .../org/apache/dubbo/metadata/MetadataInfo.java    | 118 ++++++++++++++++++--
 .../registry/client/DefaultServiceInstance.java    |  27 +++--
 .../dubbo/registry/client/InstanceAddressURL.java  | 123 +++++++++++++++++----
 .../registry/client/ServiceDiscoveryRegistry.java  |   4 +-
 .../client/ServiceDiscoveryRegistryDirectory.java  |  69 +++++++-----
 .../metadata/ServiceInstanceMetadataUtils.java     |  18 +--
 .../registry/integration/DynamicDirectory.java     |   9 +-
 .../registry/integration/RegistryDirectory.java    |   2 +-
 .../apache/dubbo/remoting/exchange/Exchangers.java |   2 +-
 .../dubbo/remoting/transport/AbstractEndpoint.java |   2 +-
 .../dubbo/rpc/proxy/InvokerInvocationHandler.java  |   1 +
 .../dubbo/rpc/protocol/dubbo/DubboProtocol.java    |   5 +-
 .../dubbo/rpc/protocol/thrift/ThriftProtocol.java  |   2 +-
 14 files changed, 292 insertions(+), 102 deletions(-)

diff --git a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
index 5f811fa..9742ebb 100644
--- a/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
+++ b/dubbo-common/src/main/java/org/apache/dubbo/common/URL.java
@@ -93,19 +93,19 @@ class URL implements Serializable {
 
     private static final long serialVersionUID = -1985165475234910535L;
 
-    private final String protocol;
+    protected String protocol;
 
-    private final String username;
+    protected String username;
 
-    private final String password;
+    protected String password;
 
     // by default, host to registry
-    private final String host;
+    protected String host;
 
     // by default, port to registry
-    private final int port;
+    protected int port;
 
-    private final String path;
+    protected String path;
 
     private final Map<String, String> parameters;
 
diff --git a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
index 4db0298..931a331 100644
--- a/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
+++ b/dubbo-metadata/dubbo-metadata-api/src/main/java/org/apache/dubbo/metadata/MetadataInfo.java
@@ -29,6 +29,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.SortedSet;
 import java.util.TreeSet;
 import java.util.concurrent.atomic.AtomicBoolean;
@@ -154,7 +155,13 @@ public class MetadataInfo implements Serializable {
         if (serviceInfo == null) {
             return Collections.emptyMap();
         }
-        return serviceInfo.getParams();
+        return serviceInfo.getAllParams();
+    }
+
+    @Override
+    public String toString() {
+        // FIXME
+        return super.toString();
     }
 
     public static class ServiceInfo implements Serializable {
@@ -163,9 +170,12 @@ public class MetadataInfo implements Serializable {
         private String group;
         private String version;
         private String protocol;
+        private String path; // most of the time, path is the same with the interface name.
         private Map<String, String> params;
 
+        private transient Map<String, String> consumerParams;
         private transient Map<String, Map<String, String>> methodParams;
+        private transient Map<String, Map<String, String>> consumerMethodParams;
         private transient String serviceKey;
         private transient String matchKey;
 
@@ -173,14 +183,7 @@ public class MetadataInfo implements Serializable {
         }
 
         public ServiceInfo(URL url) {
-            // FIXME, how to match registry.
-            this(
-                    url.getServiceInterface(),
-                    url.getParameter(GROUP_KEY),
-                    url.getParameter(VERSION_KEY),
-                    url.getProtocol(),
-                    null
-            );
+            this(url.getServiceInterface(), url.getParameter(GROUP_KEY), url.getParameter(VERSION_KEY), url.getProtocol(), url.getPath(), null);
 
             Map<String, String> params = new HashMap<>();
             List<MetadataParamsFilter> filters = loader.getActivateExtension(url, "params-filter");
@@ -207,11 +210,12 @@ public class MetadataInfo implements Serializable {
             this.params = params;
         }
 
-        public ServiceInfo(String name, String group, String version, String protocol, Map<String, String> params) {
+        public ServiceInfo(String name, String group, String version, String protocol, String path, Map<String, String> params) {
             this.name = name;
             this.group = group;
             this.version = version;
             this.protocol = protocol;
+            this.path = path;
             this.params = params == null ? new HashMap<>() : params;
 
             this.serviceKey = URL.buildKey(name, group, version);
@@ -266,6 +270,14 @@ public class MetadataInfo implements Serializable {
             this.version = version;
         }
 
+        public String getPath() {
+            return path;
+        }
+
+        public void setPath(String path) {
+            this.path = path;
+        }
+
         public Map<String, String> getParams() {
             if (params == null) {
                 return Collections.emptyMap();
@@ -277,16 +289,42 @@ public class MetadataInfo implements Serializable {
             this.params = params;
         }
 
+        public Map<String, String> getAllParams() {
+            if (consumerParams == null) {
+                return getParams();
+            }
+            // getParams() tolerates a null params map; consumer-side values override provider-side ones.
+            Map<String, String> allParams = new HashMap<>(getParams());
+            allParams.putAll(consumerParams);
+            return allParams;
+        }
+
         public String getParameter(String key) {
+            if (consumerParams != null) {
+                String value = consumerParams.get(key);
+                if (value != null) {
+                    return value;
+                }
+            }
             return params.get(key);
         }
 
         public String getMethodParameter(String method, String key, String defaultValue) {
             if (methodParams == null) {
                 methodParams = URL.toMethodParameters(params);
+                consumerMethodParams = URL.toMethodParameters(consumerParams);
+            }
+
+            String value = getMethodParameter(method, key, consumerMethodParams);
+            if (value != null) {
+                return value;
             }
+            value = getMethodParameter(method, key, methodParams);
+            return value == null ? defaultValue : value;
+        }
 
-            Map<String, String> keyMap = methodParams.get(method);
+        private String getMethodParameter(String method, String key, Map<String, Map<String, String>> map) {
+            Map<String, String> keyMap = map.get(method);
             String value = null;
             if (keyMap != null) {
                 value = keyMap.get(key);
@@ -294,7 +332,21 @@ public class MetadataInfo implements Serializable {
             if (StringUtils.isEmpty(value)) {
                 value = getParameter(key);
             }
-            return value == null ? defaultValue : value;
+            return value;
+        }
+
+        public boolean hasMethodParameter(String method, String key) {
+            String value = this.getMethodParameter(method, key, (String) null);
+            return StringUtils.isNotEmpty(value);
+        }
+
+        public boolean hasMethodParameter(String method) {
+            if (methodParams == null) {
+                methodParams = URL.toMethodParameters(params);
+                consumerMethodParams = URL.toMethodParameters(consumerParams);
+            }
+
+            return consumerMethodParams.containsKey(method) || methodParams.containsKey(method);
         }
 
         public String toDescString() {
@@ -310,5 +362,47 @@ public class MetadataInfo implements Serializable {
             }
             return methodStrings.toString();
         }
+
+        public void addParameter(String key, String value) {
+            if (consumerParams != null) {
+                this.consumerParams.put(key, value);
+            }
+        }
+
+        public void addParameterIfAbsent(String key, String value) {
+            if (consumerParams != null) {
+                this.consumerParams.putIfAbsent(key, value);
+            }
+        }
+
+        public void addConsumerParams(Map<String, String> params) {
+            // copy once for one service subscription
+            if (consumerParams == null) {
+                consumerParams = new HashMap<>(params);
+            }
+        }
+
+        @Override
+        public boolean equals(Object obj) {
+            if (obj == null) {
+                return false;
+            }
+            if (!(obj instanceof ServiceInfo)) {
+                return false;
+            }
+
+            ServiceInfo serviceInfo = (ServiceInfo) obj;
+            return this.getMatchKey().equals(serviceInfo.getMatchKey()) && this.getParams().equals(serviceInfo.getParams());
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(getMatchKey(), getParams());
+        }
+
+        @Override
+        public String toString() {
+            return super.toString();
+        }
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
index e44bd4f..3ee5dd2 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/DefaultServiceInstance.java
@@ -22,6 +22,8 @@ import java.util.HashMap;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.dubbo.common.constants.CommonConstants.REVISION_KEY;
+
 /**
  * The default implementation of {@link ServiceInstance}.
  *
@@ -166,18 +168,29 @@ public class DefaultServiceInstance implements ServiceInstance {
         if (this == o) return true;
         if (!(o instanceof DefaultServiceInstance)) return false;
         DefaultServiceInstance that = (DefaultServiceInstance) o;
-        return isEnabled() == that.isEnabled() &&
-                isHealthy() == that.isHealthy() &&
-                Objects.equals(getId(), that.getId()) &&
-                Objects.equals(getServiceName(), that.getServiceName()) &&
+        boolean equals = Objects.equals(getServiceName(), that.getServiceName()) &&
                 Objects.equals(getHost(), that.getHost()) &&
-                Objects.equals(getPort(), that.getPort()) &&
-                Objects.equals(getMetadata(), that.getMetadata());
+                Objects.equals(getPort(), that.getPort());
+        // Compare metadata symmetrically and null-safely, ignoring only the revision entry.
+        Map<String, String> thisMetadata = new HashMap<>(this.getMetadata());
+        Map<String, String> thatMetadata = new HashMap<>(that.getMetadata());
+        thisMetadata.remove(REVISION_KEY);
+        thatMetadata.remove(REVISION_KEY);
+        equals = equals && thisMetadata.equals(thatMetadata);
+
+        return equals;
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(getId(), getServiceName(), getHost(), getPort(), isEnabled(), isHealthy(), getMetadata());
+        int result = 31 * Objects.hash(getServiceName(), getHost(), getPort());
+        for (Map.Entry<String, String> entry : this.getMetadata().entrySet()) {
+            if (entry.getKey().equals(REVISION_KEY)) {
+                continue;
+            }
+            result += entry.hashCode(); // summed, not chained: equal maps may iterate in different orders
+        }
+        return result;
     }
 
     @Override
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
index 18632bf..149c6e2 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/InstanceAddressURL.java
@@ -28,6 +28,9 @@ import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.VERSION_KEY;
 
+/**
+ * FIXME, replace RpcContext operations with explicitly defined APIs
+ */
 public class InstanceAddressURL extends URL {
     private ServiceInstance instance;
     private MetadataInfo metadataInfo;
@@ -36,10 +39,20 @@ public class InstanceAddressURL extends URL {
             ServiceInstance instance,
             MetadataInfo metadataInfo
     ) {
+//        super()
         this.instance = instance;
         this.metadataInfo = metadataInfo;
-        this.setHost(instance.getHost());
-        this.setPort(instance.getPort());
+        this.host = instance.getHost();
+        this.port = instance.getPort();
+    }
+
+
+    public ServiceInstance getInstance() {
+        return instance;
+    }
+
+    public MetadataInfo getMetadataInfo() {
+        return metadataInfo;
     }
 
     @Override
@@ -71,6 +84,12 @@ public class InstanceAddressURL extends URL {
     }
 
     @Override
+    public String getPath() {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+        return serviceInfo.getPath();
+    }
+
+    @Override
     public String getParameter(String key) {
         if (VERSION_KEY.equals(key)) {
             return getVersion();
@@ -80,10 +99,7 @@ public class InstanceAddressURL extends URL {
             return getServiceInterface();
         }
 
-        String value = getConsumerParameters().get(key);
-        if (StringUtils.isEmpty(value)) {
-            value = getInstanceMetadata().get(key);
-        }
+        String value = getInstanceMetadata().get(key);
         if (StringUtils.isEmpty(value) && metadataInfo != null) {
             value = metadataInfo.getParameter(key, RpcContext.getContext().getProtocolServiceKey());
         }
@@ -109,15 +125,52 @@ public class InstanceAddressURL extends URL {
 
     @Override
     public String getMethodParameter(String method, String key) {
-        String value = getMethodParameter(method, key);
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+        String value = serviceInfo.getMethodParameter(method, key, null);
         if (StringUtils.isNotEmpty(value)) {
             return value;
         }
-        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getServiceKey());
-        return serviceInfo.getMethodParameter(method, key, null);
+        return getParameter(key);
     }
 
     @Override
+    public boolean hasMethodParameter(String method, String key) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+
+        if (method == null) {
+            String suffix = "." + key;
+            for (String fullKey : getParameters().keySet()) {
+                if (fullKey.endsWith(suffix)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+        if (key == null) {
+            String prefix = method + ".";
+            for (String fullKey : getParameters().keySet()) {
+                if (fullKey.startsWith(prefix)) {
+                    return true;
+                }
+            }
+            return false;
+        }
+
+        return serviceInfo.hasMethodParameter(method, key);
+    }
+
+    @Override
+    public boolean hasMethodParameter(String method) {
+        MetadataInfo.ServiceInfo serviceInfo = metadataInfo.getServiceInfo(getProtocolServiceKey());
+        return serviceInfo.hasMethodParameter(method);
+    }
+
+    /**
+     * Avoid calling this method in RPC call.
+     *
+     * @return
+     */
+    @Override
     public Map<String, String> getParameters() {
         Map<String, String> instanceParams = getInstanceMetadata();
         Map<String, String> metadataParams = (metadataInfo == null ? new HashMap<>() : metadataInfo.getParameters(RpcContext.getContext().getProtocolServiceKey()));
@@ -130,8 +183,6 @@ public class InstanceAddressURL extends URL {
         if (metadataParams != null) {
             params.putAll(metadataParams);
         }
-
-        params.putAll(getConsumerParameters());
         return params;
     }
 
@@ -139,32 +190,56 @@ public class InstanceAddressURL extends URL {
         return this.instance.getMetadata();
     }
 
-    private Map<String, String> getConsumerParameters() {
-        return RpcContext.getContext().getConsumerUrl().getParameters();
-    }
+    @Override
+    public URL addParameter(String key, String value) {
+        if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+            return this;
+        }
 
-    private String getConsumerParameter(String key) {
-        return RpcContext.getContext().getConsumerUrl().getParameter(key);
+        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getMetadataInfo().getServiceInfo(protocolServiceKey).addParameter(key, value);
+        return this;
     }
 
-    private String getConsumerMethodParameter(String method, String key) {
-        return RpcContext.getContext().getConsumerUrl().getMethodParameter(method, key);
+    @Override
+    public URL addParameterIfAbsent(String key, String value) {
+        if (StringUtils.isEmpty(key) || StringUtils.isEmpty(value)) {
+            return this;
+        }
+
+        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getMetadataInfo().getServiceInfo(protocolServiceKey).addParameterIfAbsent(key, value);
+        return this;
     }
 
-    @Override
-    public URL addParameter(String key, String value) {
-        throw new UnsupportedOperationException("");
+    public URL addConsumerParams(Map<String, String> params) {
+        String protocolServiceKey = RpcContext.getContext().getProtocolServiceKey();
+        getMetadataInfo().getServiceInfo(protocolServiceKey).addConsumerParams(params);
+        return this;
     }
 
     @Override
     public boolean equals(Object obj) {
         // instance metadata equals
-        // service metadata equals
-        return super.equals(obj);
+        if (obj == null) {
+            return false;
+        }
+        if (!(obj instanceof InstanceAddressURL)) {
+            return false;
+        }
+
+        InstanceAddressURL that = (InstanceAddressURL) obj;
+
+        return this.getInstance().equals(that.getInstance());
     }
 
     @Override
     public int hashCode() {
-        return super.hashCode();
+        return getInstance().hashCode();
+    }
+
+    @Override
+    public String toString() {
+        return super.toString();
     }
 }
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
index 9b129f9..7d4d48e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistry.java
@@ -52,6 +52,8 @@ import static java.util.Collections.emptySet;
 import static java.util.Collections.unmodifiableSet;
 import static java.util.stream.Collectors.toSet;
 import static java.util.stream.Stream.of;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
+import static org.apache.dubbo.common.constants.CommonConstants.GROUP_CHAR_SEPERATOR;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.MAPPING_KEY;
@@ -316,7 +318,7 @@ public class ServiceDiscoveryRegistry implements Registry {
             List<ServiceInstance> serviceInstances = serviceDiscovery.getInstances(serviceName);
             serviceListener.onEvent(new ServiceInstancesChangedEvent(serviceName, serviceInstances));
         });
-        listener.notify(serviceListener.getUrls(url.getProtocolServiceKey()));
+        listener.notify(serviceListener.getUrls(url.getServiceKey() + GROUP_CHAR_SEPERATOR + url.getParameter(PROTOCOL_KEY, DUBBO)));
 
         serviceListener.addListener(url.getProtocolServiceKey(), listener);
         registerServiceInstancesChangedListener(url, serviceListener);
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
index 3fd7713..e01a36e 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/ServiceDiscoveryRegistryDirectory.java
@@ -45,7 +45,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
     private static final Logger logger = LoggerFactory.getLogger(ServiceDiscoveryRegistryDirectory.class);
 
     // Map<url, Invoker> cache service url to invoker mapping.
-    private volatile Map<URL, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
+    private volatile Map<String, Invoker<T>> urlInvokerMap; // The initial value is null and the midway may be assigned to null, please use the local variable reference
 
     private ServiceInstancesChangedListener listener;
 
@@ -75,11 +75,12 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
             destroyAllInvokers(); // Close all invokers
         } else {
             this.forbidden = false; // Allow to access
-            Map<URL, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
+            Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference
             if (CollectionUtils.isEmpty(invokerUrls)) {
                 return;
             }
-            Map<URL, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
+
+            Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls);// Translate url list to Invoker map
 
             if (CollectionUtils.isEmptyMap(newUrlInvokerMap)) {
                 logger.error(new IllegalStateException("Cannot create invokers from url address list (total " + invokerUrls.size() + ")"));
@@ -109,51 +110,63 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * @param urls
      * @return invokers
      */
-    private Map<URL, Invoker<T>> toInvokers(List<URL> urls) {
-        Map<URL, Invoker<T>> newUrlInvokerMap = new HashMap<>();
+    private Map<String, Invoker<T>> toInvokers(List<URL> urls) {
+        Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<>();
         if (urls == null || urls.isEmpty()) {
             return newUrlInvokerMap;
         }
         for (URL url : urls) {
-            if (EMPTY_PROTOCOL.equals(url.getProtocol())) {
+            InstanceAddressURL instanceAddressURL = (InstanceAddressURL) url;
+            if (EMPTY_PROTOCOL.equals(instanceAddressURL.getProtocol())) {
                 continue;
             }
-            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(url.getProtocol())) {
-                logger.error(new IllegalStateException("Unsupported protocol " + url.getProtocol() +
-                        " in notified url: " + url + " from registry " + getUrl().getAddress() +
+            if (!ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(instanceAddressURL.getProtocol())) {
+                logger.error(new IllegalStateException("Unsupported protocol " + instanceAddressURL.getProtocol() +
+                        " in notified url: " + instanceAddressURL + " from registry " + getUrl().getAddress() +
                         " to consumer " + NetUtils.getLocalHost() + ", supported protocol: " +
                         ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions()));
                 continue;
             }
 
-            if (urlInvokerMap != null && urlInvokerMap.containsKey(url)) { // Repeated url
-                continue;
-            }
-            Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(url);
-            if (invoker == null) { // Not in the cache, refer again
+            // FIXME, some keys may need to be removed.
+            instanceAddressURL.addConsumerParams(queryMap);
+
+            Invoker<T> invoker = urlInvokerMap == null ? null : urlInvokerMap.get(instanceAddressURL.getAddress());
+            if (invoker == null || urlChanged(invoker, instanceAddressURL)) { // Not in the cache, refer again
                 try {
                     boolean enabled = true;
-                    if (url.hasParameter(DISABLED_KEY)) {
-                        enabled = !url.getParameter(DISABLED_KEY, false);
+                    if (instanceAddressURL.hasParameter(DISABLED_KEY)) {
+                        enabled = !instanceAddressURL.getParameter(DISABLED_KEY, false);
                     } else {
-                        enabled = url.getParameter(ENABLED_KEY, true);
+                        enabled = instanceAddressURL.getParameter(ENABLED_KEY, true);
                     }
                     if (enabled) {
-                        invoker = protocol.refer(serviceType, url);
+                        invoker = protocol.refer(serviceType, instanceAddressURL);
                     }
                 } catch (Throwable t) {
-                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + url + ")" + t.getMessage(), t);
+                    logger.error("Failed to refer invoker for interface:" + serviceType + ",url:(" + instanceAddressURL + ")" + t.getMessage(), t);
                 }
                 if (invoker != null) { // Put new invoker in cache
-                    newUrlInvokerMap.put(url, invoker);
+                    newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
                 }
             } else {
-                newUrlInvokerMap.put(url, invoker);
+                newUrlInvokerMap.put(instanceAddressURL.getAddress(), invoker);
             }
         }
         return newUrlInvokerMap;
     }
 
+    private boolean urlChanged(Invoker<T> invoker, InstanceAddressURL newURL) {
+        InstanceAddressURL oldURL = (InstanceAddressURL) invoker.getUrl();
+
+        if (!newURL.getInstance().equals(oldURL.getInstance())) {
+            return true;
+        }
+
+        return !oldURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey())
+                .equals(newURL.getMetadataInfo().getServiceInfo(getConsumerUrl().getProtocolServiceKey()));
+    }
+
     private List<Invoker<T>> toMergeInvokerList(List<Invoker<T>> invokers) {
         return invokers;
     }
@@ -162,7 +175,7 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * Close all invokers
      */
     private void destroyAllInvokers() {
-        Map<URL, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
+        Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference
         if (localUrlInvokerMap != null) {
             for (Invoker<T> invoker : new ArrayList<>(localUrlInvokerMap.values())) {
                 try {
@@ -183,16 +196,16 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
      * @param oldUrlInvokerMap
      * @param newUrlInvokerMap
      */
-    private void destroyUnusedInvokers(Map<URL, Invoker<T>> oldUrlInvokerMap, Map<URL, Invoker<T>> newUrlInvokerMap) {
+    private void destroyUnusedInvokers(Map<String, Invoker<T>> oldUrlInvokerMap, Map<String, Invoker<T>> newUrlInvokerMap) {
         if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0) {
             destroyAllInvokers();
             return;
         }
         // check deleted invoker
-        List<URL> deleted = null;
+        List<String> deleted = null;
         if (oldUrlInvokerMap != null) {
             Collection<Invoker<T>> newInvokers = newUrlInvokerMap.values();
-            for (Map.Entry<URL, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
+            for (Map.Entry<String, Invoker<T>> entry : oldUrlInvokerMap.entrySet()) {
                 if (!newInvokers.contains(entry.getValue())) {
                     if (deleted == null) {
                         deleted = new ArrayList<>();
@@ -203,9 +216,9 @@ public class ServiceDiscoveryRegistryDirectory<T> extends DynamicDirectory<T> im
         }
 
         if (deleted != null) {
-            for (URL url : deleted) {
-                if (url != null) {
-                    Invoker<T> invoker = oldUrlInvokerMap.remove(url);
+            for (String addressKey : deleted) {
+                if (addressKey != null) {
+                    Invoker<T> invoker = oldUrlInvokerMap.remove(addressKey);
                     if (invoker != null) {
                         try {
                             invoker.destroy();
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
index 57010cf..1afb486 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/client/metadata/ServiceInstanceMetadataUtils.java
@@ -79,12 +79,7 @@ public class ServiceInstanceMetadataUtils {
     /**
      * The property name of The revision for all exported Dubbo services.
      */
-    public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.exported-services.revision";
-
-    /**
-     * The property name of The revision for all subscribed Dubbo services.
-     */
-    public static String SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME = "dubbo.subscribed-services.revision";
+    public static String EXPORTED_SERVICES_REVISION_PROPERTY_NAME = "dubbo.metadata.revision";
 
     /**
      * The property name of metadata storage type.
@@ -163,17 +158,6 @@ public class ServiceInstanceMetadataUtils {
     }
 
     /**
-     * The revision for all subscribed Dubbo services from the specified {@link ServiceInstance}.
-     *
-     * @param serviceInstance the specified {@link ServiceInstance}
-     * @return <code>null</code> if not exits
-     */
-    public static String getSubscribedServicesRevision(ServiceInstance serviceInstance) {
-        Map<String, String> metadata = serviceInstance.getMetadata();
-        return metadata.get(SUBSCRIBER_SERVICES_REVISION_PROPERTY_NAME);
-    }
-
-    /**
      * Get metadata's storage type
      *
      * @param registryURL the {@link URL} to connect the registry
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
index 8268361..80aadc8 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/DynamicDirectory.java
@@ -43,10 +43,14 @@ import java.util.Map;
 import java.util.Set;
 
 import static org.apache.dubbo.common.constants.CommonConstants.ANY_VALUE;
+import static org.apache.dubbo.common.constants.CommonConstants.DUBBO;
 import static org.apache.dubbo.common.constants.CommonConstants.GROUP_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.INTERFACE_KEY;
 import static org.apache.dubbo.common.constants.CommonConstants.MONITOR_KEY;
+import static org.apache.dubbo.common.constants.CommonConstants.PROTOCOL_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.CATEGORY_KEY;
 import static org.apache.dubbo.common.constants.RegistryConstants.CONSUMERS_CATEGORY;
+import static org.apache.dubbo.registry.Constants.REGISTER_IP_KEY;
 import static org.apache.dubbo.registry.Constants.REGISTER_KEY;
 import static org.apache.dubbo.registry.Constants.SIMPLIFIED_KEY;
 import static org.apache.dubbo.registry.integration.RegistryProtocol.DEFAULT_REGISTER_CONSUMER_KEYS;
@@ -122,7 +126,10 @@ public abstract class DynamicDirectory<T> extends AbstractDirectory<T> implement
 
     private URL turnRegistryUrlToConsumerUrl(URL url) {
         return URLBuilder.from(url)
-                .setPath(url.getServiceInterface())
+                .setHost(queryMap.get(REGISTER_IP_KEY))
+                .setPort(0)
+                .setProtocol(queryMap.get(PROTOCOL_KEY) == null ? DUBBO : queryMap.get(PROTOCOL_KEY))
+                .setPath(queryMap.get(INTERFACE_KEY))
                 .clearParameters()
                 .addParameters(queryMap)
                 .removeParameter(MONITOR_KEY)
diff --git a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
index 7a06106..1062768 100644
--- a/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
+++ b/dubbo-registry/dubbo-registry-api/src/main/java/org/apache/dubbo/registry/integration/RegistryDirectory.java
@@ -394,7 +394,7 @@ public class RegistryDirectory<T> extends DynamicDirectory<T> implements NotifyL
         providerUrl = providerUrl.addParameter(Constants.CHECK_KEY, String.valueOf(false)); // Do not check whether the connection is successful or not, always create Invoker!
 
         // The combination of directoryUrl and override is at the end of notify, which can't be handled here
-        this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
+//        this.overrideDirectoryUrl = this.overrideDirectoryUrl.addParametersIfAbsent(providerUrl.getParameters()); // Merge the provider side parameters
 
         if ((providerUrl.getPath() == null || providerUrl.getPath()
                 .length() == 0) && DUBBO_PROTOCOL.equals(providerUrl.getProtocol())) { // Compatible version 1.0
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
index 36bcc74..1e3b278 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/exchange/Exchangers.java
@@ -105,7 +105,7 @@ public class Exchangers {
         if (handler == null) {
             throw new IllegalArgumentException("handler == null");
         }
-        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
+//        url = url.addParameterIfAbsent(Constants.CODEC_KEY, "exchange");
         return getExchanger(url).connect(url, handler);
     }
 
diff --git a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
index 94738a8..53ce72f 100644
--- a/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
+++ b/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/AbstractEndpoint.java
@@ -51,7 +51,7 @@ public abstract class AbstractEndpoint extends AbstractPeer implements Resetable
     }
 
     protected static Codec2 getChannelCodec(URL url) {
-        String codecName = url.getParameter(Constants.CODEC_KEY, "telnet");
+        String codecName = url.getProtocol(); // the codec extension name must be the same as the protocol name
         if (ExtensionLoader.getExtensionLoader(Codec2.class).hasExtension(codecName)) {
             return ExtensionLoader.getExtensionLoader(Codec2.class).getExtension(codecName);
         } else {
diff --git a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
index b5c9e67..0eae664 100644
--- a/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
+++ b/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/proxy/InvokerInvocationHandler.java
@@ -67,6 +67,7 @@ public class InvokerInvocationHandler implements InvocationHandler {
         String serviceKey = invoker.getUrl().getServiceKey();
         rpcInvocation.setTargetServiceUniqueName(serviceKey);
 
+        // invoker.getUrl() returns consumer url.
         RpcContext.setRpcContext(invoker.getUrl());
 
         if (consumerModel != null) {
diff --git a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
index 83190d2..e095523 100644
--- a/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-dubbo/src/main/java/org/apache/dubbo/rpc/protocol/dubbo/DubboProtocol.java
@@ -572,9 +572,10 @@ public class DubboProtocol extends AbstractProtocol {
         // client type setting.
         String str = url.getParameter(CLIENT_KEY, url.getParameter(SERVER_KEY, DEFAULT_REMOTING_CLIENT));
 
-        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
+//        url = url.addParameter(CODEC_KEY, DubboCodec.NAME);
         // enable heartbeat by default
-        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
+        // FIXME: the default heartbeat parameter is no longer added to the url here.
+//        url = url.addParameterIfAbsent(HEARTBEAT_KEY, String.valueOf(DEFAULT_HEARTBEAT));
 
         // BIO is not allowed since it has severe performance issue.
         if (str != null && str.length() > 0 && !ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) {
diff --git a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
index 1737d54..2758c5e 100644
--- a/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
+++ b/dubbo-rpc/dubbo-rpc-thrift/src/main/java/org/apache/dubbo/rpc/protocol/thrift/ThriftProtocol.java
@@ -191,7 +191,7 @@ public class ThriftProtocol extends AbstractProtocol {
 
         ExchangeClient client;
 
-        url = url.addParameter(CODEC_KEY, ThriftCodec.NAME);
+//        url = url.addParameter(CODEC_KEY, ThriftCodec.NAME);
 
         try {
             client = Exchangers.connect(url);